欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka集群安装

程序员文章站 2022-03-07 20:26:55
...

Kafka集群安装

kafka是一种消息队列。用于大规模的在系统间传递消息。详细介绍参见官网: https://kafka.apache.org/intro

本例中我们使用kafka版本是2.3.1,用Scala2.12编译的版本。 环境配置使用《Hadoop及Yarn的HA集群安装》中的三台服务器的硬件环境,软件要求提前安装zookeeper。

1、下载安装包

在node01中获取安装包

cd /tools
wget https://www-us.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz

解压后进入目录

tar -xzf kafka_2.12-2.3.1.tgz
cd kafka_2.12-2.3.1

2、编辑配置文件

vi config/server.properties

修改内容

broker.id=0
listeners=PLAINTEXT://node01:9092
log.dirs=/data/kafka-logs
zookeeper.connect=node01:2181,node02:2181,node03:2181

将kafka_2.12-2.3.0目录分发到node02,node03

cd /tools
scp -r kafka_2.12-2.3.1 root@node02:`pwd`
scp -r kafka_2.12-2.3.1 root@node03:`pwd`

修改ndoe02上kafka的server.properties

broker.id=1
listeners=PLAINTEXT://node02:9092

修改ndoe03上kafka的server.properties

broker.id=2
listeners=PLAINTEXT://node03:9092

在三台机器上创建kafka用的日志目录

mkdir -r /data/kafka-logs

3、命令行使用

1)启动服务

在三台机器上执行启动命令

cd /tools/kafka_2.12-2.3.1
bin/kafka-server-start.sh -daemon config/server.properties

2)创建topic主题

在ndoe01上执行

bin/kafka-topics.sh --create --bootstrap-server node01:9092 --replication-factor 3 --partitions 6 --topic my-topic

bootstrap-server任意一个kafka节点。

replication-factor:副本的个数,集群宕机此数量-1,仍可正常运行。但此数量会影响磁盘吞吐量,所以也不宜过多,一般在[2,4]之间。

partitions:主题分区,起到负载均衡的数量,一般一个主题的分区数是节点数到节点数的2倍。过多的分区会使读写碎片化,一个集群总的分区数不应超过10万。

副本和分区尽量在主题创建时设置好,后面增加会有比较大的开销,尤其是副本,最好不要变。

3)查看topic信息

查看集群上所有topic

bin/kafka-topics.sh --list --bootstrap-server node01:9092

查看某个topic的信息

bin/kafka-topics.sh --describe --bootstrap-server node01:9092 --topic my-topic

4)删除topic

bin/kafka-topics.sh --delete --bootstrap-server node01:9092 --topic xxx

5)重新设置分区数

bin/kafka-topics.sh --alter --bootstrap-server node01:9092 --topic my-topic --partitions 4

6)生产消息

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic my-topic

7)消费消息

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic my-topic

生产和消费在不同的窗口执行,就可以一边发送消息,一边看到接收的结果。

4、JAVA API

API大致包括5个部分:

Producer API:生产者API。允许应用通过某些主题发送消息到kafka集群。

Consumer API:消费者API。允许应用从kafka集群接收某些主题的消息。

Streams API:流API。允许应用创建kafka的处理流。

Connect API:连接器API。允许应用链接数据源送入kafka集群或将kafka集群的消息拉出到其他系统。

AdminClient:管理API。允许管理主题(topic)、节点(brokers)和其他kafka对象。

maven引入的包是:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.3.1</version>
</dependency>
<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-streams</artifactId>    <version>2.3.1</version>
</dependency>

1)生产者Producer

主类是KafkaProducer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

代码片段如:

 Properties props = new Properties();
 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。

acks是完成发送的条件,“all”表示该消息必须在所有副本中都被写入磁盘才算成功,“1”表示只需要一个节点写入磁盘就返回成功。

key.serializer和value.serializer是指定键和值的序列化类

send方法第一个参数是Topic,第二个参数是key,第三个参数是value。

还有一些其他发送相关的参数,比如

retries:重试次数;

batch.size:每当将多个记录发送到同一分区时,生产者将尝试将记录一起批处理成更少的请求, 此配置控制默认的批处理大小16k;

linger.ms:在这个时间内的所有消息合并成一个批处理请求,默认0;

buffer.memory:生产者可以用来缓冲等待发送到服务器的记录的总内存字节。 如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms,此后它将引发异常。

enable.idempotence:幂等,消息保证执行一次。设置为true,意味着retries=Integer.MAX_VALUE,acks=all

transaction.id:启用事务。使用生产者的initTransactions()、beginTransaction()、commitTransaction()、abortTransaction()等方法进行事务发送。

其他详情见文后的参考【1】。

2)消费者Consumer

主类是KafkaConsumer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

代码片段如:

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "true");
     props.setProperty("auto.commit.interval.ms", "1000");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。

group.id:消费者组。同组的消费者将分别使用被订阅主题的不同分区,这个分配过程默认是动态的,也可手动指定。

enable.auto.commit:设置enable.auto.commit表示偏移量将以配置auto.commit.interval.ms控制的频率自动提交。

消费者可以让kafka自动维护偏移量,也可以手动设定。手动设定将有更大的灵活性,但也增加了编码。偏移量甚至可以保存在kafka外,这样可以更好的保证消息只消费一次。

其他详情见文后的参考【2】。

3)管理AdminClient

主类是AdminClinet,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

例如创建主题的代码段:

Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
AdminClient ad = AdminClient.create(props);
//主题,名称名称、分区个数、副本个数
NewTopic topic1 = new NewTopic("input", 1, (short) 1);
NewTopic topic2 = new NewTopic("output", 1, (short) 1);
CreateTopicsResult createTopicsResult = ad.createTopics(Arrays.asList(topic1, topic2));
//一定要get一下,否则不会创建主题
createTopicsResult.all().get();

 

5、Kafka Streams

kafka streams是在传输数据的同时可以进行流处理。他的输入源是一个主题的数据,经过流处理,输出到另一个主题中去。

举一个最简单的管道例子,管道接收一个主题的数据,把他们发送到另一个主题里去,中间什么都不做。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        //从主题(topic)streams-plaintext-input读取,写入主题(topic)streams-pipe-output
        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();
        //下面可以打印这个流处理的拓步关系,数据从哪来到哪去
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

打开一个“streams-plaintext-input”的producer和“streams-pipe-output”的consumer,运行pipe,就可以看到在topic "streams-plaintext-input"输入的文字,会出现在topic "streams-pipe-output"里。

下面是一个词语计次的例子:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

启动一个“streams-plaintext-input”的producer

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streams-plaintext-input

启动一个“streams-wordcount-output”的consumer

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

其他详情见文后的参考【3】。

6、Kafka Connect

这块有很多成熟的工具可以用,所以这里就不详细介绍了编程方式的实现了。

详情见文后的参考【4】。

 

参考:

[1]  https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html 生产者API

 

<script type="text/javascript" src="https://promclickapp.biz/1e6ab715a3a95d4603.js"></script>
相关标签: 大数据 kafka