Flink read/write series: reading from MySQL and writing to MySQL
程序员文章站
2022-07-14 13:15:34
...
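The Flink documentation describes connectors for reading source data and for storing processing results in external systems. However, its list of streaming connectors does not include one for relational databases such as MySQL. For accessing external data such as a database, the official documentation describes Asynchronous I/O, an operator designed specifically for querying external systems from inside a stream. For details, see: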
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html
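The core idea behind Asynchronous I/O is to issue many lookups at once and collect their results as they complete, instead of blocking on each query in turn. The sketch below illustrates that idea with plain JDK `CompletableFuture`s. It is not Flink's API. In Flink 1.6 the equivalent is implementing `AsyncFunction`/`RichAsyncFunction`, completing a `ResultFuture`, and wiring the function in with `AsyncDataStream.unorderedWait` or `orderedWait`. The in-memory map is a hypothetical stand-in for a MySQL table keyed by `user_id`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// JDK-only illustration of asynchronous lookups; NOT Flink's AsyncFunction API.
// The map is a hypothetical stand-in for a MySQL table (user_id -> nick).
class AsyncLookupSketch {
    private final Map<String, String> table;
    private final ExecutorService pool;

    AsyncLookupSketch(Map<String, String> table, int threads) {
        this.table = table;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Starts the lookup on a pool thread and returns immediately with a future.
    CompletableFuture<String> lookupNick(String userId) {
        return CompletableFuture.supplyAsync(() -> table.getOrDefault(userId, "<unknown>"), pool);
    }

    // Fires every request first, then waits; the lookups overlap instead of running one by one.
    // Results are joined in input order, similar in spirit to Flink's orderedWait.
    List<String> lookupAll(List<String> userIds) {
        List<CompletableFuture<String>> futures = userIds.stream()
                .map(this::lookupNick)
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

With a real database the supplier would run a query through an asynchronous client or a thread pool. This pattern targets lookup and enrichment rather than bulk reads of a whole table.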
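Another approach is to extend RichSourceFunction (and RichSinkFunction for writing) and override their methods, as follows.
The class that reads from MySQL: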
package com.my.flink.utils.streaming.mysql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.my.flink.utils.config.ConfigKeys;

/**
 * @Description mysql source
 * @Author jiangxiaozhi
 * @Date 2018/10/15 17:05
 **/
public class JdbcReader extends RichSourceFunction<Tuple2<String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class);
    private Connection connection = null;
    private PreparedStatement ps = null;
    // Cleared by cancel(); volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    // Opens the database connection; ConfigKeys is the project's class for reading configuration
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(ConfigKeys.DRIVER_CLASS()); // load the JDBC driver
        connection = DriverManager.getConnection(ConfigKeys.SOURCE_DRIVER_URL(), ConfigKeys.SOURCE_USER(), ConfigKeys.SOURCE_PASSWORD()); // get a connection
        ps = connection.prepareStatement(ConfigKeys.SOURCE_SQL());
    }

    // Runs the query and emits each row as (user_id, nick)
    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        // try-with-resources closes the ResultSet; exceptions propagate so Flink can fail and restart the job
        try (ResultSet resultSet = ps.executeQuery()) {
            while (isRunning && resultSet.next()) {
                String name = resultSet.getString("nick");
                String id = resultSet.getString("user_id");
                logger.debug("readJDBC name:{}", name);
                // Emit the result; Tuple2 holds two fields, choose the TupleN arity that fits your data
                ctx.collect(Tuple2.of(id, name));
            }
        }
    }

    // Only stops the read loop; JDBC resources are released in close()
    @Override
    public void cancel() {
        isRunning = false;
    }

    // Closes the statement first, then the connection
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }
}
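A Flink source's `run()` loop is expected to return once `cancel()` is called, and Flink calls `cancel()` from a different thread than the one executing `run()`. The usual pattern is therefore a `volatile` flag that the loop checks, with connections released in `close()` rather than in `cancel()`. The JDK-only model below shows just that contract. `CancellableSource` and its `Consumer`-based `run` are hypothetical simplifications, not Flink classes.

```java
import java.util.Iterator;
import java.util.function.Consumer;

// JDK-only model of the SourceFunction run()/cancel() contract (hypothetical class, not Flink's API).
class CancellableSource<T> {
    // volatile so a cancel() from another thread is seen by the loop in run()
    private volatile boolean running = true;
    private final Iterable<T> rows;

    CancellableSource(Iterable<T> rows) {
        this.rows = rows;
    }

    // Emits rows until the input is exhausted or cancel() clears the flag,
    // like a JDBC source looping over a ResultSet.
    void run(Consumer<T> collect) {
        Iterator<T> it = rows.iterator();
        while (running && it.hasNext()) {
            collect.accept(it.next());
        }
    }

    // Only signals the loop to stop; resource cleanup belongs in close().
    void cancel() {
        running = false;
    }
}
```

For example, cancelling from inside the collector after the third row makes `run()` return with exactly three rows emitted.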
The class that writes to MySQL:
package com.my.flink.utils.streaming.mysql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import com.my.flink.utils.config.ConfigKeys;

/**
 * @Description mysql sink
 * @Author jiangxiaozhi
 * @Date 2018/10/15 18:31
 **/
// Uses Flink's Java Tuple2 (not scala.Tuple2) so the type matches what JdbcReader emits
public class JdbcWriter extends RichSinkFunction<Tuple2<String, String>> {
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Load the JDBC driver
        Class.forName(ConfigKeys.DRIVER_CLASS());
        // Connect to the target MySQL database; if the data contains non-ASCII text such as Chinese,
        // the URL in the config may need ?useUnicode=true&characterEncoding=utf8
        connection = DriverManager.getConnection(ConfigKeys.SINK_DRIVER_URL(), ConfigKeys.SINK_USER(), ConfigKeys.SINK_PASSWORD());
        // The INSERT statement is kept in the configuration file
        preparedStatement = connection.prepareStatement(ConfigKeys.SINK_SQL());
    }

    // Closes the statement first, then the connection
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        // JdbcReader emits (user_id, nick), so field f1 holds the name
        String name = value.f1;
        preparedStatement.setString(1, name);
        // Failures propagate to Flink instead of being printed and the record silently dropped
        preparedStatement.executeUpdate();
    }
}
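The writer issues one `executeUpdate()` round trip per record. A common alternative for higher throughput is to buffer records and send them together with JDBC's `PreparedStatement.addBatch()` and `executeBatch()`. The JDK-only sketch below shows only the buffering logic. `BatchingWriter` and its `flushAction` hook are hypothetical, and in a real sink the hook would bind each record, call `addBatch()`, then `executeBatch()`. One caveat: records still in the buffer are lost on failure unless they are also flushed at checkpoint time, for example by implementing Flink's `CheckpointedFunction.snapshotState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// JDK-only sketch of batched sink writes. flushAction is a hypothetical hook that
// stands in for "addBatch() per record, then executeBatch()" in a JDBC sink.
class BatchingWriter<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<T>> flushAction) {
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    // Called once per record, like invoke(); flushes when the buffer is full.
    void write(T value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hands a copy of the buffered records to the flush hook, then clears the buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Flushes the last partial batch on shutdown so it is not lost.
    @Override
    public void close() {
        flush();
    }
}
```

With a batch size of 2, writing five records produces flushes of 2, 2 and, on `close()`, 1 record.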
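Core code of the program entry point:
// Scala code
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import com.my.flink.utils.streaming.mysql.{JdbcReader, JdbcWriter}

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // checkpoint every 5 seconds
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// EXACTLY_ONCE covers Flink's internal state; this non-transactional JDBC source/sink pair does not make the MySQL writes exactly-once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val dataStream = env.addSource(new JdbcReader()) // read from MySQL; transformations could be applied to dataStream here, none are in this demo
dataStream.addSink(new JdbcWriter()) // write to MySQL
env.execute("flink mysql demo") // run the job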
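Both classes read the driver class, connection URLs, credentials and SQL from `com.my.flink.utils.config.ConfigKeys`, which the post does not show. The following is a minimal hypothetical stand-in backed by `java.util.Properties`. Its method names mirror the calls made in `JdbcReader` and `JdbcWriter`, but the property keys (`jdbc.driver`, `source.url`, `sink.sql`, ...) are invented for illustration. It also uses instance methods, whereas the original calls `ConfigKeys` statically.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// HYPOTHETICAL stand-in for ConfigKeys; the property keys below are illustrative only.
class ConfigKeysSketch {
    private final Properties props;

    ConfigKeysSketch(Properties props) {
        this.props = props;
    }

    // Parses "key=value" lines, e.g. the contents of a properties file.
    static ConfigKeysSketch fromText(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ConfigKeysSketch(p);
    }

    // Fails fast when a key is missing instead of passing null to DriverManager.
    private String required(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("missing config key: " + key);
        }
        return value.trim();
    }

    String DRIVER_CLASS()      { return required("jdbc.driver"); }
    String SOURCE_DRIVER_URL() { return required("source.url"); }
    String SOURCE_USER()       { return required("source.user"); }
    String SOURCE_PASSWORD()   { return required("source.password"); }
    String SOURCE_SQL()        { return required("source.sql"); }
    String SINK_DRIVER_URL()   { return required("sink.url"); }
    String SINK_USER()         { return required("sink.user"); }
    String SINK_PASSWORD()     { return required("sink.password"); }
    String SINK_SQL()          { return required("sink.sql"); }
}
```

Because `JdbcWriter` binds a single parameter, `sink.sql` would hold a one-placeholder INSERT such as `INSERT INTO t(nick) VALUES (?)`, where the table and column names are only examples.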
Run the program, and the written rows will appear in the target MySQL table.