Kafka 生产者和消费者的 Java API 示例代码
程序员文章站
2023-12-18 20:12:46
写了个kafka的java demo 顺便记录下,仅供参考
1.创建maven项目
目录如下:
2.pom文件:
写了个kafka的java demo 顺便记录下,仅供参考
1.创建maven项目
目录如下:
2.pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Kafka producer/consumer demo.
  Element and attribute names in a POM are case-sensitive (modelVersion,
  groupId, artifactId, schemaLocation, systemPath), so they are written in
  their exact camelCase form here.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>kafka-maven</groupId>
  <artifactId>kafka-maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- Kafka broker/client library built for Scala 2.11; provides both the
         new producer API and the old high-level consumer used by the demo. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <!-- hadoop-hdfs was declared twice in the original; one declaration is enough. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.3</version>
    </dependency>

    <!-- tools.jar from the local JDK. Environment variables are referenced in
         a POM with the "env." prefix. -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${env.JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>

    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.6</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Compile for Java 7 source and bytecode level. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
3.Kafka 生产者 KafkaProduce:
package com.lijie.producer; import java.io.file; import java.io.fileinputstream; import java.util.properties; import org.apache.kafka.clients.producer.callback; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producerrecord; import org.apache.kafka.clients.producer.recordmetadata; import org.slf4j.logger; import org.slf4j.loggerfactory; public class kafkaproduce { private static properties properties; static { properties = new properties(); string path = kafkaproducer.class.getresource("/").getfile().tostring() + "kafka.properties"; try { fileinputstream fis = new fileinputstream(new file(path)); properties.load(fis); } catch (exception e) { e.printstacktrace(); } } /** * 发送消息 * * @param topic * @param key * @param value */ public void sendmsg(string topic, byte[] key, byte[] value) { // 实例化produce kafkaproducer<byte[], byte[]> kp = new kafkaproducer<byte[], byte[]>( properties); // 消息封装 producerrecord<byte[], byte[]> pr = new producerrecord<byte[], byte[]>( topic, key, value); // 发送数据 kp.send(pr, new callback() { // 回调函数 @override public void oncompletion(recordmetadata metadata, exception exception) { if (null != exception) { system.out.println("记录的offset在:" + metadata.offset()); system.out.println(exception.getmessage() + exception); } } }); // 关闭produce kp.close(); } }
4.Kafka 消费者 KafkaConsume:
package com.lijie.consumer; import java.io.file; import java.io.fileinputstream; import java.util.hashmap; import java.util.list; import java.util.map; import java.util.properties; import org.apache.htrace.fasterxml.jackson.databind.objectmapper; import com.lijie.pojo.user; import com.lijie.utils.jsonutils; import kafka.consumer.consumerconfig; import kafka.consumer.consumeriterator; import kafka.consumer.kafkastream; import kafka.javaapi.consumer.consumerconnector; import kafka.serializer.stringdecoder; import kafka.utils.verifiableproperties; public class kafkaconsume { private final static string topic = "lijietest"; private static properties properties; static { properties = new properties(); string path = kafkaconsume.class.getresource("/").getfile().tostring() + "kafka.properties"; try { fileinputstream fis = new fileinputstream(new file(path)); properties.load(fis); } catch (exception e) { e.printstacktrace(); } } /** * 获取消息 * * @throws exception */ public void getmsg() throws exception { consumerconfig config = new consumerconfig(properties); consumerconnector consumer = kafka.consumer.consumer .createjavaconsumerconnector(config); map<string, integer> topiccountmap = new hashmap<string, integer>(); topiccountmap.put(topic, new integer(1)); stringdecoder keydecoder = new stringdecoder(new verifiableproperties()); stringdecoder valuedecoder = new stringdecoder( new verifiableproperties()); map<string, list<kafkastream<string, string>>> consumermap = consumer .createmessagestreams(topiccountmap, keydecoder, valuedecoder); kafkastream<string, string> stream = consumermap.get(topic).get(0); consumeriterator<string, string> it = stream.iterator(); while (it.hasnext()) { string json = it.next().message(); user user = (user) jsonutils.jsontoobj(json, user.class); system.out.println(user); } } }
5.kafka.properties文件
# Shared configuration read by both KafkaProduce and KafkaConsume.
# Each key must sit on its own line; class names are case-sensitive.

## produce
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# "bak." entries are inactive alternatives kept for reference.
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

## consume
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
# NOTE: serializer.class is also set in the produce section above; when a key
# appears twice, java.util.Properties keeps the later value, i.e. this one.
serializer.class=kafka.serializer.StringEncoder
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。