欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka与Spring的集成

程序员文章站 2022-07-13 12:14:28
...
producer
public class KafkaServiceImpl implements KafkaService {

	private Producer<byte[], byte[]> inner;
	
	private Properties properties;

	
	public void setInner(Producer<byte[], byte[]> inner) {
		this.inner = inner;
	}

	public void setProperties(Properties properties) {
		this.properties = properties;
	}

	public void init() throws IOException {
		ProducerConfig config = new ProducerConfig(properties);
		inner = new Producer<byte[], byte[]>(config);
	}
	@Override
	public void sendMessage(String topicName, byte[] message) {
		if (topicName == null || message == null) {
			return;
		}
		KeyedMessage<byte[], byte[]> km = new KeyedMessage<byte[], byte[]>(topicName, "".getBytes(), message);
		inner.send(km);
	}
}


配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:util="http://www.springframework.org/schema/util" 
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/util
		http://www.springframework.org/schema/util/spring-util-4.0.xsd
		http://www.springframework.org/schema/tx 
		http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context-4.0.xsd
		http://www.springframework.org/schema/task
		http://www.springframework.org/schema/task/spring-task-4.0.xsd">

	
	<bean id="producerConfig" class="org.springframework.beans.factory.config.PropertiesFactoryBean">  
        <property name="locations">  
            <list>  
                <value>config/properties/producer.properties</value>  
            </list>  
        </property>  
    </bean>
	<bean id="kafkaService" class="com.aiyou.gamecloud.kafka.KafkaServiceImpl" init-method="init"> 
		<property name="properties" ref="producerConfig"/>
	</bean>
</beans>


property
metadata.broker.list=192.168.113.181:9092
producer.type=async  
compression.codec=0  
#serializer.class=kafka.serializer.StringEncoder 
#key.serializer.class=kafka.serializer.StringEncoder

编解码部分如果使用字符串需要设置,默认是字节数组
使用异(async)通信时,消息队列默认发送时间间隔由queue.buffering.max.ms决定(kafka.producer.async.AsyncProducerConfig中),默认时间间隔为5000ms,也就是说异步方式默认每5s发送一次消息

Customer

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;

import com.google.protobuf.InvalidProtocolBufferException;

import kafka.consumer.Consumer;

/**
 * @project: gate
 * @Title: KafkaCustomerService.java
 * @author: chenpeng
 * @email: 46731706@qq.com
 * @date: 2016年1月14日下午1:40:47
 * @description:
 * @version:
 */
public class KafkaCustomerService {
	private static final Logger logger = LoggerFactory.getLogger(KafkaCustomerService.class);
	private static final String CONFIG = "kafka-customer-config.xml";
	private static Random rand = new Random();

	public static void main(String[] args) {

		final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer.class);
		ctx.start();

		final QueueChannel channel = ctx.getBean("inputFromKafka", QueueChannel.class);
		Message msg;
		while ((msg = channel.receive()) != null) {
			HashMap map = (HashMap) msg.getPayload();

			System.out.println("Here in  disb ================" + map.size());
			Set<Map.Entry> set = map.entrySet();

			for (Map.Entry entry : set) {
				String topic = (String) entry.getKey();
				System.out.println("Topic:" + topic);
				ConcurrentHashMap<Integer, List<byte[]>> messages = (ConcurrentHashMap<Integer, List<byte[]>>) entry
						.getValue();
				Collection<List<byte[]>> values = messages.values();

				for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) {
					List<byte[]> list = iterator.next();
					System.out.println("================" + list.size());
					for (byte[] bytes : list) {
						try {
							BroadcastMessage message = BroadcastMessage.parseFrom(bytes);
							logger.debug(message.getGameId());
						} catch (InvalidProtocolBufferException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}

					}

				}

			}

		}

		try {
			Thread.sleep(100000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		ctx.close();
	}
}



配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/integration
  http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
  http://www.springframework.org/schema/integration/kafka
  http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
  http://www.springframework.org/schema/integration/stream
  http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

	<int:channel id="inputFromKafka" ><int:queue/></int:channel>

	<int-kafka:inbound-channel-adapter
		id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
		auto-startup="false" channel="inputFromKafka">
		<int:poller fixed-delay="10" time-unit="MILLISECONDS"
			max-messages-per-poll="50" />
	</int-kafka:inbound-channel-adapter>
	

	<int-kafka:consumer-context id="consumerContext"
		consumer-timeout="10" zookeeper-connect="zookeeperConnect" >
		<int-kafka:consumer-configurations>
			<int-kafka:consumer-configuration
				group-id="default" max-messages="5000">
				<int-kafka:topic id="websocket_01" streams="1" />
			</int-kafka:consumer-configuration>
		</int-kafka:consumer-configurations>
	</int-kafka:consumer-context>
	<int-kafka:zookeeper-connect id="zookeeperConnect"
		zk-connect="192.168.113.181:2121" zk-connection-timeout="6000"
		zk-session-timeout="6000" zk-sync-time="200" />

</beans:beans>


