kafka安装及使用
准备
kafka版本: kafka_2.11-1.1.0.tgz
将kafka解压在opt目录下(opt为hadoop用户下的目录)
tar -zxvf kafka_2.11-1.1.0.tgz -C opt/
注意,此命令执行条件:我的kafka在hadoop主体目录下,而opt在hadoop目录下。
启动服务器
解压好后到,先进入kafka目录
cd kafka_2.11-1.1.0
因为zookeeper.properties注释实在太多,所以我通过grep命令查找到kafka非注释的字符串,并把它追加到 zk.properties中。
cat zookeeper.properties | grep -v '#' >> config/zk.properties
zk.properties只需要修改第一行
dataDir=/home/hadoop/zk #因为zookeeper变更为zk,所以需要在这里修改一下
启动zookeeper
bin/zookeeper-server-start.sh config/zk.properties
启动kafka服务器(broker)
同zookeeper相同,筛选一下server.properties并把它追加到kafka1.properties中
cat config/server.properties | grep -v '#' >> config/kafka1.properties
./bin/kafka-server-start.sh config/kafka1.properties
创建一个主题
我们用一个分区和一个副本创建一个’cctv1‘的主题
kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cctv1
启动生产者(producer)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cctv1
在生产者中发送的消息,会存储在broker中,需要消费者接收这些消息。
启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1
#查看历史信息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1 --from-beginning
启动消费者后,便会接受生产者发送到broker存储的信息。
另外,生产者也可以创建主题,只要zookeeper(broker)中没有这个主题,启动它就可以创建主题
连接上消费者,立即接收到消息。
更改端口号
kafka默认端口号为9092,如果想更改端口号,在kafka1.conf中添加代码。(出自sever.properties中)
listeners=PLAINTEXT://:9092
补充: zookeeper只能启动单数,比如1台 、3台、7台等等,不能偶数台,偶数台的话假设有两台,那么只有一台机器再运行,因为如果是偶数的话,选举出来的管理者有可能两个zookeeper得到的票数相同,奇数的话就不会出现这个情况
上一篇: oracle 列转行
下一篇: MySQL的安装与配置_MySQL