Flume学习系列(七)---- 自定义Sink到Mysql
前言:接上一篇,我们总结了一下自定义Sink的流程,这次我们实现一个自己的Sink,将数据Sink到Mysql数据库中。我们还是使用 Flume学习系列(二)----实战Spooling到HDFS 中的源,但是Sink我们不用HDFS,改用自定义的MysqlSink。
一、创建数据库相关
# Schema for the Flume -> MySQL sink example.
# The script can be re-run: the database is only created when missing and the
# table is dropped and recreated (existing rows in `income` are discarded).
CREATE DATABASE IF NOT EXISTS flume;
USE flume;
DROP TABLE IF EXISTS `income`;
CREATE TABLE `income` (
  `userid` varchar(36) NOT NULL,  # unique user id
  `county` varchar(3) NOT NULL,   # county code
  `town` varchar(3) NOT NULL,     # town code
  `income` int(11) DEFAULT NULL,  # income
  PRIMARY KEY (`userid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
二、编写自定义MysqlSink
回顾一下数据的格式:(为了插入数据方便,我把最后一部分之间的####变成了逗号,同时给每个字段都加上了单引号)
[INFO ] 2018-08-20 18:40:20 'e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'
[INFO]与[2018-08-20 18:40:20]与['e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168']之间用制表符分割。这个就是我们的body的内容,一会通过split去切。
自定义Sink代码如下:
package com.zhb.flume;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
 * Flume sink that inserts one row into a MySQL table per event.
 *
 * <p>The event body is expected to contain at least three tab-separated
 * fields; the third field is a comma-separated list of values, one per
 * configured column, each optionally wrapped in single quotes, e.g.
 * {@code 'e2a07cc1-...','AMU','bml','2148168'}.
 *
 * <p>Configuration keys read by {@link #configure(Context)}:
 * {@code url}, {@code user}, {@code password} (optional), {@code tableName}
 * and {@code column_name} (comma-separated column list).
 */
public class MysqlSinker extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(MysqlSinker.class);

    private Connection connect;
    private PreparedStatement insertStmt;
    private String columnName;
    private int columnCount;
    private String url;
    private String user;
    private String password;
    private String tableName;

    /**
     * Releases the JDBC statement and connection opened by {@link #start()},
     * then marks the sink as stopped.
     */
    @Override
    public synchronized void stop() {
        closeQuietly(insertStmt, "statement");
        insertStmt = null;
        closeQuietly(connect, "connection");
        connect = null;
        super.stop();
    }

    /**
     * Opens the database connection and prepares the insert statement.
     *
     * @throws IllegalStateException if the connection cannot be opened or the
     *         statement cannot be prepared; failing here is preferable to
     *         failing later in {@link #process()} with a null statement
     */
    @Override
    public synchronized void start() {
        String sql = buildInsertSql();
        try {
            // JDBC URL has the form jdbc:mysql://host[:port]/database
            connect = DriverManager.getConnection(url, user, password);
            insertStmt = connect.prepareStatement(sql);
        } catch (SQLException e) {
            closeQuietly(connect, "connection");
            connect = null;
            throw new IllegalStateException("Unable to open MySQL connection to " + url, e);
        }
        // Only report the sink as started once it is actually usable.
        super.start();
    }

    /**
     * Takes a single event from the channel and inserts it into the table.
     *
     * @return {@code Status.BACKOFF} when the channel had no event,
     *         {@code Status.READY} when an event was consumed (inserted, or
     *         dropped because it was malformed)
     * @throws EventDeliveryException if the insert fails; the transaction is
     *         rolled back so the event stays in the channel
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        try {
            txn.begin();
            Event event = ch.take();
            if (event == null) {
                // Nothing to do: finish the empty transaction and let the
                // runner back off instead of spinning inside a transaction.
                txn.commit();
                return Status.BACKOFF;
            }

            String[] values = parseValues(event);
            if (values == null) {
                // A malformed event can never succeed; rolling it back would
                // put it back in the channel and retry it forever, so drop it.
                logger.warn("Dropping malformed event, expected {} values for columns [{}]",
                        columnCount, columnName);
                txn.commit();
                return Status.READY;
            }

            for (int i = 0; i < values.length; i++) {
                insertStmt.setString(i + 1, values[i]);
            }
            insertStmt.executeUpdate();
            txn.commit();
            return Status.READY;
        } catch (Throwable th) {
            txn.rollback();
            if (th instanceof Error) {
                throw (Error) th;
            }
            throw new EventDeliveryException(th);
        } finally {
            txn.close();
        }
    }

    /**
     * Reads the sink properties from the agent configuration.
     *
     * @param context sink configuration; {@code column_name}, {@code url},
     *        {@code user} and {@code tableName} are mandatory, {@code password}
     *        may be absent (e.g. a local MySQL without a password)
     */
    @Override
    public void configure(Context context) {
        columnName = context.getString("column_name");
        Preconditions.checkNotNull(columnName, "column_name must be set!!");
        columnCount = columnName.split(",").length;
        url = context.getString("url");
        Preconditions.checkNotNull(url, "url must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        // Password is intentionally not validated: it may be empty.
        password = context.getString("password");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
    }

    /** Builds {@code insert into <table>(<columns>) values(?,...,?)} with one placeholder per column. */
    private String buildInsertSql() {
        StringBuilder sql = new StringBuilder("insert into ")
                .append(tableName).append("(").append(columnName).append(") values(");
        for (int i = 0; i < columnCount; i++) {
            if (i > 0) {
                sql.append(',');
            }
            sql.append('?');
        }
        return sql.append(')').toString();
    }

    /**
     * Extracts the column values from the event body.
     *
     * @return the values with surrounding whitespace and single quotes
     *         removed, or {@code null} if the body has fewer than three
     *         tab-separated fields or the wrong number of values
     */
    private String[] parseValues(Event event) {
        String rawBody = new String(event.getBody(), StandardCharsets.UTF_8);
        String[] parts = rawBody.split("\t");
        if (parts.length < 3) {
            return null;
        }
        String[] values = parts[2].split(",");
        if (values.length != columnCount) {
            return null;
        }
        for (int i = 0; i < values.length; i++) {
            String value = values[i].trim();
            if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'")) {
                value = value.substring(1, value.length() - 1);
            }
            values[i] = value;
        }
        return values;
    }

    /** Closes a JDBC resource, logging (not propagating) any failure so shutdown can continue. */
    private static void closeQuietly(AutoCloseable resource, String what) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            logger.warn("Failed to close MySQL " + what, e);
        }
    }
}
将写好的程序打成jar包放到flume的lib下。
三、编写配置文件
flume.conf的内容如下:
# Flume agent configuration for the custom MySQL sink example.
# Agent "agent2" wires one source, one sink and one channel together.
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2
# source2: spooling-directory source; files copied into spoolDir are the input,
# and its events are written to channel2.
agent2.sources.source2.type=spooldir
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs
agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = false
agent2.sources.source2.interceptors = i1
agent2.sources.source2.interceptors.i1.type = timestamp
# sink2: the custom sink class from this article. The keys url, tableName, user,
# password and column_name are the ones read by MysqlSinker.configure().
agent2.sinks.sink2.type=com.zhb.flume.MysqlSinker
agent2.sinks.sink2.url =jdbc:mysql://127.0.0.1:3306/flume
agent2.sinks.sink2.tableName= income
agent2.sinks.sink2.user=root
# Leave the value empty when the MySQL account has no password; do not write "".
agent2.sinks.sink2.password=
# Comma-separated target columns; the sink compares this count with the number
# of comma-separated values in each event body.
agent2.sinks.sink2.column_name=userid,county,town,income
agent2.sinks.sink2.channel=channel2
# channel2: in-memory channel connecting source2 to sink2.
agent2.channels.channel2.type=memory
agent2.channels.channel2.capacity=10000
agent2.channels.channel2.transactionCapacity=1000
agent2.channels.channel2.keep-alive=30
四、验证
进入到flume的bin目录下,执行./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console -n agent2 (注意 -n 后面的 agent 名称必须与配置文件中的 agent2 完全一致,中间不能有空格)
成功启动flume后,新开一个终端,将生成的log文件拷贝到spooling 监控的文件夹下:cp /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log* /Users/jsj/eclipse-workspace/logs
看下数据库。
大功告成,成功插入到数据库。自定义Sink成功了。
五、总结
本文实现了自定义sink,将数据sink到mysql中。说一下心得吧,中间有问题的时候怎么调试,就在你编写的类里用logger去输出你想看的日志即可。我在上面的代码中注释掉了。至此,flume的绝大部分内容都结束了。