Flume: Ingesting Data from Multiple MySQL Databases into Kafka
Experiment setup:
Two MySQL 5.3 databases on machines with different IPs
One Kafka 0.10.1 cluster (Ambari-managed, where the Kafka port defaults to 6667; vanilla Apache Kafka defaults to 9092)
One Flume 1.5.2 host
Follow these steps:
1. Upload two jars to the flume/lib/ directory (where Flume keeps its jar files) [jar download links]:
(1) the MySQL JDBC driver jar
(2) the flume-ng-sql-source (flume-mysql) plugin jar
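A minimal sketch of that upload, assuming the MySQL Connector/J 5.x driver and the keedio flume-ng-sql-source plugin (the exact file names, versions, and the $FLUME_HOME path below are assumptions and will vary by installation):
cp mysql-connector-java-5.1.35-bin.jar $FLUME_HOME/lib/
cp flume-ng-sql-source-1.4.3.jar $FLUME_HOME/lib/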
2. Update the Flume configuration
a1.sources = s1 s2
a1.channels = c1
a1.sinks = kafkaSink
# Source1 configuration
# For each one of the sources, the type is defined
a1.sources.s1.type = org.keedio.flume.source.SQLSource
a1.sources.s1.hibernate.connection.url = jdbc:mysql://172.2.2.3:3306/aa
# Hibernate Database connection properties
a1.sources.s1.hibernate.connection.user = root
a1.sources.s1.hibernate.connection.password = root
a1.sources.s1.hibernate.connection.autocommit = true
a1.sources.s1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.s1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.s1.run.query.delay = 10000
# Source1 query-status configuration
# Make sure the /var/log/flume directory exists; both the path and the file name are configurable. The file stores the position of the last query. It is empty on the first run, in which case the start.from value is used as the first query condition.
a1.sources.s1.status.file.path = /var/log/flume
a1.sources.s1.status.file.name = s1.status
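# Illustrative only (the exact field names vary by flume-ng-sql-source
# version): after a successful poll the status file records the last fetched
# index as JSON, roughly like
#   {"SourceName":"s1","URL":"jdbc:mysql://...","LastIndex":"1000"}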
# Source1 query configuration
# Initial value for the incremental column
a1.sources.s1.start.from = 0
# The quotes here must be escaped with the escape character \. $@$ is replaced as follows: 1. if the incremental marker is not yet in the status file (i.e., on the first query), it is replaced with the start.from value; 2. once the marker has a value, it is replaced with the value stored in the status file.
# The query condition must be an auto-increment primary key
a1.sources.s1.custom.query = select * from piececounts where PieceCountID > \'$@$\'
a1.sources.s1.batch.size = 1000
# Fetch 1000 rows per query; "limit 1000" is appended to the query automatically
a1.sources.s1.max.rows = 1000
# Field delimiter
a1.sources.s1.delimiter.entry = ,
# Connection pool configuration
a1.sources.s1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.s1.hibernate.c3p0.min_size = 1
a1.sources.s1.hibernate.c3p0.max_size = 10
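# Putting the rules above together: on the very first poll (empty status
# file) the statement this source effectively issues should be
#   select * from piececounts where PieceCountID > '0' limit 1000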
# Source2 configuration
# For each one of the sources, the type is defined
a1.sources.s2.type = org.keedio.flume.source.SQLSource
a1.sources.s2.hibernate.connection.url = jdbc:mysql://172.2.2.2:3306/kk
# Hibernate Database connection properties
a1.sources.s2.hibernate.connection.user = root
a1.sources.s2.hibernate.connection.password = root
a1.sources.s2.hibernate.connection.autocommit = true
a1.sources.s2.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.s2.hibernate.connection.driver_class = com.mysql.jdbc.Driver
#agent.sources.s2.table = employee1
# Columns to import to kafka (default *, which imports the entire row)
#agent.sources.s2.columns.to.select = *
# Query delay: the query is issued every run.query.delay milliseconds
a1.sources.s2.run.query.delay = 10000
# Source2 query-status configuration
a1.sources.s2.status.file.path = /var/log/flume
a1.sources.s2.status.file.name = s2.status
# Source2 query configuration
# Initial value for the incremental column
a1.sources.s2.start.from = 0
# As for Source1: quotes are escaped with \, and $@$ is replaced with start.from on the first query, then with the value stored in the status file afterwards.
a1.sources.s2.custom.query = select * from measurementvalues where MeasurementValueID > \'$@$\'
a1.sources.s2.batch.size = 1000
# Fetch 1000 rows per query; "limit 1000" is appended to the query automatically
a1.sources.s2.max.rows = 1000
# Field delimiter
a1.sources.s2.delimiter.entry = ,
# Connection pool configuration
a1.sources.s2.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.s2.hibernate.c3p0.min_size = 1
a1.sources.s2.hibernate.c3p0.max_size = 10
# Channel configuration
# The channel can be defined as follows. capacity and transactionCapacity
# must be at least as large as the source batch.size (1000) and the sink
# flumeBatchSize (640), or the memory channel will reject the transactions.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Sink configuration
# Use Flume's bundled Kafka sink (note: the kafka.bootstrap.servers and
# kafka.topic property names are the Flume 1.7+ spelling; Flume 1.6 used
# brokerList and topic instead)
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.flumeBatchSize = 640
a1.sinks.kafkaSink.kafka.bootstrap.servers = 172.2.2.10:6667,172.2.2.11:6667,172.2.2.12:6667
a1.sinks.kafkaSink.kafka.topic = KFC
# Wiring configuration
# A Flume sink can drain exactly one channel, so both sources feed c1
a1.sources.s1.channels = c1
a1.sources.s2.channels = c1
a1.sinks.kafkaSink.channel = c1
Remember to restart the Flume agent after making the changes.
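For example, assuming a standard Apache Flume layout and that the configuration above is saved as conf/mysql-kafka.conf (the file name here is an assumption), the agent can be (re)started with:
bin/flume-ng agent --conf conf --conf-file conf/mysql-kafka.conf --name a1 -Dflume.root.logger=INFO,console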
3. Kafka receives the data
What arrives is the incremental MySQL data, but you still have to distinguish which table each row came from. Converting the rows to JSON requires secondary development of keedio/flume-ng-sql-source: after the Flume source fetches the MySQL rows, convert them to JSON before sending them to the channel (see the sketches below).
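To spot-check what lands in the topic, a console-consumer call along these lines should work (6667 being the Ambari default noted above):
bin/kafka-console-consumer.sh --bootstrap-server 172.2.2.10:6667 --topic KFC --from-beginning
As for the JSON conversion, here is a minimal, hypothetical Java sketch of the idea only, not the actual plugin code: a helper of the kind you would add inside the modified source, turning one fetched row into a JSON event body that also carries the table name, so consumers can tell the two sources apart (the class, method, and column names are invented for illustration):

import java.util.List;

public class RowToJson {
    // Hypothetical helper, not part of flume-ng-sql-source: serialize one
    // fetched row as a JSON object, tagging it with the table it came from.
    public static String toJson(String table, List<String> columns, List<String> values) {
        StringBuilder sb = new StringBuilder("{\"table\":\"").append(table).append('"');
        for (int i = 0; i < columns.size(); i++) {
            sb.append(",\"").append(columns.get(i)).append("\":\"")
              .append(values.get(i).replace("\"", "\\\"")) // escape embedded quotes
              .append('"');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Prints: {"table":"piececounts","PieceCountID":"1","PieceCount":"42"}
        System.out.println(toJson("piececounts",
                List.of("PieceCountID", "PieceCount"),
                List.of("1", "42")));
    }
}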
4. Architecture diagram
[diagram image not reproduced: the two MySQL databases feed Flume agent a1 (SQLSource s1/s2 → memory channel → Kafka sink), which writes to the Kafka topic KFC]