Flume读取数据写入Kafka消息队列中
程序员文章站
2024-01-11 20:36:52
...
前面已经给大家讲过flume和Kafka的简介以及安装,今天就给大家讲讲二者如何关联使用。
本文主要就是讲解如何使用flume采集日志信息后把数据写入kafka中,由于时间关系,这里就暂时用伪数据,把存放伪数据的文件放到专门用于flume监听文件的目录中就是前面提到过的/opt/soft/datas下。
1.配置flume
先新建配置文件用于关联kafka
还是在/opt/flumeconf下创建properties文件,并添加以下配置
cd /opt/flumeconf
vi conf_0812_kafka.properties
a5.channels=c5
a5.sources=s5
a5.sinks=k5
a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/soft/datas
a5.sources.s5.interceptors=head_filter
#正则拦截器
a5.sources.s5.interceptors.head_filter.type=regex_filter
a5.sources.s5.interceptors.head_filter.regex=^event_id.*
a5.sources.s5.interceptors.head_filter.excludeEvents=true
#用来关联kafka
a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=192.168.5.150:9092
#topic的主题名要一致
a5.sinks.k5.kafka.topic=msgEvent
a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000
a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5
2.先启动zookeeper和Kafka
zkServer.sh start
cd /opt/soft/kafka211/bin
./kafka-server-start.sh /opt/soft/kafka211/config/server.properties
使用jps验证服务是否启动!
3.创建topic
kafka-topics.sh --create --zookeeper 192.168.5.150:2181 --replication-factor 1 --partitions 1 --topic msgEvent
注意:
创建的topic 和 配置文件里的sinks组件中的topic必须要一致
4.启动消费者以接收消息
kafka-console-consumer.sh --bootstrap-server 192.168.5.150:9092 --topic msgEvent --from-beginning
5.启动生产者以发送消息
kafka-console-producer.sh --broker-list 192.168.5.150:9092 --topic msgEvent
6.启动flume的agent
flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0812_kafka.properties -Dflume.root.logger=INFO,console
这样就开启了日志采集 可能要等一段时间, 日志采集完毕之后 flume会提示文件的后缀名会有COMPLETED,说明文件按已采集完毕!如下图所示:
文件会写入到kafka中 具体路径是kafka配置文件中server.properties里面Log Basics的配置 如下图所示:
查看文件信息,是否已经写入到kafka的日志目录中。
如下图所示:
megEvent已出现子datas日志目录中,接下来查看文件中是否有日志信息,如下图所示:
这样数据就写入到了kafka中!