Flink1.11.2-pg-source&sink
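A short Flink 1.11.2 demo: read a CSV file and write it into PostgreSQL with a custom RichSinkFunction, then read the rows back and print them with a custom RichSourceFunction, using the plain PostgreSQL JDBC driver.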
[CSVSource_ToPg.java]
package com.flink.source.pg_jdbc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
public class CSVSource_ToPg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read the sample CSV file line by line
        DataStreamSource<String> dsFile = env.readTextFile("D:\\code\\flink111\\src\\main\\resources\\input\\water_sensor.txt");
        // Parse each "id,ts,vc" line into a map and hand it to the JDBC sink
        dsFile.map(new MapFunction<String, Map<String, String>>() {
            @Override
            public Map<String, String> map(String s) throws Exception {
                String[] strings = s.split(",");
                HashMap<String, String> map = new HashMap<>();
                map.put("id", strings[0]);
                map.put("ts", strings[1]);
                map.put("vc", strings[2]);
                return map;
            }
        })
        .addSink(new PgsqlSink());
        env.execute("txt write to the psql demo");
    }

    // Leftover word-count helper from another example; not used by this demo.
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
[PgsqlSink.java]
package com.flink.source.pg_jdbc;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Map;
public class PgsqlSink extends RichSinkFunction<Map<String, String>> {
    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // JDBC connection settings
        String USERNAME = "test_user";
        String PASSWORD = "aaaaaa";
        String driverClass = "org.postgresql.Driver";
        String URL = "jdbc:postgresql://192.168.1.163:5432/test_db2";
        // Load the JDBC driver
        Class.forName(driverClass);
        // Open the connection and prepare the insert statement once per subtask
        connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        String sql = "insert into water_sensor(id, ts, vc) values (?, ?, ?)";
        preparedStatement = connection.prepareStatement(sql);
        super.open(parameters);
    }

    @Override
    public void invoke(Map<String, String> value, Context context) {
        try {
            String id = value.get("id");
            long ts = Long.parseLong(value.get("ts"));
            int vc = Integer.parseInt(value.get("vc"));
            preparedStatement.setString(1, id);
            preparedStatement.setLong(2, ts);
            preparedStatement.setInt(3, vc);
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
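As an alternative to the hand-written PgsqlSink, Flink 1.11 also ships a generic JDBC connector (the flink-connector-jdbc_2.11 artifact) whose JdbcSink takes care of connection setup and statement execution. The listing below is an untested sketch: it assumes that artifact and the org.postgresql JDBC driver are on the classpath, reuses the file path, SQL, and connection settings from the code above, and the class name CSVSource_ToPg_JdbcSink is made up for illustration.
[CSVSource_ToPg_JdbcSink.java (sketch)]
package com.flink.source.pg_jdbc;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
// Sketch only: same pipeline as CSVSource_ToPg, but using Flink 1.11's built-in JdbcSink.
public class CSVSource_ToPg_JdbcSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.readTextFile("D:\\code\\flink111\\src\\main\\resources\\input\\water_sensor.txt")
            // Bind each "id,ts,vc" line directly to the prepared insert statement
            .addSink(JdbcSink.sink(
                "insert into water_sensor(id, ts, vc) values (?, ?, ?)",
                (PreparedStatement ps, String line) -> {
                    String[] f = line.split(",");
                    ps.setString(1, f[0]);
                    ps.setLong(2, Long.parseLong(f[1]));
                    ps.setInt(3, Integer.parseInt(f[2]));
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://192.168.1.163:5432/test_db2")
                    .withDriverName("org.postgresql.Driver")
                    .withUsername("test_user")
                    .withPassword("aaaaaa")
                    .build()));
        env.execute("txt write to psql via JdbcSink");
    }
}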
[FromPg_Print.java]
package com.flink.source.pg_jdbc;
import com.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FromPg_Print {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: ExecutionConfig#disableSysoutLogging() is deprecated as of Flink 1.10
        env.getConfig().disableSysoutLogging();
        // Read every row of water_sensor from PostgreSQL and print it
        DataStream<WaterSensor> stream = env.addSource(new PsqlSource());
        stream.print();
        env.execute("PostgreSQL Source to Flink demo");
    }
}
[PsqlSource.java]
package com.flink.source.pg_jdbc;
import com.flink.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class PsqlSource extends RichSourceFunction<WaterSensor> {
    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // JDBC connection settings
        String USERNAME = "test_user";
        String PASSWORD = "aaaaaa";
        String driverClass = "org.postgresql.Driver";
        String URL = "jdbc:postgresql://192.168.1.163:5432/test_db2";
        Class.forName(driverClass);
        connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        String sql = "SELECT * FROM public.water_sensor";
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
        // Emit one WaterSensor per row; the query is bounded, so run() returns once the result set is exhausted
        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            while (resultSet.next()) {
                WaterSensor waterSensor = new WaterSensor();
                waterSensor.setId(resultSet.getString("id"));
                waterSensor.setTs(resultSet.getLong("ts"));
                waterSensor.setVc(resultSet.getInt("vc"));
                sourceContext.collect(waterSensor);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cancel() {
        // Nothing to interrupt: the bounded query finishes on its own
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement before the connection
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
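The WaterSensor bean (com.flink.bean.WaterSensor) is referenced above but not included in the post. A minimal sketch that matches the fields the code uses (id, ts, vc) might look like the following; the actual class may differ.
[WaterSensor.java (sketch, not in the original post)]
package com.flink.bean;
// Minimal POJO sketch matching the setters used by PsqlSource; the original class is not shown in the post.
public class WaterSensor {
    private String id;   // sensor id, e.g. "sensor_1"
    private Long ts;     // timestamp (epoch seconds in the sample data)
    private Integer vc;  // water level value

    public WaterSensor() { }  // no-arg constructor required for a Flink POJO

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }

    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}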
[water_sensor.txt]
sensor_1,1549044122,1
sensor_1,1549044123,2
sensor_1,1549044124,3
sensor_1,1549044125,4
sensor_2,1549044123,2
sensor_3,1549044124,3
sensor_4,1549044125,4
sensor_5,1549044126,5
sensor_6,1549044127,6
sensor_7,1549044128,7
[water_sensor table DDL]
-- public.water_sensor definition
-- Drop table
-- DROP TABLE public.water_sensor;
CREATE TABLE public.water_sensor (
    id varchar NULL,
    ts int8 NULL,
    vc int4 NULL
);
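To reproduce the demo: create the water_sensor table with the DDL above, run CSVSource_ToPg to load water_sensor.txt into PostgreSQL, then run FromPg_Print to read the rows back and print them.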