
Flume Learning Series (7) ---- Custom Sink to MySQL

程序员文章站 (Programmer Article Site), 2022-06-14 13:04:08
...

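Preface: In the previous post we summarized the workflow for writing a custom Sink. This time we implement our own Sink that writes data into a MySQL database. We reuse the source from Flume Learning Series (2) ---- Spooling to HDFS in Practice, but instead of the HDFS sink we use a MysqlSink.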

1. Create the Database and Table

create database flume;
use flume;

DROP TABLE IF EXISTS `income`;
CREATE TABLE `income` (
  `userid` varchar(36) NOT NULL, #unique user ID
  `county` varchar(3) NOT NULL, #county
  `town` varchar(3) NOT NULL,  #town
  `income` int(11) DEFAULT NULL, #income
  PRIMARY KEY (`userid`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

2. Write the Custom MysqlSink

    Recall the data format (to make inserting easier, I replaced the #### separators in the last part with commas and wrapped every field in single quotes):

[INFO ] 2018-08-20 18:40:20 'e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'

    [INFO], [2018-08-20 18:40:20] and ['e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'] are separated by tab characters. This is the content of the event body, which the sink splits on tabs.
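To make the splitting step concrete, here is a small standalone sketch of how the sink slices one body: split on tabs, take the third field (index 2), and check that the number of comma-separated values matches the configured column list. It assumes the three parts really are tab-separated as described; the class and method names (BodySplitDemo, valuesPart, matchesColumns) are hypothetical and only for illustration.

```java
// Sketch: how an event body is sliced, assuming the tab-separated layout described above
public class BodySplitDemo {

    // Returns the third tab-separated field (the comma-separated values), or null if it is missing
    static String valuesPart(String rawBody) {
        String[] parts = rawBody.split("\t");
        return parts.length >= 3 ? parts[2] : null;
    }

    // True when the number of values equals the number of configured column names
    static boolean matchesColumns(String values, String columnNames) {
        return values != null && values.split(",").length == columnNames.split(",").length;
    }

    public static void main(String[] args) {
        String raw = "[INFO ]\t2018-08-20 18:40:20\t"
                + "'e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'";
        String values = valuesPart(raw);
        System.out.println(values);
        System.out.println(matchesColumns(values, "userid,county,town,income"));
    }
}
```

Note that a plain split(",") would miscount if a value ever contained a comma; that is fine for this data set, where every field is a simple token.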

    The custom Sink code is as follows:

package com.zhb.flume;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class MysqlSinker extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(MysqlSinker.class);
    private Connection connect;
    private Statement stmt;
    private String columnName;
    private String url;
    private String user;
    private String password;
    private String tableName;

    // Runs once when the sink stops: release the JDBC resources
    @Override
    public synchronized void stop() {
        try {
            if (stmt != null) {
                stmt.close();
            }
            if (connect != null) {
                connect.close();
            }
        } catch (SQLException e) {
            logger.warn("Failed to close the MySQL connection", e);
        }
        super.stop();
    }

    // Runs once when the sink starts: open the database connection
    @Override
    public synchronized void start() {
        try {
            // The URL has the form jdbc:mysql://host:port/database; the other two arguments are the login user and password
            connect = DriverManager.getConnection(url, user, password);
            stmt = connect.createStatement();
        } catch (SQLException e) {
            // Fail fast instead of starting a sink that has no connection
            throw new FlumeException("Cannot connect to " + url, e);
        }
        super.start();
    }

    // Called repeatedly by the SinkRunner: take one event and insert it into the database
    @Override
    public Status process() throws EventDeliveryException {
        // The transaction and take() code is boilerplate that follows the pattern of Flume's built-in sinks
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            if (event == null) {
                // Channel is empty: commit the empty transaction and let the runner back off instead of busy-waiting
                txn.commit();
                return Status.BACKOFF;
            }
            String rawbody = new String(event.getBody(), StandardCharsets.UTF_8);
            //logger.error("rawbody:" + rawbody);
            String[] parts = rawbody.split("\t");
            if (parts.length < 3 || parts[2].split(",").length != columnName.split(",").length) {
                // Malformed event: log and drop it; rolling back would return it to the channel and retry it forever
                logger.warn("Skipping malformed event: " + rawbody);
                txn.commit();
                return Status.READY;
            }
            String body = parts[2];
            //logger.error("spiltbody:" + body);
            // The body already contains quoted values and is concatenated into the SQL as-is (unsafe for untrusted input)
            String sql = "insert into " + tableName + "(" + columnName + ") values(" + body + ")";
            //logger.error("sql:" + sql);
            stmt.executeUpdate(sql);
            txn.commit();
            return Status.READY;
        } catch (Throwable th) {
            txn.rollback();
            if (th instanceof Error) {
                throw (Error) th;
            } else {
                throw new EventDeliveryException(th);
            }
        } finally {
            txn.close();
        }
    }

    // Reads the properties from the configuration file and checks that the required ones are set
    @Override
    public void configure(Context context) {
        columnName = context.getString("column_name");
        Preconditions.checkNotNull(columnName, "column_name must be set!!");
        url = context.getString("url");
        Preconditions.checkNotNull(url, "url must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        // My MySQL account has no password, so the password is not checked here
        password = context.getString("password");
        // Preconditions.checkNotNull(password, "password must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
    }

}

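    Package the finished class into a jar and put it in Flume's lib directory. The MySQL JDBC driver jar (MySQL Connector/J) must also be on Flume's classpath, for example in the same lib directory; otherwise DriverManager cannot find a driver for the jdbc:mysql URL.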
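The sink above concatenates the event body straight into the SQL string, which only works because the values arrive pre-quoted and would break (or allow SQL injection) if a value contained a quote. A common alternative, sketched here under the assumption that values contain no commas or single quotes of their own, is to build a parameterized statement once and bind the unquoted values. The class ParamInsertSketch and its methods are hypothetical helpers, not part of the original sink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers for a parameterized insert; not part of the original MysqlSinker
public class ParamInsertSketch {

    // Builds "insert into t(c1,c2,...) values(?,?,...)" with one placeholder per column
    static String buildInsertSql(String tableName, String columnNames) {
        int n = columnNames.split(",").length;
        StringBuilder sb = new StringBuilder("insert into ").append(tableName)
                .append("(").append(columnNames).append(") values(");
        for (int i = 0; i < n; i++) {
            sb.append(i == 0 ? "?" : ",?");
        }
        return sb.append(")").toString();
    }

    // Splits 'a','b','c' into [a, b, c], stripping one pair of surrounding single quotes per field
    static List<String> unquoteValues(String body) {
        List<String> values = new ArrayList<>();
        for (String field : body.split(",")) {
            String v = field.trim();
            if (v.length() >= 2 && v.startsWith("'") && v.endsWith("'")) {
                v = v.substring(1, v.length() - 1);
            }
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(buildInsertSql("income", "userid,county,town,income"));
        System.out.println(unquoteValues("'e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'"));
    }
}
```

Inside a sink, start() could call connect.prepareStatement(...) once with the generated SQL, and process() would call setString(i + 1, value) for each value followed by executeUpdate(); MySQL normally converts the string '2148168' to the int income column on insert.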

3. Write the Configuration File

    The contents of flume.conf are as follows:

# my application flume configuration
#agent2 name
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2


#Spooling Directory
#set source2
agent2.sources.source2.type=spooldir
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs

agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = false
agent2.sources.source2.interceptors = i1
agent2.sources.source2.interceptors.i1.type = timestamp

#set sink2
agent2.sinks.sink2.type=com.zhb.flume.MysqlSinker
agent2.sinks.sink2.url =jdbc:mysql://127.0.0.1:3306/flume
agent2.sinks.sink2.tableName= income
agent2.sinks.sink2.user=root
#If the password is empty, leave the value blank; do not write ""
agent2.sinks.sink2.password=
agent2.sinks.sink2.column_name=userid,county,town,income
agent2.sinks.sink2.channel=channel2

#set channel2
agent2.channels.channel2.type=memory
agent2.channels.channel2.capacity=10000
agent2.channels.channel2.transactionCapacity=1000
agent2.channels.channel2.keep-alive=30
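Flume loads this file as a Java properties file, and the Context passed to the sink's configure() holds the sink's own keys with the agent2.sinks.sink2. prefix stripped, which is why the code can call context.getString("url"). The sketch below approximates that lookup with plain java.util.Properties (it is not Flume's actual configuration loader; SinkConfigDemo and subProperties are hypothetical names). It also shows why the comment above says not to write "": an empty value is read as an empty string, while "" would become a two-character password.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Approximates how the sink's keys are scoped, using java.util.Properties (not Flume's own loader)
public class SinkConfigDemo {

    // Collects all properties under the given prefix, with the prefix removed from each key
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String conf = String.join("\n",
                "agent2.sinks.sink2.type=com.zhb.flume.MysqlSinker",
                "agent2.sinks.sink2.url =jdbc:mysql://127.0.0.1:3306/flume",
                "agent2.sinks.sink2.tableName= income",
                "agent2.sinks.sink2.user=root",
                "agent2.sinks.sink2.password=",
                "agent2.sinks.sink2.column_name=userid,county,town,income",
                "agent2.sinks.sink2.channel=channel2");
        Properties props = new Properties();
        props.load(new StringReader(conf));
        Map<String, String> sink = subProperties(props, "agent2.sinks.sink2.");
        // Whitespace around '=' is dropped, so "url =..." and "tableName= income" still parse cleanly
        System.out.println(sink.get("url"));
        System.out.println(sink.get("tableName"));
        System.out.println("[" + sink.get("password") + "]");
    }
}
```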

4. Verification

    Go into Flume's bin directory and run ./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console -n agent2 (the -n argument must match the agent name agent2 used in flume.conf).

    Once Flume has started, open a new terminal and copy the generated log files into the folder watched by the spooling source: cp /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log* /Users/jsj/eclipse-workspace/logs
Then check the database.

[Figure 001.jpg: the income table in MySQL after the insert]

    Done: the data was inserted into the database, so the custom Sink works.

5. Summary

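    This article implemented a custom Sink that writes data into MySQL. One tip from the process: when something goes wrong, the easiest way to debug is to use the logger in your own class to print whatever you want to inspect; those logging statements are left commented out in the code above. This wraps up most of what there is to cover about Flume.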