Installing Kafka on CentOS

程序员文章站 2023-12-29 13:10:52
Kafka needs a JDK, so install one first.
Check whether Java is already present:

java -version

If it is not, install the JDK:

yum install -y java-1.8.0-openjdk
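
The same check can be scripted. The following is a minimal sketch, not part of the original article: the helper names `parse_java_major` and `installed_java_major` are made up for illustration, and the parsing assumes the usual `version "1.8.0_242"` or `version "11.0.2"` wording that OpenJDK prints.

```python
import re
import shutil
import subprocess


def parse_java_major(version_output):
    """Return the major Java version found in `java -version` output, or None."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        return None
    first = int(match.group(1))
    second = match.group(2)
    # Legacy scheme: "1.8.0_242" means Java 8. Modern scheme: "11.0.2" means Java 11.
    if first == 1 and second is not None:
        return int(second)
    return first


def installed_java_major():
    """Run `java -version` if a java binary is on PATH; return its major version or None."""
    if shutil.which("java") is None:
        return None
    # `java -version` writes its report to stderr, so both streams are inspected.
    result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    return parse_java_major(result.stderr + result.stdout)


if __name__ == "__main__":
    print("Java major version:", installed_java_major())
```

A result of `None` means no `java` binary was found on PATH, or its output did not match the assumed format.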

Download Kafka (this article uses kafka_2.11-2.4.0.tgz), then extract it into /opt and rename the directory:

tar -zvxf kafka_2.11-2.4.0.tgz -C  /opt/
cd /opt
mv kafka_2.11-2.4.0 kafka

Register ZooKeeper and Kafka as systemd services. Create the ZooKeeper unit file first:

vi /etc/systemd/system/zookeeper.service

[Unit]
Description=zookeeper
 
[Service]
Type=simple
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
User=root
Group=root
 
[Install]
WantedBy=multi-user.target
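
# Reload systemd, then enable at boot and start
systemctl daemon-reload
systemctl enable zookeeper
systemctl start zookeeper

Next, create the Kafka unit file:

vi /etc/systemd/system/kafka.service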

 
[Unit]
Description=Apache Kafka Server
Requires=zookeeper.service
After=zookeeper.service
 
[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
User=root
Group=root
 
[Install]
WantedBy=multi-user.target

# Reload systemd, then enable at boot and start
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
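
To confirm that both services actually came up, one option is to probe their TCP ports. This is a rough sketch under assumptions: it takes the ports to be the defaults from the shipped config files (2181 for ZooKeeper, 9092 for Kafka), and `port_open` is a hypothetical helper name, not something provided by Kafka.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Assumed defaults: ZooKeeper clientPort=2181, Kafka listener port 9092.
    for name, port in (("zookeeper", 2181), ("kafka", 9092)):
        state = "listening" if port_open("127.0.0.1", port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```

An open port only shows that the process is accepting connections. `systemctl status kafka` and the service logs remain the place to look when something fails.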

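To allow access from other hosts, for example from a kafka-python client, edit config/server.properties and set advertised.listeners to an address that clients can reach (restart Kafka afterwards so the change takes effect):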

advertised.listeners=PLAINTEXT://192.168.122.137:9092
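
The setting matters because the broker hands this address back to clients in its metadata, and clients use it for every connection after the initial bootstrap. A value that points at a loopback or wildcard address therefore only works on the broker machine itself. Below is a small sketch for sanity-checking such a value. `parse_listeners` and `usable_from_other_hosts` are illustrative names, and the list of unusable hosts is an assumption, not an exhaustive rule.

```python
def parse_listeners(value):
    """Split a listeners value such as 'PLAINTEXT://host:9092' into (name, host, port) tuples."""
    result = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, sep, address = entry.partition("://")
        host, colon, port = address.rpartition(":")
        if not sep or not colon or not port.isdigit():
            raise ValueError(f"malformed listener: {entry!r}")
        result.append((name, host, int(port)))
    return result


def usable_from_other_hosts(listeners):
    """Keep only listeners whose host is not a loopback or wildcard address."""
    local_only = {"localhost", "127.0.0.1", "0.0.0.0"}
    return [item for item in listeners if item[1] not in local_only]


if __name__ == "__main__":
    parsed = parse_listeners("PLAINTEXT://192.168.122.137:9092")
    print(parsed)
    print(usable_from_other_hosts(parsed))
```

With the value from this article, both calls return the single entry `('PLAINTEXT', '192.168.122.137', 9092)`.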

Using Kafka from Python

# Producer
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.122.137:9092')
msg_dict = {
    "sleep_time": 10,
    "db_config": {
        "database": "test_1",
        "host": "xxxx",
        "user": "root",
        "password": "root"
    },
    "table": "msg",
    "msg": "Hello World"
}
# Kafka message values are bytes, so serialize the dict to UTF-8 JSON
msg = json.dumps(msg_dict).encode('utf-8')
text = 'helloworld'
producer.send('test_rhj', text.encode('utf-8'))
producer.send('test_rhj', msg)

# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()
producer.close()
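
Because send() is asynchronous, a failed delivery can go unnoticed. One way to surface it is to wait on the future that send() returns. The sketch below assumes kafka-python is installed and reuses the broker address and topic from this article. `encode_value` and `send_and_confirm` are illustrative names, not part of the library.

```python
import json


def encode_value(value):
    """Serialize a Python object to UTF-8 JSON bytes for use as a Kafka message value."""
    return json.dumps(value).encode("utf-8")


def send_and_confirm(bootstrap_servers, topic, value, timeout=10):
    """Send one message and block until the broker acknowledges it."""
    # Imported here so encode_value can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=encode_value,  # applied to every value passed to send()
    )
    try:
        # get() waits for the broker's reply and raises if delivery failed.
        metadata = producer.send(topic, value).get(timeout=timeout)
        return metadata.topic, metadata.partition, metadata.offset
    finally:
        producer.close()


# Example call (needs a reachable broker):
# print(send_and_confirm("192.168.122.137:9092", "test_rhj", {"msg": "Hello World"}))
```

Passing a `value_serializer` keeps the JSON encoding in one place, so callers hand over plain dicts instead of pre-encoded bytes.
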
# Consumer
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['192.168.122.137:9092'])

# Iterating blocks and yields each record as it arrives; key and value are raw bytes
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
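
With kafka-python's defaults, the consumer above starts at the latest offset, so it only prints messages produced after it has started. Start it before running the producer, or ask for the earliest offset as in the sketch below. This is an illustrative variant, not the article's original code: `decode_value` and `read_all` are made-up names, and the JSON-or-text fallback is an assumption based on the topic here carrying both a plain string and a JSON document.

```python
import json


def decode_value(raw):
    """Decode message bytes as UTF-8 JSON, falling back to plain text."""
    text = raw.decode("utf-8")
    try:
        return json.loads(text)
    except ValueError:
        # Not valid JSON (for example the plain 'helloworld' message).
        return text


def read_all(bootstrap_servers, topic, idle_ms=5000):
    """Read the topic from its oldest retained message until it goes quiet."""
    # Imported here so decode_value can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start at the oldest message when no offset is stored
        consumer_timeout_ms=idle_ms,   # stop iterating after this long without new messages
        value_deserializer=decode_value,
    )
    try:
        return [(m.partition, m.offset, m.value) for m in consumer]
    finally:
        consumer.close()


# Example call (needs a reachable broker):
# for partition, offset, value in read_all(["192.168.122.137:9092"], "test_rhj"):
#     print(partition, offset, value)
```

Setting `consumer_timeout_ms` makes the loop end instead of blocking forever, which is convenient for one-off checks but not for a long-running service.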

Original article: https://blog.csdn.net/qq_41122834/article/details/107244698
