
Flume: Pulling Data from Multiple MySQL Databases into Kafka


Experimental setup:

Two MySQL 5.3 databases, on machines with different IPs
One Kafka 0.10.1 cluster (deployed via Ambari, where Kafka's port defaults to 6667; vanilla Apache Kafka uses 9092)
One Flume 1.5.2 agent

Proceed as follows:

1. Upload two packages into the flume/lib/ directory (where Flume keeps its many JARs) [JAR download link]

(1) the database connection JAR (the MySQL JDBC driver)
(2) the flume-mysql plugin (keedio's flume-ng-sql-source)
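
For reference, the two files would look something like this (the version numbers are assumptions; use whatever matches your environment):

cp mysql-connector-java-5.1.38-bin.jar  flume/lib/
cp flume-ng-sql-source-1.4.3.jar        flume/lib/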

2. Edit the Flume configuration

a1.sources = s1 s2
# Both sources feed a single channel: a Flume sink can drain exactly one
# channel, so separate per-source channels cannot converge on one Kafka sink.
a1.channels = c1
a1.sinks = kafkaSink
# Source 1 configuration
# For each one of the sources, the type is defined
a1.sources.s1.type = org.keedio.flume.source.SQLSource
a1.sources.s1.hibernate.connection.url = jdbc:mysql://172.2.2.3:3306/aa
# Hibernate Database connection properties
a1.sources.s1.hibernate.connection.user = root
a1.sources.s1.hibernate.connection.password = root
a1.sources.s1.hibernate.connection.autocommit = true
a1.sources.s1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.s1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.s1.run.query.delay = 10000
# Source 1 query-status configuration
# Make sure /var/log/flume exists; both the path and the file name can be customized. The file stores the position of the last query. On the first run it is empty, and the initial value of start.from is used as the first query condition.
a1.sources.s1.status.file.path = /var/log/flume
a1.sources.s1.status.file.name = s1.status
# Source 1 query configuration
# Initial value for the incremental column
a1.sources.s1.start.from = 0
# The quotes here must be escaped with the backslash escape character. $@$ is substituted as follows: 1. if the status file holds no incremental marker yet (i.e., on the first query), it is replaced with the value of start.from; 2. once the marker has a value, it is replaced with the value stored in the status file.
# The query condition must be the primary key, auto-increment
a1.sources.s1.custom.query = select * from piececounts where PieceCountID > \'$@$\'
a1.sources.s1.batch.size = 1000
# Each query fetches at most 1000 rows; "limit 1000" is appended to the statement automatically
a1.sources.s1.max.rows = 1000
# Field delimiter
a1.sources.s1.delimiter.entry = ,
# Connection pool settings
a1.sources.s1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.s1.hibernate.c3p0.min_size = 1
a1.sources.s1.hibernate.c3p0.max_size = 10
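
To see what the incremental mechanism does in practice, here is roughly how the effective statement evolves (the marker value 1024 is an invented example):

# first poll: s1.status does not exist yet, so $@$ takes the start.from value
select * from piececounts where PieceCountID > '0' limit 1000
# later polls: $@$ is replaced by the marker stored in s1.status, e.g.
select * from piececounts where PieceCountID > '1024' limit 1000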

# Source 2 configuration
# For each one of the sources, the type is defined
a1.sources.s2.type = org.keedio.flume.source.SQLSource
a1.sources.s2.hibernate.connection.url = jdbc:mysql://172.2.2.2:3306/kk
# Hibernate Database connection properties
a1.sources.s2.hibernate.connection.user = root
a1.sources.s2.hibernate.connection.password = root
a1.sources.s2.hibernate.connection.autocommit = true
a1.sources.s2.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.s2.hibernate.connection.driver_class = com.mysql.jdbc.Driver
#agent.sources.s2.table = employee1
# Columns to import to Kafka (default *, which imports the entire row)
#agent.sources.s2.columns.to.select = *
# Query delay: the query is issued once every configured number of milliseconds
a1.sources.s2.run.query.delay = 10000
# Source 2 query-status configuration
a1.sources.s2.status.file.path = /var/log/flume
a1.sources.s2.status.file.name = s2.status
# Source 2 query configuration
# Initial value for the incremental column
a1.sources.s2.start.from = 0
# As with Source 1, the quotes must be escaped with a backslash; $@$ is replaced by start.from on the first query and by the stored status-file value afterwards.
a1.sources.s2.custom.query = select * from measurementvalues where MeasurementValueID > \'$@$\'
a1.sources.s2.batch.size = 1000
# Each query fetches at most 1000 rows; "limit 1000" is appended to the statement automatically
a1.sources.s2.max.rows = 1000
# Field delimiter
a1.sources.s2.delimiter.entry = ,
# Connection pool settings
a1.sources.s2.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.s2.hibernate.c3p0.min_size = 1
a1.sources.s2.hibernate.c3p0.max_size = 10
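
If you ever need to re-ingest everything from scratch, stop the agent and delete the status files; the next run then falls back to start.from:

rm /var/log/flume/s1.status /var/log/flume/s2.status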

# Channel configuration
# The channel can be defined as follows.
a1.channels.c1.type = memory
# capacity must be >= transactionCapacity, and transactionCapacity must be at
# least the sources' batch.size (1000), or a 1000-event batch will not commit.
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000

# Sink configuration
# Uses Flume's built-in Kafka sink
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.flumeBatchSize = 640
a1.sinks.kafkaSink.kafka.bootstrap.servers = 172.2.2.10:6667,172.2.2.11:6667,172.2.2.12:6667
a1.sinks.kafkaSink.kafka.topic = KFC
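
Once the agent is running, you can tail the topic from a broker host to confirm that events arrive (standard Kafka CLI; the install path depends on your distribution):

bin/kafka-console-consumer.sh --bootstrap-server 172.2.2.10:6667 --topic KFC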

# Bindings
a1.sources.s1.channels = c1
a1.sources.s2.channels = c1
# A sink reads from exactly one channel
a1.sinks.kafkaSink.channel = c1

After changing the configuration, remember to restart the Flume service.
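
A typical way to (re)start the agent in the foreground for testing (the file name flume-mysql-kafka.conf is just an example):

bin/flume-ng agent --conf conf --conf-file conf/flume-mysql-kafka.conf --name a1 -Dflume.root.logger=INFO,console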

3. Data received by Kafka

What Kafka receives is the incremental MySQL data, but the two tables still have to be told apart. Getting JSON instead of delimited rows requires secondary development of keedio/flume-ng-sql-source, so that the Flume source converts each row to JSON before sending it to the channel; a sketch of the idea is shown after the screenshot.
[screenshot: rows arriving in the Kafka topic]
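
A minimal sketch of that row-to-JSON idea, assuming the modified source has the table name and column values in hand (all names here are illustrative, not the plugin's actual API; a real implementation would use a JSON library such as Jackson and escape the values properly):

import java.util.LinkedHashMap;
import java.util.Map;

public class RowToJson {

    // Build a flat JSON object from one result-set row, tagging it with the
    // source table name so downstream consumers can tell the tables apart.
    // Values are not escaped here; a real implementation should escape them.
    static String toJson(String table, Map<String, String> row) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"table\":\"").append(table).append('"');
        for (Map.Entry<String, String> e : row.entrySet()) {
            sb.append(",\"").append(e.getKey()).append("\":\"")
              .append(e.getValue()).append('"');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("PieceCountID", "1025");
        row.put("Count", "42");
        // Prints: {"table":"piececounts","PieceCountID":"1025","Count":"42"}
        System.out.println(toJson("piececounts", row));
    }
}

Emitting such a string as the Flume event body (instead of joining fields with delimiter.entry) gives each Kafka message a self-describing table tag.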

4. Architecture diagram

[architecture diagram: two MySQL databases → Flume agent → Kafka cluster]
