kafka—生产者API
程序员文章站
2022-06-14 13:42:07
...
kafka—生产者API
maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
入门案例
import com.test.demo.config.KafkaConfig;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Minimal Kafka producer example.
 *
 * <p>Builds a producer configuration with string key/value serializers and sends
 * the same string message to {@link #TOPIC} ten times, ignoring the result of each
 * {@code send} call (fire-and-forget).
 *
 * @author bigTree
 */
public class ProducersOne {
    /** Topic the demo messages are sent to. */
    public static final String TOPIC = "producers_one";
    /** Class name of the serializer configured for record keys. */
    private static final String PRODUCER_KEY_SER = StringSerializer.class.getName();
    /** Class name of the serializer configured for record values. */
    private static final String PRODUCER_VALUE_SER = StringSerializer.class.getName();
    /** Comma-separated {@code host:port} list used as {@code bootstrap.servers}. */
    private static final String BROKER_LIST = "hdp01:9092,hdp02:9092,hdp03:9092";
    /** How many times the demo record is sent. */
    private static final int SEND_COUNT = 10;

    /**
     * Builds the producer configuration.
     *
     * @return properties holding {@code bootstrap.servers}, {@code key.serializer}
     *     and {@code value.serializer}
     */
    public static Properties getInitProducer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BROKER_LIST);
        properties.setProperty("key.serializer", PRODUCER_KEY_SER);
        properties.setProperty("value.serializer", PRODUCER_VALUE_SER);
        return properties;
    }

    /**
     * Sends the demo message {@code SEND_COUNT} times and then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Base configuration.
        Properties init = getInitProducer();
        // The record to send: no key, a plain string value.
        String message = "hello, Tom ";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
        // The producer's value type must be String so that it agrees with both the
        // configured StringSerializer and the ProducerRecord<String, String> above.
        // try-with-resources closes the producer when the block exits.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(init)) {
            for (int i = 0; i < SEND_COUNT; i++) {
                // The returned Future is deliberately ignored (fire-and-forget).
                producer.send(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消息发送三种模式
1.发后即忘(fire-and-forget):发送消息不关注是否发送成功。性能最好,可靠性最差
2.异步(async):send方法本身是异步的,使用Callback方式:kafka响应时回调onCompletion,发送成功时可读取RecordMetadata,发送失败时exception参数不为null(异常通过回调参数传入,而不是抛出)
try {
    // Handler run when the send completes. The original author notes that
    // RecordMetadata and Exception are mutually exclusive.
    // NOTE(review): confirm against the Callback Javadoc of the client version in use.
    Callback reportResult = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Failure path: print the error and stop.
            if (exception != null) {
                exception.printStackTrace();
                return;
            }
            // Success path: report where the record was written.
            System.out.println("topic:" + metadata.topic() + ",partition:" + metadata.partition());
        }
    };
    producer.send(record, reportResult);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.close();
}
3.同步(sync):send返回的Future对象链式调用get(),阻塞式等待返回结果
try {
producer.send(record).get();
////方式一:可设置超时时间
//producer.send(record).get(1000L,MILLISECONDS);
//方式二:可获取一些元数据信息
// Future<RecordMetadata> future = producer.send(record);
// RecordMetadata metadata = future.get();
// String topic = metadata.topic();
// int partition = metadata.partition();
// System.out.println("topic:"+ topic +",partition:"+ partition);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
producer.close();
}
推荐阅读