Flink in Practice: Reading nginx Access Logs from Kafka and Writing Results Back to Kafka

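This post walks through a small Flink job that consumes raw nginx access-log lines from a Kafka topic named nginx, aggregates IP, request-path, HTTP-method, and client counts over a 10-minute sliding window that fires every 10 seconds, and writes each aggregate back to Kafka as JSON (topics IPaddress, targetUrl, method, and clentType), plus a simple per-second throughput counter to the parallelCount topic.

The parser in the flatMap below assumes something close to nginx's default combined log format; a hypothetical sample line looks like this:

192.168.1.10 - - [14/Jul/2022:13:34:08 +0800] "GET /index.html?x=1 HTTP/1.1" 200 612 "http://example.com/" "Mozilla/5.0 ..."

After splitting on single spaces, index 0 is the client IP, index 5 the quoted method, 6 the request path, 7 the protocol, 8 the status code, 9 the response size, 10 the referer, and 11 the first token of the user agent. Lines in a different format will break the hard-coded indices.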
package com.bigdata.realtime;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;

public class FlinkUrlApp {

	private static Logger  log = LoggerFactory.getLogger(FlinkUrlApp.class);
	
	private static String KAFKA_BROKER = "localhost:9092";
	private static String TRANSACTION_GROUP = "flink";
	
	public static void main(String[] args) throws Exception  {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// The table schema below declares a processing-time attribute (proctime), so processing time is used.
		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
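		// Note: this code targets the Flink 1.9/1.10-era Table API (Blink planner, string
		// expressions, registerDataStream/scan). Several of these classes and methods have
		// been moved or removed in newer Flink releases, so the dependency versions matter.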

		
		// Kafka connection properties for the consumer (the producers below only take the broker list)
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("group.id", TRANSACTION_GROUP);
        properties.put("max.partition.fetch.bytes", (1024 * 1024 * 200)+"");
        
		properties.put("fetch.max.bytes", (1024 * 1024 * 200)+"");
		properties.put("fetch.min.bytes", (1024 * 1024 * 200)+"");
		properties.put("auto.commit.interval.ms", "1000");
		properties.put("max.poll.records", "10000");
		
		properties.put("send.buffer.bytes", (1024 * 1024 * 200)+"");
		properties.put("fetch.max.wait.ms", "500");
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("producer.type","async");
		properties.put("request.required.acks","1");
		properties.put("serializer.class", "kafka.serializer.StringEncoder");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("nginx", new SimpleStringSchema(),
                properties);
        myConsumer.setStartFromLatest();
        DataStream<String> stream = env.addSource(myConsumer);
        // Parse each raw access-log line into a UrlDesc record
        DataStream<UrlDesc> UrlDescStream = stream.flatMap(new FlatMapFunction<String, UrlDesc>() {
        	@Override
	        public void flatMap(String value, Collector<UrlDesc> out) throws Exception {
        		String[] temp = value.split(" ");
        		// Skip lines that do not have enough space-separated fields.
        		if (temp.length < 12) {
        			return;
        		}
        		// Drop the query string from the request path.
        		if (temp[6].contains("?"))
        			temp[6] = temp[6].split("\\?")[0];
        		// nginx logs "-" when the user agent is missing.
        		if (temp[11].contains("-"))
        			temp[11] = "unknown";

    			out.collect(new UrlDesc(temp[0], temp[5].replace("\"", ""), temp[6], temp[7], temp[8], temp[9], temp[10], temp[11], 1L));
	        }});
        // Register the stream as a table with a processing-time attribute
        bsTableEnv.registerDataStream("UrlDesc", UrlDescStream, "ip, method, targetUrl, http, status, size, sourceUrl, clentType, count, UserActionTime.proctime");
        // Every 10 seconds, count requests per IP over the previous 10 minutes
        Table countsBysecondTable = bsTableEnv.scan("UrlDesc")
        	.window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDesc"))
        	.groupBy("UrlDesc, ip")
        	.select("ip as word, ip.count as count");
        FlinkKafkaProducer<String> secondProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER, "IPaddress", new SimpleStringSchema());
        bsTableEnv.toAppendStream(countsBysecondTable, WordWithCount.class)
        	.map((MapFunction<FlinkUrlApp.WordWithCount, String>) (o) -> JSONObject.toJSON(o).toString())
        	.addSink(secondProducer);
        
        
        
        bsTableEnv.registerDataStream("UrlDesctargetUrl", UrlDescStream, "ip, method,targetUrl,http,status,size,sourceUrl,clentType,count,UserActionTime.proctime");
        Table UrlDesctargetUrl = bsTableEnv.scan("UrlDesctargetUrl").window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDesctargetUrl")).groupBy("UrlDesctargetUrl,targetUrl")
		.select("targetUrl as word ,targetUrl.count as count");
        FlinkKafkaProducer<String> UrlDesctargetUrlProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER,"targetUrl",new SimpleStringSchema());
        bsTableEnv.toAppendStream(UrlDesctargetUrl, WordWithCount.class).map((MapFunction<FlinkUrlApp.WordWithCount, String>)(o)->JSONObject.toJSON(o).toString()).addSink(UrlDesctargetUrlProducer);
        
       bsTableEnv.registerDataStream("UrlDescmethod", UrlDescStream, "ip, method,targetUrl,http,status,size,sourceUrl,clentType,count,UserActionTime.proctime");
        Table method = bsTableEnv.scan("UrlDescmethod").window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDescmethod")).groupBy("UrlDescmethod,method")
		.select("method as word ,method.count as count");
        FlinkKafkaProducer<String> methodProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER,"method",new SimpleStringSchema());
        bsTableEnv.toAppendStream(method, WordWithCount.class).map((MapFunction<FlinkUrlApp.WordWithCount, String>)(o)->JSONObject.toJSON(o).toString()).addSink(methodProducer);
        
        bsTableEnv.registerDataStream("UrlDescclentType", UrlDescStream, "ip, method,targetUrl,http,status,size,sourceUrl,clentType,count,UserActionTime.proctime");
        Table clentType = bsTableEnv.scan("UrlDescclentType").window(Slide.over("10.minutes").every("10.seconds").on("UserActionTime").as("UrlDescclentType")).groupBy("UrlDescclentType,clentType")
		.select("clentType as word ,clentType.count as count");
        FlinkKafkaProducer<String> clentTypeProducer = new FlinkKafkaProducer<String>(KAFKA_BROKER,"clentType",new SimpleStringSchema());
        bsTableEnv.toAppendStream(clentType, WordWithCount.class).map((MapFunction<FlinkUrlApp.WordWithCount, String>)(o)->JSONObject.toJSON(o).toString()).addSink(clentTypeProducer);
        // Count the number of incoming log lines per second and write the result to the parallelCount topic
        FlinkKafkaProducer<String> parallelCount = new FlinkKafkaProducer<String>(KAFKA_BROKER, "parallelCount", new SimpleStringSchema());
        stream.flatMap(new FlatMapFunction<String, Tuple1<Long>>() {
        	@Override
	        public void flatMap(String value, Collector<Tuple1<Long>> out) throws Exception {
    			out.collect(new Tuple1<Long>(1L));
	        }})
        	.keyBy(0).timeWindow(Time.seconds(1)).sum(0)
        	.map((MapFunction<Tuple1<Long>, String>) (o) -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":" + String.valueOf(o.f0))
        	.addSink(parallelCount);
		env.execute("FlinkURLKafka");

 
	}
	 public static class UrlDesc {
	      public String ip;
	      public String method;
	      public String targetUrl;
	      public String http;
	      public String status;
	      public String size;
	      public String sourceUrl;
	      public String clentType;
	      public Long count;

	      public UrlDesc() {}

	      public UrlDesc(String ip, String method, String targetUrl, String http, String status, String size,
				String sourceUrl, String clentType, Long count) {
			this.ip = ip;
			this.method = method;
			this.targetUrl = targetUrl;
			this.http = http;
			this.status = status;
			this.size = size;
			this.sourceUrl = sourceUrl;
			this.clentType = clentType;
			this.count = count;
		}

		@Override
		public String toString() {
			return "UrlDesc [ip=" + ip + ", method=" + method + ", targetUrl=" + targetUrl + ", http=" + http
					+ ", status=" + status + ", size=" + size + ", sourceUrl=" + sourceUrl + ", clentType=" + clentType
					+ ", count=" + count + "]";
		}
	    }
	 
	 
	 public static class WordWithCount{
	        public String word;
	        public long count;
	      
	        public WordWithCount(){}
	        public WordWithCount(String word, long count) {
	            this.word = word;
	            this.count = count;
	        }

	        @Override
	        public String toString() {
	            return "WordWithCount{" +
	                    "word='" + word + '\'' +
	                    ", count=" + count +
	                    '}';
	        }
	    }
}
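
To sanity-check the results, the output topics can be read back with a plain Kafka consumer. The class below is a minimal, hypothetical checker and is not part of the job above: the class name, group id, and topic choice are assumptions; it uses the same localhost:9092 broker and reads the IPaddress topic, but any of the other output topics can be substituted. Each record should be a small JSON object such as {"word":"192.168.1.10","count":42}.

package com.bigdata.realtime;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ResultTopicChecker {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // same broker as the Flink job
		props.put("group.id", "result-checker");          // hypothetical consumer group
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("auto.offset.reset", "earliest");

		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			// Read one of the job's output topics and print the JSON payloads it contains.
			consumer.subscribe(Collections.singletonList("IPaddress"));
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : records) {
					System.out.println(record.value());
				}
			}
		}
	}
}

The job itself assumes that the nginx input topic and the five output topics either already exist or that the broker is allowed to auto-create them (auto.create.topics.enable).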

 

Tags: flink