Kafka的生产者,消费者JavaApi操作实例
程序员文章站
2024-01-14 11:36:28
...
[提前声明]
文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章
写作不易,转载请注明,谢谢!
代码案例地址: ????https://github.com/Mydreamandreality/sparkResearch
Kafka JavaApi代码案例
首先引入我们的maven文件
这里注意version和服务器的kafka版本要一致
<!--分布式消息队列-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
Java编写生产者代码
首先我们先定义生产者需要的一些配置
代码:
具体的配置属性在注视中都有详细的解释
/**
* Created by 張燿峰
* Kafka消费者参数配置
*
* @author 孤
* @date 2019/4/25
* @Varsion 1.0
*/
public class ConstantConsumer {
/**
* 用来唯一标识consumer进程所在组的字符串,
* 如果设置同样的group id,表示这些processes都是属于同一个consumer group
*/
public static String GROUP_ID = "group.id";
/**
* 如果值为真,则为偏移启用自动落实,否则不提交。
*/
public static String ENABLE_AUTO_COMMIT = "enable.auto.commit";
/**
* consumer向zookeeper提交offset的频率,单位是秒
*/
public static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
/**
* zookeeper 会话的超时限制。
* 如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生
*/
public static String SESSION_TIMEOUT_MS = "session.timeout.ms";
/**
* zookeeper中没有初始化的offset时,如果offset是以下值的回应:
* smallest:自动复位offset为smallest的offset
* largest:自动复位offset为largest的offset
* anything else:向consumer抛出异常
*/
public static String AUTO_OFFSET_RESET = "auto.offset.reset";
public static String KEY_DESERIALIZER = "key.deserializer";
public static String VALUE_DESERIALIZER = "value.deserializer";
}
定义好配置后开始编写生产者代码:
**
* Created by 張燿峰
* kafka生产者代码案例
*
* @author 孤
* @date 2019/4/25
* @Varsion 1.0
*/
public class KafkaProducer extends Thread{
//定义kafka生产者
private static org.apache.kafka.clients.producer.KafkaProducer producer;
//定义主题
private static final String TOPIC = "JavaKafka";
//初始化Kafka生产者
public KafkaProducer() {
Properties properties = new Properties();
properties.put(ConstantProducer.BOOTSTRAP_SERVERS, "192.168.253.132:9092");
properties.put(ConstantProducer.ACKS, "all");
properties.put(ConstantProducer.RETRIES, 0);
properties.put(ConstantProducer.BATCH_SIZE, 16385);
properties.put(ConstantProducer.LINGER_MS, 1);
properties.put(ConstantProducer.BUFFER_MEMORY, 33554432);
properties.put(ConstantProducer.KEY_SERIALIZER_CLASS, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ConstantProducer.VALUE_SERIALIZER_CLASS, "org.apache.kafka.common.serialization.StringSerializer");
producer = new org.apache.kafka.clients.producer.KafkaProducer(properties);
}
//生产消息
public void sendProducer() {
for (int i = 0; i < 10; i++) {
String key = String.valueOf(i);
String data = "hello message "+ key;
producer.send(new ProducerRecord<>(TOPIC,key,data));
System.out.println("SUCCESS~");
}
producer.close();
}
public static void main(String[] args) {
new KafkaProducer().sendProducer();
}
}
从上面的代码中可以看到,创建生产者大概分为这么几步
- 首先定义kafka生产者客户端
- 然后定义我们的主题
- 接着配置初始kafka生产者需要使用的参数
- 最后使用
producer.send
生产消息 - 最后记得关闭资源
运行生产者示例如下:
Java编写消费者代码
首先我们先定义消费者需要的一些配置
代码:
具体的配置属性在注视中都有详细的解释
package v1.Constant;
/**
* Created by 張燿峰
* Kafka消费者参数配置
*
* @author 孤
* @date 2019/4/25
* @Varsion 1.0
*/
public class ConstantConsumer {
/**
* 用来唯一标识consumer进程所在组的字符串,
* 如果设置同样的group id,表示这些processes都是属于同一个consumer group
*/
public static String GROUP_ID = "group.id";
/**
* 如果值为真,则为偏移启用自动落实,否则不提交。
*/
public static String ENABLE_AUTO_COMMIT = "enable.auto.commit";
/**
* consumer向zookeeper提交offset的频率,单位是秒
*/
public static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
/**
* zookeeper 会话的超时限制。
* 如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生
*/
public static String SESSION_TIMEOUT_MS = "session.timeout.ms";
/**
* zookeeper中没有初始化的offset时,如果offset是以下值的回应:
* smallest:自动复位offset为smallest的offset
* largest:自动复位offset为largest的offset
* anything else:向consumer抛出异常
*/
public static String AUTO_OFFSET_RESET = "auto.offset.reset";
public static String KEY_DESERIALIZER = "key.deserializer";
public static String VALUE_DESERIALIZER = "value.deserializer";
}
定义好配置后开始编写消费者代码:
package v1;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import v1.Constant.ConstantConsumer;
import v1.Constant.ConstantProducer;
import java.util.Arrays;
import java.util.Properties;
/**
* Created by 張燿峰
* kafka消费者代码案例
*
* @author 孤
* @date 2019/4/25
* @Varsion 1.0
*/
public class KafkaConsumers extends Thread{
//定义kafka消费者
private static KafkaConsumer<String, String> consumer;
//定义主题
private static final String TOPIC = "JavaKafka";
//初始化kafka消费者
public KafkaConsumers() {
Properties properties = new Properties();
properties.put(ConstantProducer.BOOTSTRAP_SERVERS, "192.168.253.132:9092");
properties.put(ConstantConsumer.GROUP_ID, "test-consumer-group");
properties.put(ConstantConsumer.ENABLE_AUTO_COMMIT, "true");
properties.put(ConstantConsumer.AUTO_COMMIT_INTERVAL_MS, 1000);
properties.put(ConstantConsumer.SESSION_TIMEOUT_MS, 30000);
properties.put(ConstantConsumer.AUTO_OFFSET_RESET, "earliest");
properties.put(ConstantConsumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConstantConsumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(properties);
}
//消费消息
public void getConsumers() {
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
//TODO 成功执行后 输出文档~
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> consumerRecords : records) {
System.out.println("key:" + consumerRecords.key() + ", value: " + consumerRecords.value() + ", topic: " + consumerRecords.topic());
}
}
}
public static void main(String[] args) {
new KafkaConsumers().getConsumers();
}
}
如上代码:整体的逻辑和生产者是一样的
- 定义消费者,定义主题,初始化消费者,消费消息
采坑事项:
-
ConstantConsumer.GROUP_ID
这个参数的value需要和服务器kafka/config/consumer.properties文件中的group.id=test-consumer-group
Value一致,否则会造成消费者无法消费的情况
- 在运行代码前需要保证服务器的zookeeper和kafka服务正常启动
- 运行环境和服务器环境的kafka版本要保持一致:
下一篇: 如何样才能快速得到推荐数据