
Sample code for the Kafka producer and consumer Java APIs


I wrote a small Kafka Java demo and am recording it here; it is for reference only.

1. Create a Maven project

The project layout is as follows:

(screenshot of the Maven project structure omitted)

2. The pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>kafka-maven</groupId>
  <artifactId>kafka-maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.6</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

3. The Kafka producer, KafkaProduce:

package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {
  private static Properties properties;

  static {
    // Load kafka.properties from the classpath root
    properties = new Properties();
    String path = KafkaProduce.class.getResource("/").getFile().toString()
        + "kafka.properties";
    try {
      FileInputStream fis = new FileInputStream(new File(path));
      properties.load(fis);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Send a message.
   *
   * @param topic
   * @param key
   * @param value
   */
  public void sendMsg(String topic, byte[] key, byte[] value) {

    // Instantiate the producer
    KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(
        properties);

    // Wrap the message in a record
    ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(
        topic, key, value);

    // Send the record asynchronously
    kp.send(pr, new Callback() {
      // Callback invoked once the send has completed
      @Override
      public void onCompletion(RecordMetadata metadata,
          Exception exception) {
        if (null != exception) {
          System.out.println("send failed: " + exception.getMessage());
        } else {
          System.out.println("record written at offset: " + metadata.offset());
        }
      }
    });

    // Close the producer (this also flushes any buffered records)
    kp.close();
  }
}
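For reference, here is a minimal, hypothetical sketch of how sendMsg might be called; the caller class and JSON payload are made up for illustration, and the topic matches the one used by the consumer below. Note that sendMsg creates and closes a KafkaProducer on every call, which keeps the demo simple but is wasteful in real code, where one long-lived producer should be reused:

// Hypothetical caller, not part of the original demo
public class ProducerDemo {
  public static void main(String[] args) {
    KafkaProduce producer = new KafkaProduce();
    // Key and value are raw bytes, matching the ByteArraySerializer
    // configured in kafka.properties
    String json = "{\"name\":\"lijie\"}";
    producer.sendMsg("lijietest", "key1".getBytes(), json.getBytes());
  }
}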

4. The Kafka consumer, KafkaConsume:

package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

  private final static String TOPIC = "lijietest";

  private static Properties properties;

  static {
    // Load kafka.properties from the classpath root
    properties = new Properties();
    String path = KafkaConsume.class.getResource("/").getFile().toString()
        + "kafka.properties";
    try {
      FileInputStream fis = new FileInputStream(new File(path));
      properties.load(fis);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Consume messages.
   *
   * @throws Exception
   */
  public void getMsg() throws Exception {
    ConsumerConfig config = new ConsumerConfig(properties);

    ConsumerConnector consumer = kafka.consumer.Consumer
        .createJavaConsumerConnector(config);

    // Request one stream for the topic
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TOPIC, new Integer(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(
        new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
        .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);

    ConsumerIterator<String, String> it = stream.iterator();

    // Block forever, deserializing each message into a User
    while (it.hasNext()) {
      String json = it.next().message();
      User user = (User) JsonUtils.jsonToObj(json, User.class);
      System.out.println(user);
    }
  }
}
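The consumer references two helper classes that this post does not show: com.lijie.pojo.User and com.lijie.utils.JsonUtils. Their exact contents are unknown; below is a minimal sketch of what they might look like, assuming JsonUtils wraps a Jackson ObjectMapper (the original file imported the HTrace-shaded Jackson that HBase pulls in, so the sketch uses that class), consistent with the jsonToObj(json, User.class) call and cast above. The User fields are pure guesses:

// Hypothetical sketch of com/lijie/pojo/User.java -- a plain POJO
package com.lijie.pojo;

public class User {
  private String name; // field names are assumptions, not from the post

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }

  @Override
  public String toString() { return "User [name=" + name + "]"; }
}

// Hypothetical sketch of com/lijie/utils/JsonUtils.java
package com.lijie.utils;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;

public class JsonUtils {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Returns Object so the caller casts, matching the usage in KafkaConsume
  public static Object jsonToObj(String json, Class<?> clazz) throws Exception {
    return MAPPER.readValue(json, clazz);
  }
}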

5. The kafka.properties file

##producer
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consumer
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
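Note that this one file mixes two client generations: the producer half uses new-client keys (bootstrap.servers, key.serializer, value.serializer), while the consumer half uses the old high-level consumer keys (zookeeper.connect, group.id), matching the old kafka.javaapi consumer used above. Both classes load the whole file, and each client warns about, but otherwise ignores, keys meant for the other. A hypothetical end-to-end run (the driver class is made up for illustration) could look like:

// Hypothetical driver, not part of the original post
public class Demo {
  public static void main(String[] args) throws Exception {
    // Send one JSON message, then start the blocking consumer loop
    new com.lijie.producer.KafkaProduce().sendMsg("lijietest",
        "key1".getBytes(), "{\"name\":\"lijie\"}".getBytes());
    new com.lijie.consumer.KafkaConsume().getMsg(); // blocks; Ctrl+C to stop
  }
}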

That's all for this article; I hope it's helpful for your learning.
