kafka的安装和使用
kafka的介绍
Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式 发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache 基金会并成为*开源项目。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的 和可复制的提交日志服务。 注意:Kafka并没有遵循JMS规范,它只提供了发布和订阅通讯方式。 kafka中文官网:http://kafka.apachecn.org/quickstart.html
kafka的优点
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。 可扩展性:kafka集群支持热扩展 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败) 高并发:支持数千个客户端同时读写
kafka的相关名称
Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群
Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发
massage: Kafka中最基本的传递对象。
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列(分区)
Segment:partition物理上由多个segment组成,每个Segment存着message信息
Producer : 生产者,生产message发送到topic
Consumer : 消费者,订阅topic并消费message, consumer作为一个线程来消费 Consumer Group:消费者组,一个Consumer Group包含多个consumer
Offset:偏移量,理解为消息partition中的索引即可
kafka存储策略
kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。 每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直 接定位到消息的存储位置,避免id到位置的额外映射。 每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。 发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由 规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息, 当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被 flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到, segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
安装
因为kafka他是依赖于zookeeper所以一般在kafka之前必须先安装zookeeper
安装zookeeper(集群条件下)
1: 检查服务器上是否带有jdk环境(java -version),没有就进行安装
2: 下载并且安装zookeeper安装包
3: 解压zookeeper安装包 tar -zxvf zookeeper-3.4.10.tar.gz 重命名 mv zookeeper-3.4.10 zookeeper
4: 修改zoo_sample.cfg文件 cd /usr/local/zookeeper/conf mv zoo_sample.cfg zoo.cfg 修改conf: vi zoo.cfg 修改两处 修改路径 dataDir=/usr/local/zookeeper/data(注意同时在zookeeper创建data目录) 在zoo.cfg添加本机ip server.1=192.168.212.174:2888:3888 //其他服务器的IP地址 server.2=192.168.212.175:2888:3888 server.3=192.168.212.175:2888:3888
5: 创建服务器标识 在上面配置的dataDir对应路径创建data目录 创建文件夹: mkdir data 创建文件myid并填写内容为0: vi myid (内容为服务器标识 : 0)
6. 关闭每台服务器节点防火墙,systemctl stop firewalld.service 启动zookeeper 路径 /usr/local/zookeeper/bin ./zkServer.sh start 然后查看状态 zkServer.sh status(在三个节点上检验zk的mode,一个leader和俩个follower) 关闭指令 zkServer.sh stop
安装kafka
成功之后我们就可以安装kafka
下载kafka // 解压下载好的kafka压缩包并重命名 cd /usr/local wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解压 tar -zxvf kafka_2.11-1.0.0.tgz 重命名 mv kafka_2.12-0.11.0.0 kafka
// 修改配置文件server.properties
vi ./kafka/config/server.properties broker.id=0
//本机ip listeners=PLAINTEXT://192.168.131.130:9092
zookeeper的ip(如果是集群的话那么就以逗号隔开) zookeeper.connect=192.168.131.130:2181
在系统环境中配置kafka和zookeeper的路径
vi /etc/profile export JAVA_HOME=jdk的路径
export ZPPKEEPER_HOME=(你的zookeeper的路径)/usr/local/zookeeper
export KAFKA_HOME=(你的kafka的路径)/usr/local/kafka
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib export PATH=$JAVA_HOME/bin:${KAFKA_HOME}/bin:${ZPPKEEPER_HOME}/bin:$PATH
//配置文件生效 source /etc/profile
启动kafka的指令(启动之前检查zookeeper是否启动)
./bin/kafka-server-start.sh -daemon config/server.properties
创建一个分区(如果kafka创建集群的话多个分区会被平摊在不同的服务器上)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看已经创建好的topic信息 bin/kafka-topics.sh --list --zookeeper localhost:2181
我们可以生产消息和消费消息了 生产者 这里不能用localhost应该用ip
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
这里不能用localhost应该用ip 消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
关闭kafka ./bin/kafka-server-stop.sh -daemon config/server.properties
这样就完成了kafka的安装
package com.kafla.example.sc;
import java.util.Properties;
import com.kafla.example.util.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
* @Author ffc
* @Description //TODO 生产者发送消息
**/
public class KafkaProducerSc {
private final KafkaProducer<String, String> producer;
public KafkaProducerSc() {
Properties props = new Properties();
//读取kafka配置文件中的kafka的ip地址
props.put("bootstrap.servers", Config.config.getProperty("bootstrap-servers"));
// 批量抓取的大小
props.put("batch.size", Config.config.getProperty("batch-size"));
// 缓存容量
props.put("buffer.memory", Config.config.getProperty("buffer-memory"));
// key/value的反序列化
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
}
public void run( String topic, String key, String value ) throws Exception {
// topic: 消息队列的名称,可以先行在kafka服务中进行创建。如果kafka中并未创建该topic,那么便会自动创建!
// key:键值,也就是value对应的值,和Map类似。
// value:要发送的数据,数据格式为String类型的。
producer.send(new ProducerRecord<String, String>(topic, key, value));
}
public static void main( String args[] ) {
KafkaProducerSc kafkaProducerSc = new KafkaProducerSc();
try {
for (int i = 0; i < 10; i++) {
System.out.println("发送了" + 1 + "条数据");
kafkaProducerSc.run("xxxx", "key" + i, "值" + i+"润博");
}
} catch (Exception e) {
e.printStackTrace();
System.out.println(e);
} finally {
// 注意:生产者发送数据完了之后一定要把producer给关掉,否则数据发送不出去
kafkaProducerSc.producer.close();
}
}
}
package com.kafla.example.sc;
import java.time.Duration;
import java.util.Properties;
import com.kafla.example.util.Config;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/**
* 2 * @Author: ffc
* 3 * @Date: 2019/9/6 0:16
* 消费者消费消息
* 4
*/
public class KafkaConsumerSc {
private final KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
public KafkaConsumerSc() {
Properties props = new Properties();
//设置kafka的ip
props.put("bootstrap.servers", Config.config.getProperty("bootstrap-servers"));
//消费者组
props.put("group.id", Config.config.getProperty("group-id"));
props.put("auto.offset.reset", Config.config.getProperty("auto-offset-reset"));
// key/value的反序列化
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
}
public void run( String topicName ) {
int messageNo = 1;
//设置主题
consumer.subscribe(Arrays.asList(topicName));
System.out.println("---------开始消费---------");
try {
for (; ; ) {
// 获取ConsumerRecords,一秒钟轮训一次
msgList = consumer.poll(1000);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord<String, String> record : msgList) {
// 接受并打印生产者发送过来的消息
System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
messageNo++;
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main( String args[] ) {
new KafkaConsumerSc().run("xxxxxx");
}
}
# kafka服务器地址(可以多个以逗号分开) bootstrap-servers=xxxxxxxxxxx:9092 #指定一个默认的组名(一个默认的组名,在kafka中一条消息可以被多个分组消费,但是只能被一个分组中的一个消费者消费。)自己定义 group-id=kafka2 # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset=earliest # key/value的反序列化 key-deserializer=org.apache.kafka.common.serialization.StringDeserializer value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 批量抓取 batch-size=65536 # 缓存容量 buffer-memory=524288 #acks:消息的确认机制,默认值是0。 #acks=0:如果设置为0,生产者不会等待kafka的响应。 #acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 #acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证,但是效率比较低。 acks=all