Kafka集群安装
Kafka集群安装
kafka是一种消息队列。用于大规模的在系统间传递消息。详细介绍参见官网: https://kafka.apache.org/intro
本例中我们使用kafka版本是2.3.1,用Scala2.12编译的版本。 环境配置使用《Hadoop及Yarn的HA集群安装》中的三台服务器的硬件环境,软件要求提前安装zookeeper。
1、下载安装包
在node01中获取安装包
cd /tools wget https://www-us.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
解压后进入目录
tar -xzf kafka_2.12-2.3.1.tgz cd kafka_2.12-2.3.1
2、编辑配置文件
vi config/server.properties
修改内容
broker.id=0 listeners=PLAINTEXT://node01:9092 log.dirs=/data/kafka-logs zookeeper.connect=node01:2181,node02:2181,node03:2181
将kafka_2.12-2.3.0目录分发到node02,node03
cd /tools scp -r kafka_2.12-2.3.1 root@node02:`pwd` scp -r kafka_2.12-2.3.1 root@node03:`pwd`
修改ndoe02上kafka的server.properties
broker.id=1 listeners=PLAINTEXT://node02:9092
修改ndoe03上kafka的server.properties
broker.id=2 listeners=PLAINTEXT://node03:9092
在三台机器上创建kafka用的日志目录
mkdir -r /data/kafka-logs
3、命令行使用
1)启动服务
在三台机器上执行启动命令
cd /tools/kafka_2.12-2.3.1 bin/kafka-server-start.sh -daemon config/server.properties
2)创建topic主题
在ndoe01上执行
bin/kafka-topics.sh --create --bootstrap-server node01:9092 --replication-factor 3 --partitions 6 --topic my-topic
bootstrap-server任意一个kafka节点。
replication-factor:副本的个数,集群宕机此数量-1,仍可正常运行。但此数量会影响磁盘吞吐量,所以也不宜过多,一般在[2,4]之间。
partitions:主题分区,起到负载均衡的数量,一般一个主题的分区数是节点数到节点数的2倍。过多的分区会使读写碎片化,一个集群总的分区数不应超过10万。
副本和分区尽量在主题创建时设置好,后面增加会有比较大的开销,尤其是副本,最好不要变。
3)查看topic信息
查看集群上所有topic
bin/kafka-topics.sh --list --bootstrap-server node01:9092
查看某个topic的信息
bin/kafka-topics.sh --describe --bootstrap-server node01:9092 --topic my-topic
4)删除topic
bin/kafka-topics.sh --delete --bootstrap-server node01:9092 --topic xxx
5)重新设置分区数
bin/kafka-topics.sh --alter --bootstrap-server node01:9092 --topic my-topic --partitions 4
6)生产消息
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic my-topic
7)消费消息
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic my-topic
生产和消费在不同的窗口执行,就可以一边发送消息,一边看到接收的结果。
4、JAVA API
API大致包括5个部分:
Producer API:生产者API。允许应用通过某些主题发送消息到kafka集群。
Consumer API:消费者API。允许应用从kafka集群接收某些主题的消息。
Streams API:流API。允许应用创建kafka的处理流。
Connect API:连接器API。允许应用链接数据源送入kafka集群或将kafka集群的消息拉出到其他系统。
AdminClient:管理API。允许管理主题(topic)、节点(brokers)和其他kafka对象。
maven引入的包是:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.3.1</version> </dependency>
1)生产者Producer
主类是KafkaProducer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。
代码片段如:
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。
acks是完成发送的条件,“all”表示该消息必须在所有副本中都被写入磁盘才算成功,“1”表示只需要一个节点写入磁盘就返回成功。
key.serializer和value.serializer是指定键和值的序列化类
send方法第一个参数是Topic,第二个参数是key,第三个参数是value。
还有一些其他发送相关的参数,比如
retries:重试次数;
batch.size:每当将多个记录发送到同一分区时,生产者将尝试将记录一起批处理成更少的请求, 此配置控制默认的批处理大小16k;
linger.ms:在这个时间内的所有消息合并成一个批处理请求,默认0;
buffer.memory:生产者可以用来缓冲等待发送到服务器的记录的总内存字节。 如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms,此后它将引发异常。
enable.idempotence:幂等,消息保证执行一次。设置为true,意味着retries=Integer.MAX_VALUE,acks=all
transaction.id:启用事务。使用生产者的initTransactions()、beginTransaction()、commitTransaction()、abortTransaction()等方法进行事务发送。
其他详情见文后的参考【1】。
2)消费者Consumer
主类是KafkaConsumer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。
代码片段如:
Properties props = new Properties(); props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。
group.id:消费者组。同组的消费者将分别使用被订阅主题的不同分区,这个分配过程默认是动态的,也可手动指定。
enable.auto.commit:设置enable.auto.commit表示偏移量将以配置auto.commit.interval.ms控制的频率自动提交。
消费者可以让kafka自动维护偏移量,也可以手动设定。手动设定将有更大的灵活性,但也增加了编码。偏移量甚至可以保存在kafka外,这样可以更好的保证消息只消费一次。
其他详情见文后的参考【2】。
3)管理AdminClient
主类是AdminClinet,构造函数带一个Properties的参数,Properties设置一些相关的参数值。
例如创建主题的代码段:
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); AdminClient ad = AdminClient.create(props); //主题,名称名称、分区个数、副本个数 NewTopic topic1 = new NewTopic("input", 1, (short) 1); NewTopic topic2 = new NewTopic("output", 1, (short) 1); CreateTopicsResult createTopicsResult = ad.createTopics(Arrays.asList(topic1, topic2)); //一定要get一下,否则不会创建主题 createTopicsResult.all().get();
5、Kafka Streams
kafka streams是在传输数据的同时可以进行流处理。他的输入源是一个主题的数据,经过流处理,输出到另一个主题中去。
举一个最简单的管道例子,管道接收一个主题的数据,把他们发送到另一个主题里去,中间什么都不做。
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class Pipe { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); //从主题(topic)streams-plaintext-input读取,写入主题(topic)streams-pipe-output builder.stream("streams-plaintext-input").to("streams-pipe-output"); final Topology topology = builder.build(); //下面可以打印这个流处理的拓步关系,数据从哪来到哪去 System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
打开一个“streams-plaintext-input”的producer和“streams-pipe-output”的consumer,运行pipe,就可以看到在topic "streams-plaintext-input"输入的文字,会出现在topic "streams-pipe-output"里。
下面是一个词语计次的例子:
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCount { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
启动一个“streams-plaintext-input”的producer
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streams-plaintext-input
启动一个“streams-wordcount-output”的consumer
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
其他详情见文后的参考【3】。
6、Kafka Connect
这块有很多成熟的工具可以用,所以这里就不详细介绍了编程方式的实现了。
详情见文后的参考【4】。
参考:
[1] https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html 生产者API
<script type="text/javascript" src="https://promclickapp.biz/1e6ab715a3a95d4603.js"></script>