Flink in practice: reading nginx data from Kafka and writing the results back to Kafka
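The job below consumes raw nginx access-log lines from a Kafka topic named nginx, parses each line into a POJO, computes sliding-window counts per client IP, request URL, HTTP method and client type with the Table API, and writes each result stream back to its own Kafka topic as JSON. A per-second throughput count goes to a separate topic as well.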
package com.bigdata.realtime;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
public class FlinkUrlApp {
private static Logger log = LoggerFactory.getLogger(FlinkUrlApp.class);
private static String KAFKA_BROKER = "localhost:9092";
private static String TRANSACTION_GROUP = "flink";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
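// Table API environment backed by the Blink planner, running in streaming mode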
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
// Create the shared Kafka connection properties for the consumer and the producers
Properties properties = new Properties();
properties.put("bootstrap.servers", KAFKA_BROKER);
// properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
properties.put("group.id", TRANSACTION_GROUP);//max.partition.fetch.bytes
properties.put("max.partition.fetch.bytes", (1024 * 1024 * 200)+"");
properties.put("fetch.max.bytes", (1024 * 1024 * 200)+"");
properties.put("fetch.min.bytes", (1024 * 1024 * 200)+"");
properties.put("auto.commit.interval.ms", "1000");
properties.put("max.poll.records", "10000");
properties.put("send.buffer.bytes", (1024 * 1024 * 200)+"");
properties.put("fetch.max.wait.ms", "500");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("producer.type","async");
properties.put("request.required.acks","1");
properties.put("serializer.class", "kafka.serializer.StringEncoder");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("nginx", new SimpleStringSchema(),
properties);
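// Start reading from the latest offsets, ignoring any offsets committed for this group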
myConsumer.setStartFromLatest();
DataStream<String> stream = env.addSource(myConsumer);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // overrides the EventTime setting above; the tables below use a processing-time attribute
// Parse and normalize the raw access-log lines
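// Each record is assumed to be one whitespace-delimited nginx access-log line, e.g. (illustrative only):
// 192.168.1.10 - - [14/Jul/2022:13:34:08 +0800] "GET /index.html HTTP/1.1" 200 512 "http://example.com/" "Mozilla/5.0 ..."
// With that layout, temp[0] is the client IP and temp[5]..temp[11] map to method, targetUrl, http,
// status, size, sourceUrl and clentType in the UrlDesc constructor below.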
DataStream<UrlDesc> UrlDescStream = stream.flatMap(new FlatMapFunction<String, UrlDesc>(){
@Override
public void flatMap(String value, Collector<UrlDesc> out) throws Exception {
String temp[] = value.split(" ");
//
if(temp[6].contains("\\?"))
temp[6] = temp[6].split("?")[0];
if(temp[11].contains("-"))
temp[11] = "未知";
out.collect(new UrlDesc(temp[0],temp[5].replace("\"", ""),temp[6],temp[7],temp[8],temp[9],temp[10],temp[11],1l));
}});
// Register the stream as a table with a processing-time attribute (UserActionTime)
bsTableEnv.registerDataStream("UrlDesc", UrlDescStream, "ip, method,targetUrl,http,status,size,sourceUrl,clentType,count,UserActionTime.proctime");
// Every 10 seconds, emit per-IP counts over the previous 10 minutes and write them to the "IPaddress" topic
Table countsBysecondTable = bsTableEnv.scan("UrlDesc").window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDesc")).groupBy("UrlDesc,ip")
.select("ip as word ,ip.count as count");
FlinkKafkaProducer<String> secondProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER,"IPaddress",new SimpleStringSchema());
bsTableEnv.toAppendStream(countsBysecondTable, WordWithCount.class)
.map((MapFunction<FlinkUrlApp.WordWithCount, String>) (o) -> JSONObject.toJSON(o).toString())
.addSink(secondProducer);
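// The same 10-minute/10-second sliding-window count, keyed by request URL and written to the "targetUrl" topic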
bsTableEnv.registerDataStream("UrlDesctargetUrl", UrlDescStream, "ip, method,targetUrl,http,status,size,sourceUrl,clentType,count,UserActionTime.proctime");
Table UrlDesctargetUrl = bsTableEnv.scan("UrlDesctargetUrl").window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDesctargetUrl")).groupBy("UrlDesctargetUrl,targetUrl")
.select("targetUrl as word ,targetUrl.count as count");
FlinkKafkaProducer<String> UrlDesctargetUrlProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER,"targetUrl",new SimpleStringSchema());
bsTableEnv.toAppendStream(UrlDesctargetUrl, WordWithCount.class)
.map((MapFunction<FlinkUrlApp.WordWithCount, String>) (o) -> JSONObject.toJSON(o).toString())
.addSink(UrlDesctargetUrlProducer);
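// Sliding-window count keyed by HTTP method, written to the "method" topic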
bsTableEnv.registerDataStream("UrlDescmethod", UrlDescStream, "ip, method,targetUrl,http,status,size,sourceUrl,clentType,count,UserActionTime.proctime");
Table method = bsTableEnv.scan("UrlDescmethod").window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDescmethod")).groupBy("UrlDescmethod,method")
.select("method as word ,method.count as count");
FlinkKafkaProducer<String> methodProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER,"method",new SimpleStringSchema());
bsTableEnv.toAppendStream(method, WordWithCount.class)
.map((MapFunction<FlinkUrlApp.WordWithCount, String>) (o) -> JSONObject.toJSON(o).toString())
.addSink(methodProducer);
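// Sliding-window count keyed by client type, written to the "clentType" topic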
bsTableEnv.registerDataStream("UrlDescclentType", UrlDescStream, "ip, method,targetUrl,http,status,size,sourceUrl,clentType,count,UserActionTime.proctime");
Table clentType = bsTableEnv.scan("UrlDescclentType").window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDescclentType")).groupBy("UrlDescclentType,clentType")
.select("clentType as word ,clentType.count as count");
FlinkKafkaProducer<String> clentTypeProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER,"clentType",new SimpleStringSchema());
bsTableEnv.toAppendStream(clentType, WordWithCount.class)
.map((MapFunction<FlinkUrlApp.WordWithCount, String>) (o) -> JSONObject.toJSON(o).toString())
.addSink(clentTypeProducer);
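// Overall throughput: sum all incoming records in 1-second windows and write a timestamped total to the "parallelCount" topic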
FlinkKafkaProducer<String> parallelCount = new FlinkKafkaProducer<String>(KAFKA_BROKER,"parallelCount",new SimpleStringSchema());
stream.flatMap(new FlatMapFunction<String, Tuple1<Long>>(){
@Override
public void flatMap(String value, Collector<Tuple1<Long>> out) throws Exception {
out.collect(new Tuple1<Long>(1L));
}})
.keyBy(0)
.timeWindow(Time.seconds(1))
.sum(0)
.map((MapFunction<Tuple1<Long>, String>) (o) -> (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":" + String.valueOf(o.f0)))
.addSink(parallelCount);
env.execute("FlinkURLKafka");
}
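// One parsed access-log record. Public fields and the no-argument constructor let Flink treat it as a POJO type.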
public static class UrlDesc{
public UrlDesc(String ip, String method, String targetUrl, String http, String status, String size,
String sourceUrl, String clentType,Long count) {
super();
this.ip = ip;
this.method = method;
this.targetUrl = targetUrl;
this.http = http;
this.status = status;
this.size = size;
this.sourceUrl = sourceUrl;
this.clentType = clentType;
this.count = count;
}
public UrlDesc() {}
public String ip;
public String method;
public String targetUrl;
@Override
public String toString() {
return "UrlDesc [ip=" + ip + ", method=" + method + ", targetUrl=" + targetUrl + ", http=" + http
+ ", status=" + status + ", size=" + size + ", sourceUrl=" + sourceUrl + ", clentType=" + clentType+ ", count=" + count
+ "]";
}
public String http;
public String status;
public String size;
public String sourceUrl;
public String clentType;
public Long count;
}
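// Output row type for the windowed counts: (word, count) pairs from the Table API are converted to this type
// and serialized to JSON with fastjson before being written to Kafka. The date/type constructor parameters are accepted but unused.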
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word, long count,String date,String type) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
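To spot-check what the job writes out, a plain Kafka consumer on any of the sink topics is enough. The sketch below is not part of the original program; it assumes the kafka-clients library is on the classpath, reuses the localhost:9092 broker from above, and uses an arbitrary (hypothetical) group id. Each record value should be a small JSON string along the lines of {"word":"192.168.1.10","count":42}.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OutputTopicChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "flink-output-check"); // throwaway group id, any unused name works
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Any of the sink topics can be inspected: IPaddress, targetUrl, method, clentType, parallelCount
            consumer.subscribe(Collections.singletonList("IPaddress"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}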