Flink Practice 3: Reading from Kafka, Filtering, and Writing Back to Kafka
Requirement: read data from a Kafka topic, parse the JSON, filter for error records, and send the result to Kafka.
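A note on the input format: the parse() helper below strips the first and last character of each raw message before handing it to fastjson, which implies every message arrives wrapped in one extra character on each side (presumably "[" and "]"). A hypothetical input record, with field names taken from the getters used in JSONHelperTwo, might look like:
[{"appLog":"ERROR Could not get a resource from the pool\n...stack trace...","systemCode":"S01","appName":"order-service","hostIp":"10.0.0.1","logTime":1657772858000}]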
1. The JSON parsing class:
package applog;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple5;
/**
* @Description: helper class for parsing raw messages
* @author: yzg
* @date: xxx
*/
public class JSONHelperTwo {
/**
* Parse a message into a Tuple5 of its fields:
* appLog (full log text; the first line is extracted later in the map step),
* systemCode, appName, hostIp, logTime.
* @param raw the raw JSON message
* @return a Tuple5 of the extracted fields
*/
public static Tuple5<String,String,String,String,Long> getTupleFromMessage(String raw){
ApplogSingleMessage singleMessage = parse(raw);
String applog = singleMessage.getAppLog();
String systemCode = singleMessage.getSystemCode();
String appName = singleMessage.getAppName();
String hostIp = singleMessage.getHostIp();
Long logTime = singleMessage.getLogTime();
return new Tuple5<>(applog, systemCode, appName, hostIp, logTime);
}
/**
* Parse a raw message into an ApplogSingleMessage object.
* @param raw the raw JSON message
* @return the parsed object, or null when raw is null
*/
public static ApplogSingleMessage parse(String raw){
ApplogSingleMessage singleMessage = null;
if (raw != null) {
// Strip the enclosing wrapper characters (presumably "[" and "]"; see the note above) before parsing.
singleMessage = JSONObject.parseObject(raw.substring(1, raw.length() - 1), ApplogSingleMessage.class);
}
return singleMessage;
}
}
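The helper class depends on an ApplogSingleMessage POJO that the original post does not show. A minimal sketch, assuming fastjson binds the JSON fields by name onto the getters used above, could look like this (the field names are assumptions and must match the JSON keys):
package applog;
public class ApplogSingleMessage {
private String appLog;
private String systemCode;
private String appName;
private String hostIp;
private Long logTime;
public String getAppLog() { return appLog; }
public void setAppLog(String appLog) { this.appLog = appLog; }
public String getSystemCode() { return systemCode; }
public void setSystemCode(String systemCode) { this.systemCode = systemCode; }
public String getAppName() { return appName; }
public void setAppName(String appName) { this.appName = appName; }
public String getHostIp() { return hostIp; }
public void setHostIp(String hostIp) { this.hostIp = hostIp; }
public Long getLogTime() { return logTime; }
public void setLogTime(Long logTime) { this.logTime = logTime; }
}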
2. The main program:
package applog;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class KafkaConsumerTest {
public static void main(String[] args) throws Exception {
// Execution environment, state backend, and checkpoint configuration
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Store checkpoint data on HDFS
env.setStateBackend(new FsStateBackend("hdfs://172.19.73.90:8020/user/zx_yezonggang", false));
env.enableCheckpointing(60000); // checkpoint every 60 s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // at least 30 s between two checkpoints
env.getCheckpointConfig().setCheckpointTimeout(10000); // abort a checkpoint if it takes longer than 10 s
env.getCheckpointConfig().setMaxConcurrentCheckpoints(10);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
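// Note: with RETAIN_ON_CANCELLATION, the externalized checkpoint on HDFS survives a manual
// cancel, so the job can later be resumed with `flink run -s <checkpoint-path> ...`.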
/*
Error patterns this job alerts on (matched by FilterOne at the bottom of the class):
1. Could not get a resource from the pool
2. Work queue full
3. Thread pool is EXHAUSTED!
4. java.lang.OutOfMemoryError
5. more than max_user_connections active connections
6. Communications link failure
*/
//++++++++++++++++++++ Source 1 ++++++++++++++++++++
Properties props1 = new Properties();
props1.setProperty("bootstrap.servers", "xxx:9092");
props1.setProperty("zookeeper.connect", "xxxx:21810");
props1.setProperty("group.id", "applog_flink_group_alarm");
FlinkKafkaConsumer010<String> consumer1 =new FlinkKafkaConsumer010<>("applog", new SimpleStringSchema(), props1);
// applog from Huai'an
DataStream<String> source1 = env.addSource(consumer1).setParallelism(40);
//++++++++++++++++++++ Source 2 ++++++++++++++++++++
Properties props2 = new Properties();
props2.setProperty("bootstrap.servers", "xxx:9092");
props2.setProperty("zookeeper.connect", "xxx");
props2.setProperty("group.id", "applog_flink_group_n");
FlinkKafkaConsumer010<String> consumer2 =new FlinkKafkaConsumer010<>("applog", new SimpleStringSchema(), props2);
// applog from Luoyang
DataStream<String> source2 = env.addSource(consumer2).setParallelism(40);
//++++++++++++++++++++ Union the two sources ++++++++++++++++++++
DataStream<String> sourceUnion = source1.union(source2);
//++++++++++++++++++++ Filter and transform ++++++++++++++++++++
DataStream<String> sourceFilter = sourceUnion
.flatMap(new FlatMapFunction<String, Tuple5<String,String,String,String,Long>>() {
// Parse each raw message; records that fail to parse are dropped
// (note: getTupleFromMessage re-parses the message, which is redundant but harmless)
@Override
public void flatMap(String s, Collector<Tuple5<String,String,String,String,Long>> collector) throws Exception {
if (null != JSONHelperTwo.parse(s)) {
collector.collect(JSONHelperTwo.getTupleFromMessage(s));
}
}
}).setParallelism(20)
.filter(new FilterOne()).setParallelism(20)
.map(new MapFunction<Tuple5<String,String,String,String,Long>,String>() {
@Override
public String map(Tuple5<String, String, String, String, Long> input) throws Exception {
// Keep only the first line of the log text. Check for null before calling indexOf,
// and note that substring's end index is exclusive, so use index rather than index - 1.
String applog = input.f0;
int index = (applog == null) ? -1 : applog.indexOf("\n");
if (index != -1) {
applog = applog.substring(0, index);
}
return applog + "|" + input.f1 + "|" + input.f2 + "|" + input.f3 + "|" + input.f4;
}
}).setParallelism(20);
//++++++++++++++++++++ Sink to Kafka ++++++++++++++++++++
// Write the pipe-delimited strings to Kafka
sourceFilter.addSink(new FlinkKafkaProducer010<>(
"xxx:9092",
"topic_applog_alarm",
new SimpleStringSchema()
)).name("applog_alarm").setParallelism(5);
env.execute("list_data");
}
public static class FilterOne implements FilterFunction<Tuple5<String,String,String,String,Long>> {
private static String error1 = "not get a resource from the pool";
private static String error2 = "queue full";
private static String error3 = "pool is EXHAUSTED";
private static String error4 = "lang.OutOfMemoryError";
private static String error5 = "than max_user_connections active connections";
private static String error6 = "Communications link failure";
@Override
public boolean filter(Tuple5<String,String,String,String,Long> input) throws Exception {
// Keep only records whose log text contains one of the error patterns above.
return input.f0.contains(error1) ||
input.f0.contains(error2) ||
input.f0.contains(error3) ||
input.f0.contains(error4) ||
input.f0.contains(error5) ||
input.f0.contains(error6);
}
}
}
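For a quick sanity check of the parsing and formatting path outside Flink, a hypothetical smoke test (assuming the ApplogSingleMessage sketch above and the bracket-wrapped input format) could look like this; the sample record and its field values are made up:
package applog;
import org.apache.flink.api.java.tuple.Tuple5;
public class JSONHelperTwoDemo {
public static void main(String[] args) {
String raw = "[{\"appLog\":\"ERROR Could not get a resource from the pool\\nat com.example.Dao\","
+ "\"systemCode\":\"S01\",\"appName\":\"order-service\",\"hostIp\":\"10.0.0.1\",\"logTime\":1657772858000}]";
Tuple5<String, String, String, String, Long> t = JSONHelperTwo.getTupleFromMessage(raw);
// Reproduce the map step: keep the first line of the log, then join with "|".
String firstLine = t.f0.indexOf("\n") == -1 ? t.f0 : t.f0.substring(0, t.f0.indexOf("\n"));
System.out.println(firstLine + "|" + t.f1 + "|" + t.f2 + "|" + t.f3 + "|" + t.f4);
// Expected: ERROR Could not get a resource from the pool|S01|order-service|10.0.0.1|1657772858000
}
}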