欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume读取数据写入Kafka消息队列中

程序员文章站 2024-01-11 20:36:52
...

前面已经给大家讲过flume和Kafka的简介以及安装,今天就给大家讲讲二者如何关联使用。
本文主要就是讲解如何使用flume采集日志信息后把数据写入kafka中,由于时间关系,这里就暂时用伪数据,把存放伪数据的文件放到专门用于flume监听文件的目录中就是前面提到过的/opt/soft/datas下。

1.配置flume

先新建配置文件用于关联kafka
还是在/opt/flumeconf下创建properties文件,并添加以下配置
cd /opt/flumeconf
vi conf_0812_kafka.properties

a5.channels=c5
a5.sources=s5
a5.sinks=k5

a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/soft/datas
a5.sources.s5.interceptors=head_filter
#正则拦截器
a5.sources.s5.interceptors.head_filter.type=regex_filter
a5.sources.s5.interceptors.head_filter.regex=^event_id.*
a5.sources.s5.interceptors.head_filter.excludeEvents=true

#用来关联kafka
a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=192.168.5.150:9092
#topic的主题名要一致
a5.sinks.k5.kafka.topic=msgEvent

a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000

a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5

2.先启动zookeeper和Kafka

zkServer.sh start

cd /opt/soft/kafka211/bin
./kafka-server-start.sh /opt/soft/kafka211/config/server.properties 

使用jps验证服务是否启动!

3.创建topic

kafka-topics.sh --create --zookeeper 192.168.5.150:2181 --replication-factor 1 --partitions 1 --topic msgEvent

注意:
创建的topic 和 配置文件里的sinks组件中的topic必须要一致

4.启动消费者以接收消息

kafka-console-consumer.sh --bootstrap-server 192.168.5.150:9092 --topic msgEvent --from-beginning

5.启动生产者以发送消息

kafka-console-producer.sh --broker-list 192.168.5.150:9092 --topic msgEvent

6.启动flume的agent

flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0812_kafka.properties -Dflume.root.logger=INFO,console
这样就开启了日志采集 可能要等一段时间, 日志采集完毕之后 flume会提示文件的后缀名会有COMPLETED,说明文件按已采集完毕!如下图所示:
Flume读取数据写入Kafka消息队列中
文件会写入到kafka中 具体路径是kafka配置文件中server.properties里面Log Basics的配置 如下图所示:
Flume读取数据写入Kafka消息队列中
查看文件信息,是否已经写入到kafka的日志目录中。
如下图所示:
Flume读取数据写入Kafka消息队列中
megEvent已出现子datas日志目录中,接下来查看文件中是否有日志信息,如下图所示:
Flume读取数据写入Kafka消息队列中
这样数据就写入到了kafka中!

相关标签: kafka