Kafka Java Producer代码实例详解
程序员文章站
2022-04-09 16:20:50
根据业务需要可以使用Kafka提供的Java Producer API产生数据,并将产生的数据发送到Kafka对应topic的对应分区中,入口类为:Producer。
根据业务需要可以使用kafka提供的java producer api进行产生数据,并将产生的数据发送到kafka对应topic的对应分区中,入口类为:producer
Kafka的Producer API主要提供下列三个方法:
- public void send(KeyedMessage&lt;K,V&gt; message) 发送单条数据到Kafka集群
- public void send(List&lt;KeyedMessage&lt;K,V&gt;&gt; messages) 发送多条数据(数据集)到Kafka集群
- public void close() 关闭Kafka连接资源
一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到topic的哪个分区中,返回分区id,范围:[0,分区数量);这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:
import kafka.producer.partitioner; import kafka.utils.verifiableproperties; /** * created by gerry on 12/21. */ public class javakafkaproducerpartitioner implements partitioner { /** * 无参构造函数 */ public javakafkaproducerpartitioner() { this(new verifiableproperties()); } /** * 构造函数,必须给定 * * @param properties 上下文 */ public javakafkaproducerpartitioner(verifiableproperties properties) { // nothings } @override public int partition(object key, int numpartitions) { int num = integer.valueof(((string) key).replaceall("key_", "").trim()); return num % numpartitions; } }
二、JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:
import kafka.javaapi.producer.producer; import kafka.producer.keyedmessage; import kafka.producer.producerconfig; import org.apache.log4j.logger; import java.util.properties; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.timeunit; import java.util.concurrent.atomic.atomicboolean; import java.util.concurrent.threadlocalrandom; /** * created by gerry on 12/21. */ public class javakafkaproducer { private logger logger = logger.getlogger(javakafkaproducer.class); public static final string topic_name = "test"; public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".tochararray(); public static final int chartslength = charts.length; public static void main(string[] args) { string brokerlist = "192.168.187.149:9092"; brokerlist = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095"; brokerlist = "192.168.187.146:9092"; properties props = new properties(); props.put("metadata.broker.list", brokerlist); /** * 0表示不等待结果返回<br/> * 1表示等待至少有一个服务器返回数据接收标识<br/> * -1表示必须接收到所有的服务器返回标识,及同步写入<br/> * */ props.put("request.required.acks", "0"); /** * 内部发送数据是异步还是同步 * sync:同步, 默认 * async:异步 */ props.put("producer.type", "async"); /** * 设置序列化的类 * 可选:kafka.serializer.stringencoder * 默认:kafka.serializer.defaultencoder */ props.put("serializer.class", "kafka.serializer.stringencoder"); /** * 设置分区类 * 根据key进行数据分区 * 默认是:kafka.producer.defaultpartitioner ==> 安装key的hash进行分区 * 可选:kafka.serializer.bytearraypartitioner ==> 转换为字节数组后进行hash分区 */ props.put("partitioner.class", "javakafkaproducerpartitioner"); // 重试次数 props.put("message.send.max.retries", "3"); // 异步提交的时候(async),并发提交的记录数 props.put("batch.num.messages", "200"); // 设置缓冲区大小,默认10kb props.put("send.buffer.bytes", "102400"); // 2. 构建kafka producer configuration上下文 producerconfig config = new producerconfig(props); // 3. 构建producer对象 final producer<string, string> producer = new producer<string, string>(config); // 4. 
发送数据到服务器,并发线程发送 final atomicboolean flag = new atomicboolean(true); int numthreads = 50; executorservice pool = executors.newfixedthreadpool(numthreads); for (int i = 0; i < 5; i++) { pool.submit(new thread(new runnable() { @override public void run() { while (flag.get()) { // 发送数据 keyedmessage message = generatekeyedmessage(); producer.send(message); system.out.println("发送数据:" + message); // 休眠一下 try { int least = 10; int bound = 100; thread.sleep(threadlocalrandom.current().nextint(least, bound)); } catch (interruptedexception e) { e.printstacktrace(); } } system.out.println(thread.currentthread().getname() + " shutdown...."); } }, "thread-" + i)); } // 5. 等待执行完成 long sleepmillis = 600000; try { thread.sleep(sleepmillis); } catch (interruptedexception e) { e.printstacktrace(); } flag.set(false); // 6. 关闭资源 pool.shutdown(); try { pool.awaittermination(6, timeunit.seconds); } catch (interruptedexception e) { } finally { producer.close(); // 最后之后调用 } } /** * 产生一个消息 * * @return */ private static keyedmessage<string, string> generatekeyedmessage() { string key = "key_" + threadlocalrandom.current().nextint(10, 99); stringbuilder sb = new stringbuilder(); int num = threadlocalrandom.current().nextint(1, 5); for (int i = 0; i < num; i++) { sb.append(generatestringmessage(threadlocalrandom.current().nextint(3, 20))).append(" "); } string message = sb.tostring().trim(); return new keyedmessage(topic_name, key, message); } /** * 产生一个给定长度的字符串 * * @param numitems * @return */ private static string generatestringmessage(int numitems) { stringbuilder sb = new stringbuilder(); for (int i = 0; i < numitems; i++) { sb.append(charts[threadlocalrandom.current().nextint(chartslength)]); } return sb.tostring(); } }
三、pom.xml依赖配置如下
<properties>
    <kafka.version>0.8.2.1</kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。