欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka小试

程序员文章站 2024-01-22 13:17:22
...

我使用的kafka版本 kafka_2.8.0-0.8.1.1.tgz

参考了官网手册http://kafka.apache.org/documentation.html#quickstart

和http://blog.csdn.net/hxpjava1/article/details/19160665  版本低一下,里面有些代码不兼容

  1. 下载kafka 地址http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz
tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1

    2.启动服务

       首先要启动zookeeper      

bin/zookeeper-server-start.sh config/zookeeper.properties &

      启动kafaka

bin/kafka-server-start.sh config/server.properties &

   3.创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

   查看是否创建成功

bin/kafka-topics.sh --list --zookeeper localhost:2181

  4.发送消息

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "test.kafka.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "key", "测试");
        producer.send(data);
        producer.close();
        System.out.println("结束");
    }
}

 5.接收消息

 

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerSample {
	
	public static void main(String[] args) {  
        // specify some consumer properties  
		Properties props = new Properties();  
       props.put("group.id", "test-consumer-group");
	   props.put("zookeeper.connect", "test.kafka.com:2181");
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");
		
		// Create the connection to the cluster  
		ConsumerConfig consumerConfig = new ConsumerConfig(props);  
		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
		
		        // create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume  
		HashMap<String, Integer> map = new HashMap<String, Integer>();  
		map.put("test", 4); 
		 Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =  
		        consumerConnector.createMessageStreams(map);  
		List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");  
		
		        // create list of 4 threads to consume from each of the partitions   
		ExecutorService executor = Executors.newFixedThreadPool(4);  
		
		        // consume the messages in the threads  
		for (final KafkaStream<byte[], byte[]> stream : streams) {  
		    executor.submit(new Runnable() {  
		        public void run() {  
		            for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) { 
		            	System.out.println("topic:"+msgAndMetadata.topic());
		                String tmp = new String(msgAndMetadata.message());
		                System.out.println("message key: " + new String(msgAndMetadata.key())); 
		                System.out.println("message content: " + tmp);  
		            }  
		        }  
		    });  
		}  

	}
}

 

  6.注意的地方

     test.kafka.com  为域名映射,可以自己映射到自己的kafka的ip地址

     如果发送消息失败 看下防火墙是否关闭

    对于group.id可以查看config/consumer.properties的配置

  7.如果出现FailedToSendMessageException: Failed to send messages after 3 tries错误

     修改config/server.properties  链接zookeeper为

    zookeeper.connect=127.0.0.1:2181

    配置的时候最好通过域名映射添加topic

  8.maven配置文件

<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.0.0.Final</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.14</version>
		</dependency>
		<dependency>
		  <groupId>org.apache.kafka</groupId>
		  <artifactId>kafka_2.9.2</artifactId>
		  <version>0.8.1.1</version>
		</dependency> 
		<dependency>
		  <groupId>org.scala-lang</groupId>
		  <artifactId>scala-library</artifactId>
		  <version>2.9.3</version>
		</dependency>
		<dependency>
			<groupId>com.yammer.metrics</groupId>
			<artifactId>metrics-core</artifactId>
			<version>2.2.0</version>
		</dependency>
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.3</version>
		</dependency>
	</dependencies>