这里需要注意两个参数:
fixed-delay:即从上一个任务完成开始到下一个任务开始的间隔,单位是毫秒。即每次sleep的时间间隔。
fixed-rate: 即从上一个任务开始到下一个任务开始的间隔,单位是毫秒。即每次获取消息的时间间隔。
consumer-timeout:如果在指定的时间间隔后,没有发现可用的消息可消费,则抛出一个timeout异常,我的理解是处理完一个消息后等待fixed-delay+consumer-timeout时间间隔,如果还没消息就重连(不知道理解的对不对,不过实验证明将consumer-timeout值修改后会影响接收消息的频率)

结合spring,还可以使用Spring Integration方式进行配置

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/integration
  http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
  http://www.springframework.org/schema/integration/kafka
  http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
  http://www.springframework.org/schema/integration/stream
  http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

	<int:channel id="inputFromKafka"></int:channel>

	<int:service-activator auto-startup="true"
		input-channel="inputFromKafka" ref="disbService" method="distribute">
	</int:service-activator>


	<int:poller default="true" id="default" fixed-rate="5"
		time-unit="MILLISECONDS">
	</int:poller>

	<int-kafka:inbound-channel-adapter
		kafka-consumer-context-ref="consumerContext" channel="inputFromKafka">
	</int-kafka:inbound-channel-adapter>

	<int-kafka:consumer-context id="consumerContext"
		consumer-timeout="5" zookeeper-connect="zookeeperConnect">
		<int-kafka:consumer-configurations>
			<int-kafka:consumer-configuration
				group-id="default" max-messages="5000">
				<int-kafka:topic id="${gateId}" streams="1" />
			</int-kafka:consumer-configuration>
		</int-kafka:consumer-configurations>
	</int-kafka:consumer-context>
	<int-kafka:zookeeper-connect id="zookeeperConnect"
		zk-connect="192.168.113.181:2121" zk-connection-timeout="6000"
		zk-session-timeout="6000" zk-sync-time="2000" />

</beans:beans>


service实现

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.google.protobuf.InvalidProtocolBufferException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;


@Service("disbService")
public class DisbService {
	private static final Logger logger = LoggerFactory.getLogger(DisbService.class);

	@Value("#{configProperties['server.requestType']}")
	private String requestType = ERequestType.SOCKET.getValue();
	@Autowired
	private CacheService cacheService;

	@SuppressWarnings({ "rawtypes", "unchecked" })
	public void distribute(HashMap map) {
		System.out.println("Here in  disb ================" + map.size());
		Set<Map.Entry> set = map.entrySet();

		for (Map.Entry entry : set) {
			String topic = (String) entry.getKey();
			logger.debug("Topic:" + topic);
			ConcurrentHashMap<Integer, List<byte[]>> messages = (ConcurrentHashMap<Integer, List<byte[]>>) entry
					.getValue();
			Collection<List<byte[]>> values = messages.values();

			for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) {
				List<byte[]> list = iterator.next();
				System.out.println("================" + list.size());
				for (byte[] bytes : list) {
					// 这里获取到的是广播的信息
					try {
						if (ERequestType.HTTP.getValue().equals(requestType)) {
							// 缓存起来!!!
						} else {
							Message.BroadcastMessage message = Message.BroadcastMessage.parseFrom(bytes);
							if (message.getUserIdsList().isEmpty()) {
								List<ChannelCache> channelList = cacheService.getGameChannelList(topic,
										message.getGameId(), message.getServerId());
								for (ChannelCache channelCache : channelList) {
									sendMessage(channelCache, message);
								}
							}

							for (String userId : message.getUserIdsList()) {
								ChannelCache channelCache = cacheService.getCachedChannel(message.getGameId(),
										message.getServerId(), userId);
								sendMessage(channelCache, message);
							}
						}

					} catch (InvalidProtocolBufferException e) {
						e.printStackTrace();
					}

				}

			}

		}
	}

	private void sendMessage(ChannelCache channelCache, Message.BroadcastMessage message) {
		if (channelCache != null) {
			switch (ERequestType.parse(requestType)) {
			case HTTP:
				break;
			case SOCKET:
				try {
					if (channelCache.getChannel().isActive()) {
						ByteBuf messageData = Unpooled.buffer();
						messageData.writeInt(message.getMessage().toByteArray().length);
						messageData.writeBytes(message.getMessage().toByteArray());
						channelCache.getChannel().writeAndFlush(messageData).sync();
					} else {
						cacheService.userLogout(channelCache.getChannel().hashCode());
					}

				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				break;

			case WEBSOCKET:
				try {
					if (channelCache.getChannel().isActive()) {
						ByteBuf messageData = Unpooled.buffer();
						messageData.writeInt(message.getMessage().toByteArray().length);
						messageData.writeBytes(message.getMessage().toByteArray());
						channelCache.getChannel().writeAndFlush(new BinaryWebSocketFrame(messageData)).sync();
					} else {
						cacheService.userLogout(channelCache.getChannel().hashCode());
					}

				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			default:
				break;

			}
		}
	}

}