
Flink in Practice (3): Reading from Kafka, Filtering, and Writing Back to Kafka


Requirement: consume messages from a Kafka topic, parse each one as JSON, keep only the messages that match known error signatures, and publish the result to another Kafka topic.
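To make the data flow concrete, here is a hypothetical example (the real payload format is not shown in the original post; the field values and the single wrapper character pair around the JSON are assumptions based on how parse() below strips the first and last characters). An incoming message such as

[{"appLog":"Could not get a resource from the pool\n...","systemCode":"S01","appName":"orderApp","hostIp":"10.0.0.1","logTime":1657772858000}]

would be parsed, matched against the error list, and emitted as the pipe-delimited string

Could not get a resource from the pool|S01|orderApp|10.0.0.1|1657772858000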

1. The JSON parsing helper class:

package applog;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple5;

/**
 * @Description: Helper class for parsing raw messages.
 * @author: yzg
 * @date: xxx
 */
public class JSONHelperTwo {

    /**
     * Parse a raw message into a Tuple5 of
     * (appLog (the first line is the message proper), systemCode, appName, hostIp, logTime).
     * Assumes the message has already been checked with parse(raw) != null.
     * @param raw the raw Kafka message
     * @return the parsed fields as a Tuple5
     */
    public static Tuple5<String, String, String, String, Long> getTupleFromMessage(String raw) {
        ApplogSingleMessage singleMessage = parse(raw);
        return new Tuple5<>(
                singleMessage.getAppLog(),
                singleMessage.getSystemCode(),
                singleMessage.getAppName(),
                singleMessage.getHostIp(),
                singleMessage.getLogTime());
    }

    /**
     * Parse a raw message into an ApplogSingleMessage object.
     * @param raw the raw Kafka message
     * @return the parsed object, or null if the message is null or malformed
     */
    public static ApplogSingleMessage parse(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            // Strip the first and last characters (the wrapper around the JSON payload)
            // before handing the string to fastjson.
            return JSONObject.parseObject(raw.substring(1, raw.length() - 1), ApplogSingleMessage.class);
        } catch (Exception e) {
            // Malformed messages are dropped rather than failing the job.
            return null;
        }
    }



/*  // Unused helper: extract the first line of the log. The same logic now lives
    // in the map() step of the main job, so this stays commented out.
    public static String getFirstLine(String applogAll) {
        if (applogAll == null) {
            return null;
        }
        int index = applogAll.indexOf("\n");
        return index == -1 ? applogAll : applogAll.substring(0, index);
    }*/

}               
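The helper above references an ApplogSingleMessage POJO that the original post does not include. A minimal sketch, with field names and types inferred from the getters used in JSONHelperTwo (the real class may carry more fields):

package applog;

// Hypothetical POJO inferred from the getters used in JSONHelperTwo;
// fastjson populates it through the no-arg constructor and setters.
public class ApplogSingleMessage {
    private String appLog;
    private String systemCode;
    private String appName;
    private String hostIp;
    private Long logTime;

    public String getAppLog() { return appLog; }
    public void setAppLog(String appLog) { this.appLog = appLog; }
    public String getSystemCode() { return systemCode; }
    public void setSystemCode(String systemCode) { this.systemCode = systemCode; }
    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }
    public String getHostIp() { return hostIp; }
    public void setHostIp(String hostIp) { this.hostIp = hostIp; }
    public Long getLogTime() { return logTime; }
    public void setLogTime(Long logTime) { this.logTime = logTime; }
}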

2. The main program:

package applog;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) throws Exception {

        // Execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // State backend and checkpoint configuration
        env.setStateBackend(new FsStateBackend("hdfs://172.19.73.90:8020/user/zx_yezonggang", false));
        env.enableCheckpointing(60000);                                 // checkpoint every 60s
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // at least 30s between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(10000);          // abort a checkpoint after 10s
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(10);
        // Keep checkpoint data on HDFS when the job is cancelled, so it can be resumed later
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
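        // Note: with RETAIN_ON_CANCELLATION the latest checkpoint survives a manual
        // cancel, so the job can be restarted from it with Flink's -s flag, e.g.
        //   flink run -s hdfs://172.19.73.90:8020/user/zx_yezonggang/<job-id>/chk-<n> ...
        // (the <job-id>/chk-<n> layout is the default FsStateBackend directory scheme;
        // check HDFS for the actual checkpoint path).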

        /*
        Error signatures this job alerts on (matched by FilterOne below):
        1. Could not get a resource from the pool
        2. Work queue full
        3. Thread pool is EXHAUSTED!
        4. java.lang.OutOfMemoryError
        5. more than max_user_connections active connections
        6. Communications link failure
        */

        //+++++++++++++++++++++++++++++++++++++ Source 1 +++++++++++++++++++++++++++++++++++++
        Properties props1 = new Properties();
        props1.setProperty("bootstrap.servers", "xxx:9092");
        props1.setProperty("zookeeper.connect", "xxxx:21810");
        props1.setProperty("group.id", "applog_flink_group_alarm");
        FlinkKafkaConsumer010<String> consumer1 = new FlinkKafkaConsumer010<>("applog", new SimpleStringSchema(), props1);
        // applog from the Huai'an cluster
        DataStream<String> source1 = env.addSource(consumer1).setParallelism(40);

        //+++++++++++++++++++++++++++++++++++++ Source 2 +++++++++++++++++++++++++++++++++++++
        Properties props2 = new Properties();
        props2.setProperty("bootstrap.servers", "xxx:9092");
        props2.setProperty("zookeeper.connect", "xxx");
        props2.setProperty("group.id", "applog_flink_group_n");
        FlinkKafkaConsumer010<String> consumer2 = new FlinkKafkaConsumer010<>("applog", new SimpleStringSchema(), props2);
        // applog from the Luoyang cluster
        DataStream<String> source2 = env.addSource(consumer2).setParallelism(40);

        //+++++++++++++++++++++++++++++++++++++ Union the two sources +++++++++++++++++++++++++++++++++++++
        DataStream<String> source_union = source1.union(source2);

        //+++++++++++++++++++++++++++++++++++++ Parse, filter, and format +++++++++++++++++++++++++++++++++++++
        DataStream<String> source_filter = source_union
                .flatMap(new FlatMapFunction<String, Tuple5<String, String, String, String, Long>>() {
                    // Parse each raw message; records that fail to parse are dropped.
                    // (Note: the message is parsed twice, here and in getTupleFromMessage.)
                    @Override
                    public void flatMap(String s, Collector<Tuple5<String, String, String, String, Long>> collector) throws Exception {
                        if (null != JSONHelperTwo.parse(s)) {
                            collector.collect(JSONHelperTwo.getTupleFromMessage(s));
                        }
                    }
                }).setParallelism(20)
                .filter(new FilterOne()).setParallelism(20)
                .map(new MapFunction<Tuple5<String, String, String, String, Long>, String>() {
                    // Keep only the first line of the log, then join the fields with "|".
                    @Override
                    public String map(Tuple5<String, String, String, String, Long> input) throws Exception {
                        String applog = input.f0;
                        if (applog != null) {
                            int index = applog.indexOf("\n");
                            if (index != -1) {
                                applog = applog.substring(0, index);
                            }
                        }
                        return applog + "|" + input.f1 + "|" + input.f2 + "|" + input.f3 + "|" + input.f4;
                    }
                }).setParallelism(20);

        //+++++++++++++++++++++++++++++++++++++ Sink to Kafka +++++++++++++++++++++++++++++++++++++
        // Write the formatted strings to the alarm topic
        source_filter.addSink(new FlinkKafkaProducer010<>(
                "xxx:9092",
                "topic_applog_alarm",
                new SimpleStringSchema()
        )).name("applog_alarm").setParallelism(5);
        env.execute("list_data");
    }

    public static class FilterOne implements FilterFunction<Tuple5<String, String, String, String, Long>> {
        private static final String ERROR1 = "not get a resource from the pool";
        private static final String ERROR2 = "queue full";
        private static final String ERROR3 = "pool is EXHAUSTED";
        private static final String ERROR4 = "lang.OutOfMemoryError";
        private static final String ERROR5 = "than max_user_connections active connections";
        private static final String ERROR6 = "Communications link failure";

        // Keep only messages whose log body contains one of the known error signatures.
        @Override
        public boolean filter(Tuple5<String, String, String, String, Long> input) throws Exception {
            String log = input.f0;
            if (log == null) {
                return false;
            }
            return log.contains(ERROR1) || log.contains(ERROR2) || log.contains(ERROR3)
                    || log.contains(ERROR4) || log.contains(ERROR5) || log.contains(ERROR6);
        }
    }

}
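Since FilterOne is plain Java, it can be sanity-checked without a cluster. A minimal, hypothetical check (not part of the original post; the sample log lines are made up) that feeds two tuples through the filter:

package applog;

import org.apache.flink.api.java.tuple.Tuple5;

// Hypothetical local check for FilterOne.
public class FilterOneCheck {
    public static void main(String[] args) throws Exception {
        KafkaConsumerTest.FilterOne filter = new KafkaConsumerTest.FilterOne();

        Tuple5<String, String, String, String, Long> match = new Tuple5<>(
                "Could not get a resource from the pool", "S01", "orderApp", "10.0.0.1", 1657772858000L);
        Tuple5<String, String, String, String, Long> noMatch = new Tuple5<>(
                "request handled in 12ms", "S01", "orderApp", "10.0.0.1", 1657772858000L);

        System.out.println(filter.filter(match));   // true  -> would reach the alarm topic
        System.out.println(filter.filter(noMatch)); // false -> dropped
    }
}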