kafka小试
程序员文章站
2024-01-22 13:12:58
...
我使用的kafka版本 kafka_2.8.0-0.8.1.1.tgz
参考了官网手册http://kafka.apache.org/documentation.html#quickstart
和http://blog.csdn.net/hxpjava1/article/details/19160665 (该文章使用的kafka版本较低,里面有些代码与本文使用的版本不兼容)
1.下载kafka,地址:http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz
tar -xzf kafka_2.8.0-0.8.1.1.tgz && cd kafka_2.8.0-0.8.1.1
2.启动服务
首先要启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka
bin/kafka-server-start.sh config/server.properties &
3.创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看是否创建成功
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.发送消息
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal producer example: sends one keyed string message to the "test" topic
 * and then prints a completion marker.
 */
public class TestProducer {

    /**
     * Builds the producer configuration, sends a single message and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // test.kafka.com is a host alias; map it to the address of your own broker.
        props.put("metadata.broker.list", "test.kafka.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        try {
            producer.send(new KeyedMessage<String, String>("test", "key", "测试"));
        } finally {
            // Always release the producer, even when send() throws
            // (e.g. FailedToSendMessageException after the retries are exhausted).
            producer.close();
        }
        System.out.println("结束");
    }
}
5.接收消息
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerSample { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("group.id", "test-consumer-group"); props.put("zookeeper.connect", "test.kafka.com:2181"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); // Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); // create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume HashMap<String, Integer> map = new HashMap<String, Integer>(); map.put("test", 4); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map); List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test"); // create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(4); // consume the messages in the threads for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new Runnable() { public void run() { for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) { System.out.println("topic:"+msgAndMetadata.topic()); String tmp = new String(msgAndMetadata.message()); System.out.println("message key: " + new String(msgAndMetadata.key())); System.out.println("message content: " + tmp); } } }); } } }
6.注意的地方
test.kafka.com 为域名映射,可以自己映射到自己的kafka的ip地址
如果发送消息失败,请检查防火墙是否已关闭
对于group.id可以查看config/consumer.properties的配置
7.如果出现FailedToSendMessageException: Failed to send messages after 3 tries错误
修改config/server.properties中连接zookeeper的配置为
zookeeper.connect=127.0.0.1:2181
配置的时候最好通过域名映射添加topic
8.maven配置文件
<!-- Maven dependencies for the producer/consumer examples (Kafka 0.8.1.1 artifact built for Scala 2.9.2). -->
<!-- NOTE(review): the download step references kafka_2.8.0 while this artifact is kafka_2.9.2; confirm which Scala build is intended. -->
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.0.Final</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.14</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.9.3</version>
  </dependency>
  <dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
  </dependency>
  <dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.3</version>
  </dependency>
</dependencies>
上一篇: tomcat8源码构建