欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Kafka Java Producer代码实例详解

程序员文章站 2022-04-09 16:20:50
根据业务需要可以使用kafka提供的java producer api进行产生数据,并将产生的数据发送到kafka对应topic的对应分区中,入口类为:producerkafka的producer a...

根据业务需要可以使用kafka提供的java producer api进行产生数据,并将产生的数据发送到kafka对应topic的对应分区中,入口类为:producer

kafka的producer api主要提供下列三个方法:

  •   public void send(keyedmessage<k,v> message) 发送单条数据到kafka集群
  •   public void send(list<keyedmessage<k,v>> messages) 发送多条数据(数据集)到kafka集群
  •   public void close() 关闭kafka连接资源

一、javakafkaproducerpartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到topic的那个分区中,返回分区id,范围:[0,分区数量); 这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:

import kafka.producer.partitioner;
import kafka.utils.verifiableproperties;

/**
 * created by gerry on 12/21.
 */
public class javakafkaproducerpartitioner implements partitioner {

  /**
   * 无参构造函数
   */
  public javakafkaproducerpartitioner() {
    this(new verifiableproperties());
  }

  /**
   * 构造函数,必须给定
   *
   * @param properties 上下文
   */
  public javakafkaproducerpartitioner(verifiableproperties properties) {
    // nothings
  }

  @override
  public int partition(object key, int numpartitions) {
    int num = integer.valueof(((string) key).replaceall("key_", "").trim());
    return num % numpartitions;
  }
}

二、 javakafkaproducer:通过kafka提供的api进行数据产生操作的测试类;具体代码如下:

import kafka.javaapi.producer.producer;
import kafka.producer.keyedmessage;
import kafka.producer.producerconfig;
import org.apache.log4j.logger;

import java.util.properties;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
import java.util.concurrent.atomic.atomicboolean;
import java.util.concurrent.threadlocalrandom;

/**
 * created by gerry on 12/21.
 */
public class javakafkaproducer {
  private logger logger = logger.getlogger(javakafkaproducer.class);
  public static final string topic_name = "test";
  public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".tochararray();
  public static final int chartslength = charts.length;


  public static void main(string[] args) {
    string brokerlist = "192.168.187.149:9092";
    brokerlist = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
    brokerlist = "192.168.187.146:9092";
    properties props = new properties();
    props.put("metadata.broker.list", brokerlist);
    /**
     * 0表示不等待结果返回<br/>
     * 1表示等待至少有一个服务器返回数据接收标识<br/>
     * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
     * */
    props.put("request.required.acks", "0");
    /**
     * 内部发送数据是异步还是同步
     * sync:同步, 默认
     * async:异步
     */
    props.put("producer.type", "async");
    /**
     * 设置序列化的类
     * 可选:kafka.serializer.stringencoder
     * 默认:kafka.serializer.defaultencoder
     */
    props.put("serializer.class", "kafka.serializer.stringencoder");
    /**
     * 设置分区类
     * 根据key进行数据分区
     * 默认是:kafka.producer.defaultpartitioner ==> 安装key的hash进行分区
     * 可选:kafka.serializer.bytearraypartitioner ==> 转换为字节数组后进行hash分区
     */
    props.put("partitioner.class", "javakafkaproducerpartitioner");

    // 重试次数
    props.put("message.send.max.retries", "3");

    // 异步提交的时候(async),并发提交的记录数
    props.put("batch.num.messages", "200");

    // 设置缓冲区大小,默认10kb
    props.put("send.buffer.bytes", "102400");

    // 2. 构建kafka producer configuration上下文
    producerconfig config = new producerconfig(props);

    // 3. 构建producer对象
    final producer<string, string> producer = new producer<string, string>(config);

    // 4. 发送数据到服务器,并发线程发送
    final atomicboolean flag = new atomicboolean(true);
    int numthreads = 50;
    executorservice pool = executors.newfixedthreadpool(numthreads);
    for (int i = 0; i < 5; i++) {
      pool.submit(new thread(new runnable() {
        @override
        public void run() {
          while (flag.get()) {
            // 发送数据
            keyedmessage message = generatekeyedmessage();
            producer.send(message);
            system.out.println("发送数据:" + message);

            // 休眠一下
            try {
              int least = 10;
              int bound = 100;
              thread.sleep(threadlocalrandom.current().nextint(least, bound));
            } catch (interruptedexception e) {
              e.printstacktrace();
            }
          }

          system.out.println(thread.currentthread().getname() + " shutdown....");
        }
      }, "thread-" + i));

    }

    // 5. 等待执行完成
    long sleepmillis = 600000;
    try {
      thread.sleep(sleepmillis);
    } catch (interruptedexception e) {
      e.printstacktrace();
    }
    flag.set(false);

    // 6. 关闭资源

    pool.shutdown();
    try {
      pool.awaittermination(6, timeunit.seconds);
    } catch (interruptedexception e) {
    } finally {
      producer.close(); // 最后之后调用
    }
  }

  /**
   * 产生一个消息
   *
   * @return
   */
  private static keyedmessage<string, string> generatekeyedmessage() {
    string key = "key_" + threadlocalrandom.current().nextint(10, 99);
    stringbuilder sb = new stringbuilder();
    int num = threadlocalrandom.current().nextint(1, 5);
    for (int i = 0; i < num; i++) {
      sb.append(generatestringmessage(threadlocalrandom.current().nextint(3, 20))).append(" ");
    }
    string message = sb.tostring().trim();
    return new keyedmessage(topic_name, key, message);
  }

  /**
   * 产生一个给定长度的字符串
   *
   * @param numitems
   * @return
   */
  private static string generatestringmessage(int numitems) {
    stringbuilder sb = new stringbuilder();
    for (int i = 0; i < numitems; i++) {
      sb.append(charts[threadlocalrandom.current().nextint(chartslength)]);
    }
    return sb.tostring();
  }
}

三、pom.xml依赖配置如下

<properties>
  <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupid>org.apache.kafka</groupid>
    <artifactid>kafka_2.10</artifactid>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。