开源组件系列(4):分布式消息队列(Kafka)
目录
(一)消息队列概述
不论是系统产生的数据日志,还是对应的数据系统,从来都不是单一的对应关系,而是多种数据日志对应多套数据系统的复杂关联。假设我们将采集的数据日志直接传输到目标数据系统中,一旦因为业务扩展而产生的新的数据系统建设需求,那么依赖关系势必变得非常混乱。
传统日志采集方式存在如下几个问题:
1.数据日志与数据系统之间的耦合度太高,当数据日志或者数据系统需要扩展时,需要修改非常多的依赖关系;
2.数据日志的产生速度与数据系统的处理速度不对等,如果遇到秒杀等场景,很容易引起系统崩溃;
3.依赖关系众多导致并发很高,对于网络压力影响很大,容易成为系统瓶颈。
为了降低数据日志和数据系统之间的耦合性,因此设计了消息队列,成为两者之间的“中间件”。
以Kafka为代表的消息队列有如下几方面的有点:
1.缓冲数据压力:当数据日志短时间内增加较多时,消息队列能够将数据系统无法处理的部分缓冲期来,防止系统压力过大;
2.降低耦合度:消息队列支持生产者/消费者模式,支持下游订阅数据,如果需要新增数据日志或数据系统,只需要修改配置文件,不需要修改系统代码;
3.优秀的扩展性:消息队列多采用分布式架构设计,数据经过分片同时写到多个节点中,避免单节点的瓶颈问题,并在秒杀等场景时提供动态扩展能力;
4.良好的容错性:数据日志在Kafka中会持久化到磁盘上,并通过分布式的多副本策略来避免数据丢失。
(二)Kafka基本架构
Kafka作为一个集群中间件,需要运行在一台或者多台服务器上,Kafka通过Topic对存储的流数据进行分类,每条记录中包含一个Key,一个Value和一个Timestamp。在运行时,Producer将数据写入到Broker中,由Broker负责构建分布式的消息存储系统了,将消息划分为多个Topic,然后再由Consumer从Broker读取数据并进行处理。Kafka采用了push – pull的架构,即收到数据后,直接将数据push给对应的Broker,再由Consumer从Broker中将数据pull出来。
(三)Kafka组件介绍
Kafka主要由Producer、Broker、Consumer及Zookeeper组成。相关组件的介绍如下:
1.Producer:
由用户使用Kafka相关的SDK进行开发,Producer负责将数据发送给Broker。在Kafka中,每条数据被称为一个“消息”,由“三元组”组成。“三元组”包括:Topic、Key及Message。
(1)Topic:表示该条消息所述的Topic,是一种逻辑上的切分概念,一个Topic可以分给多个不同的Broker;
(2)Key:表示该条消息的主键,Kafka会根据每条数据的Key将消息分到不同的分区(Partition)中,默认是哈希取模的算法,用户也可以自行定义相关的分区算法;
(3)Message:表示该条消息的值,通常为字节数组,也可以使用String、JSON、Avro、Thrif、Protobuf等结构。
2.Broker:
在分布式的Kafka中,出于容错的考虑,Broker一般有多个,负责接收Producer和Consumer的请求,并将消息持久化到本次磁盘。Broker以Topic为单位将消息分成不同的分区(Partition),每个分区可以有多个副本,通过数据冗余的方式来实现容错。当分区(Partition)存在多个副本时,其中会有一个Leader,对外提供读写请求,其他的都是Follower,不提供读写服务,只是同步Leader数据,并且在Leader出问题时,选出一个成为新的Leader。这种容错方式与Mysql的主备比较类似。
Broker能够保证统一Topic下的同一Partition内消息是有序的,但无法保证Partition之间全局有序。这意味着,Comsumer在消费某个Topic下的消息时,可能得到与写入顺序不同的消息序列。
Brokder以追加的方式将消息写到磁盘中,并且每个分区中的消息被赋予了唯一整数标识,称之为偏移量(Offset)。Broker只提供基于Offset的读取方式,并不会维护各个Consumer当前已消费的Offset值,而是由Consumer各自维护当前读取的进度。Broker中保存的数据是有有效期的,比如7天,一旦超过了有效期,对应的数据将被释放以释放磁盘空间。只要数据在有效期内,Consumer可以重复读取而不受限制。
3.Consumer:
负责从Broker中拉取消息并进行处理,每个Consumer维护最后一个已读消息的Offset,并在下次请求开始时从这个Offset开始读取消息,这种机制使得Broker的吞吐效率很高。值得注意的是,Kafka允许多个Consumer构成一个Consumer Group,共同读取一个Topic中的数据。
4.Zookeeper:
Zookeeper负责提供分布式的协调服务,所有Broker会向Zookeeper进行注册,并汇报相关状态,使Consumer及时获取这些数据。当一个Consumer宕机后,其他Consumer会通过Zookeeper发现这一故障,并自动分摊对应的数据负载,触发容错机制。
(四)Kafka关键技术点
1.提供可控的可靠性级别:
Producer可通过两种方式向Broker发送数据:同步或异步,其中异步方式通过批处理来处理数据,大大提高了数据的写入效率。当Producer向Broker发送数据时,可通过设置该数据的应答方式,控制写性能与可靠性级别。当可靠性级别提升时,写性能会下降;反之,可靠性级别下降时,写性能会提高很多。Kafka提供三种消息应答方式:
0:无需对消息进行确认,Producer发送消息后马上返回,无需等待对方写入成功;
1:当Producer发送消息后,需要等到Leader Partition写成功后才会返回,但对应的Follower Partition不一定写成功,这种方式属于性能可靠性比较折中的一种方式,能够在比较高效的情况下,保证数据至少成功写入一个节点;
2:当Producer发送消息后,需要等到所有的Partition写成功后才返回,如果设置的消息副本数大于1,意味着被成功写入了多个节点,可靠性很高,但写性能比较低。
2.数据多副本:
Broker允许为每个Topic中的数据存放多个副本,以达到容错的目的。Kafka采用了强一致的数据复制策略。在数据存入时,会首先写入到Leader Partition,之后由Leader Partition将消息同步给其他副本。Broker的负载均衡实际上就是对Leader Partition的负载均衡,即保证Leader Partition在各个Broker上数据尽可能相近。
3.高效的持久化机制:
为了应对大数据的应用场景,Broker直接将消息持计划到磁盘上而不是内存中,这就要求必须采用非常高效的数据写入和存储方式。由于顺序写入的速度要远高于随机写,因此Kafka用顺序写配合Offset的方式组织数据,能够达到很好的读写速度。
4.数据传输优化:
为了优化Broker与Consumer之间的网络数据传输效率,Kafka引入了比较多的优化技术,最典型的是批处理和Zero-copy。
批处理:为了降低单条消息传输带来的网络开销,Broker将多条消息组装在一起,一并发送给Consumer,并且将格式进行了统一设计,保证了数据存储和发送时的一致,避免额外转换带来的开销。
Zero-copy:一条数据在磁盘上从读取到发送需要经过四次拷贝与两次系统调用,四次拷贝顺序依次为:内核态reader buffer – 用户态应用程序buffer – 内核态socket buffer – 网卡NIC buffer,通过Zero-copy优化之后,数据只需要经过三次拷贝便可以发送出去,省去了用户态应用程序buffer的过程。
5.可控的消息传递语义:
在消息队列中,根据接受者可能受到的重复消息次数,消息传递语义可以分为三种:
1.at most once:发送者将消息发送给消费者后,立即返回,不关心消费者是否成功收到消息;
2.at least once:发送者将消息发送给消费者后,等待确认,如果未收到确认消息,则会重发消息;
3.exactly once:消费者会且只会收到同一条消息一次,通常有两种方式实现这种语义:两段锁协议和支持幂等操作。
(五)Kafka示例任务
Step1:下载代码
tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0
Step2:启动服务器
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
Step3:创建一个Topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Step4:发送一些消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step5:启动一个Consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
Step6:设置多代理集群
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Java Producer示例
public class Producer {
public static String topic = "duanjt_test";//定义主题
public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092,192.168.23.77:9092");//kafka地址,多个地址用逗号分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息发送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}
}
}
Java Consumer示例
public class Consumer {
public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
record.topic(), record.offset(), record.value()));
}
}
}
}
上一篇: 移动端自适应解决方法小结
下一篇: CSS初探学习总结提高 三