
Kafka Service Setup (blog category: kafka)


Install ZooKeeper in advance.

 

1. Download Kafka: http://kafka.apache.org/downloads.html

The version downloaded here is kafka_2.11-0.11.0.1.tgz.

 

2. Extract

tar -xzf kafka_2.11-0.11.0.1.tgz

Directory structure after extraction:

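For kafka_2.11-0.11.0.1 the extracted directory typically contains:

bin/  config/  libs/  site-docs/  LICENSE  NOTICE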

 

3. Edit config/server.properties

The main changes:

broker.id=1

port=9092

host.name=<broker host address>

# ZooKeeper host addresses and ports

zookeeper.connect=ip1:port1,ip2:port2,ip3:port3
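Note: on the 0.11 broker used here, port and host.name are already deprecated legacy settings; the equivalent modern form, if you prefer it, is a single listeners entry:

listeners=PLAINTEXT://<broker host address>:9092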

For detailed parameter descriptions, see: http://blog.csdn.net/lizhitao/article/details/25667831

 

4. Start Kafka

bin/kafka-server-start.sh config/server.properties &

 

5. Test sending messages

Start a producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Note: use the broker's actual IP in place of localhost here (it should match the host.name configured above), otherwise the producer reports an error.

Once it starts, type any message content:

>kafka message send test

 

Open another window and start a consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:port --topic test --from-beginning

Note: likewise, localhost and port here are the ZooKeeper service IP and port; if there are several, separate them with commas.

 

6. Configure a cluster

What is configured here is a pseudo-cluster (several brokers on one machine).

Copy the configuration file:

cp config/server.properties config/server-2.properties

Modify these parameters:

broker.id=2

port=9093

host.name=<broker host address>

log.dirs=/tmp/kafka-logs-2

Each broker on the same machine needs its own broker.id, port, and log.dirs; if the second broker reuses the first broker's data directory it will fail to start.

 

7. Start the new node

bin/kafka-server-start.sh config/server-2.properties &
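To check that both brokers have joined the cluster, you can create a replicated topic and describe it (commands as in the official quickstart; substitute your own ZooKeeper address):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic replicated-test
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated-test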

 

8. Using Kafka from Java

1) Add the required dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <!-- In a real application, pull in the jars below separately instead of using the ones bundled with kafka -->
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zkclient</artifactId>
            <groupId>com.101tec</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

<!-- ZooKeeper client -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

 

 

For the full code, see: http://www.cnblogs.com/lilixin/p/5775877.html

The key configuration is given here:

kafka.properties

zookeeper.connect=192.168.1.190:2181,192.168.1.190:2182,192.168.1.190:2183
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
metadata.broker.list=192.168.1.190:9092,192.168.1.190:9093
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
 
#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000
 
test.group.id=huhui
kafka.test.topics=huhui

 

applicationContext.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/mvc 
        http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd"
	default-autowire="byName">

	<!-- This one is loaded for Spring's placeholder resolution. -->
	<bean id="propertyConfigurer"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:kafka.properties</value>
			</list>
		</property>
	</bean>
	<!-- This one is for injecting values into code. -->
	<bean id="configProperties"
		class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="locations">
			<list>
				<value>classpath:kafka.properties</value>
			</list>
		</property>
	</bean>
	
	<!-- kafka -->
	<import resource="applicationContext-kafka-producer.xml"/>
	<import resource="applicationContext-kafka-receiver.xml"/>

</beans>
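With the configProperties bean above, values from kafka.properties can also be injected straight into code via SpEL; a small illustration (the field name is made up):

@Value("#{configProperties['kafka.test.topics']}")
private String testTopics;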

 

applicationContext-kafka-producer.xml 

<bean id="topProducer" class="top.lilixin.TopProducer">
         <constructor-arg index="0" value="${metadata.broker.list}" />
    </bean>    
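The linked article defines top.lilixin.TopProducer; as a rough sketch of what it does, assuming the 0.8.x producer API that the kafka_2.9.2/0.8.1.1 dependency above provides (the sendMsg method name is illustrative):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TopProducer {
    private final Producer<String, String> producer;

    // brokerList is the ${metadata.broker.list} value injected by Spring
    public TopProducer(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        this.producer = new Producer<String, String>(new ProducerConfig(props));
    }

    // Send one message to the given topic
    public void sendMsg(String topic, String msg) {
        producer.send(new KeyedMessage<String, String>(topic, msg));
    }
}

A caller then only needs something like topProducer.sendMsg("test", "hello kafka").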

 

applicationContext-kafka-receiver.xml 

<!-- Define the message handler -->
<bean id="testConsumer" class="top.lilixin.TestConsumer"></bean>

<!-- Define the receiver -->
<bean id="topReceiver" class="top.lilixin.TopReceiver">
    <!-- ZooKeeper cluster address, e.g. zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181 -->
    <constructor-arg index="0" value="${zookeeper.connect}" />
    <!-- consumer group id string, e.g. vko_group_article_read_count -->
    <constructor-arg index="1" value="${test.group.id}" />
    <!-- topic(s) to consume, e.g. vko_group -->
    <constructor-arg index="2" value="${kafka.test.topics}" />
    <!-- the message handler defined above -->
    <constructor-arg index="3" ref="testConsumer" />
</bean>
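Inside top.lilixin.TopReceiver, the four constructor arguments are used to build the high-level ConsumerConnector that the code below calls cc. A minimal sketch under the 0.8.x consumer API (TopReceiverSketch is not the original class):

import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class TopReceiverSketch {
    private final ConsumerConnector cc;

    public TopReceiverSketch(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect); // ${zookeeper.connect}
        props.put("group.id", groupId);                   // ${test.group.id}
        // Creates the connector used below as cc
        this.cc = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}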

 

 

The original TopReceiver.java in the project has a small problem: after the service is shut down and restarted, it re-reads messages it has already consumed.

Original code:

// Each topic currently has 2 partitions
topicCountMap.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (final KafkaStream<byte[], byte[]> stream : streams) {
    new Thread() {
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String msg = new String(it.next().message());
                try {
                    topConsumer.dealMsg(msg);
                } catch (Exception e) {
                    log.error("kafka vkoConsumer topic:{} message:{} handling failed xxxxxxxxxxxxxxxxxx", topic, msg, e);
                }
                log.info("kafka vkoConsumer topic:{} received message:{}", topic, msg);
            }
        }
    }.start();
    log.info("kafka vkoConsumer started: groupId:{}, topic:{}, zookeeperConnect:{}", groupId, topic, zookeeperConnect);
}

 

The first thing to know is that the High Level Consumer stores the latest offset it has read from each partition in ZooKeeper, and this offset is stored per consumer group name.

Consumer group names are global across a Kafka cluster, so when starting a new consumer group, be careful that there are no un-shut-down consumers left on the cluster. When a consumer thread starts, Kafka adds it to the existing consumer group for the same topic and triggers a rebalance. During the rebalance Kafka assigns partitions to consumers, possibly moving a partition from one consumer to another; if old and new processing logic exist at the same time, some messages may be delivered to the old consumer.

The High Level Consumer is meant to be used from multiple threads. The number of consumer threads is tied to the number of partitions of the topic, with some specific rules between them:

  • If there are more threads than the topic has partitions, some threads will never receive any messages.
  • If there are more partitions than threads, some threads will receive messages from more than one partition.
  • If one thread handles messages from several partitions, the order in which it receives them is not guaranteed. For example, it may fetch 5 messages from partition 10, then 6 from partition 11, then 5 more from partition 10, and immediately another 5 from partition 10, even though partition 11 still has messages.
  • Adding more consumers to the same consumer group triggers a Kafka rebalance; a partition that had been assigned to thread a may, after the rebalance, be assigned to thread b.

Kafka does not update the offset in ZooKeeper immediately after each message is read; it waits for an interval. Because of this delay, the consumer may have read messages whose offsets have not yet been committed, so when the client shuts down or crashes and then restarts, some messages are read again. In addition, a broker going down, or anything else that changes a partition's leader, can also cause messages to be re-read.

To avoid this, you should give the consumer a graceful way to shut down rather than using kill -9.

After the fix:

private static final int THREAD_AMOUNT = 2;
……

// Each topic currently has 2 partitions
topicCountMap.put(topic, THREAD_AMOUNT);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// Use an ExecutorService to schedule the handler threads
ExecutorService executor = Executors.newFixedThreadPool(THREAD_AMOUNT);
for (int i = 0; i < streams.size(); i++) {
    KafkaStream<byte[], byte[]> kafkaStream = streams.get(i);
    executor.submit(new HanldMessageThread(kafkaStream, i));
    log.info("kafka vkoConsumer started: groupId:" + groupId + ", topic:" + topic + ", zookeeperConnect:" + zookeeperConnect);
}


// Shut down the consumer
try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
if (cc != null) {
    cc.shutdown();
}
if (executor != null) {
    executor.shutdown();
}
try {
    if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
        log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
    }
} catch (InterruptedException e) {
    log.info("Interrupted during shutdown, exiting uncleanly");
}

……
/**
 * Thread that actually handles the messages
 * @author Administrator
 */
class HanldMessageThread implements Runnable {

    private KafkaStream<byte[], byte[]> kafkaStream = null;
    private int num = 0;

    public HanldMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num) {
        super();
        this.kafkaStream = kafkaStream;
        this.num = num;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("Thread no: " + num + ", message: " + message);
        }
    }
}
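The fixed Thread.sleep(10000) above comes from the wiki example, where the consumer only runs for a short demo. In a long-running service, a JVM shutdown hook is a more natural trigger, so that a plain kill (SIGTERM, not kill -9) lets the consumer commit its offsets before exiting. A sketch, assuming cc and executor are fields of the receiver:

// Register a shutdown hook so a normal kill triggers a clean shutdown
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        if (cc != null) {
            cc.shutdown();        // commits offsets to ZooKeeper and releases partitions
        }
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                log.info("Interrupted during shutdown, exiting uncleanly");
            }
        }
    }
});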

For details, see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

 

Addendum: a very good Kafka tutorial:

http://orchome.com/kafka/index
