Learning Apache Flink (join API)


This article demonstrates how to consume data from two Kafka topics with Flink and then join the two streams.

The DeserializationSchema

First, implement the DeserializationSchema interface, which is used to parse the records read from Kafka:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Parses each Kafka record as a comma-separated string and maps the values,
 * in order, onto the fields of a Row.
 */
public class TestDeserializationSchema implements DeserializationSchema<Row> {
    private final TypeInformation<Row> typeInfo;
    private final String[] fieldNames;

    public TestDeserializationSchema(TypeInformation<Row> typeInfo) {
        this.typeInfo = typeInfo;
        this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            // The raw message is a CSV line; split it into individual field values
            String messages = new String(message, StandardCharsets.UTF_8);
            String[] messagesArray = messages.split(",");

            Row row = new Row(fieldNames.length);
            for (int i = 0; i < fieldNames.length; i++) {
                row.setField(i, messagesArray[i]);
            }

            return row;
        } catch (Throwable t) {
            throw new IOException("Failed to deserialize Row object.", t);
        }
    }

    @Override
    public boolean isEndOfStream(Row row) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return typeInfo;
    }
}
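
For a quick sanity check outside of Flink, the schema can be exercised directly. The snippet below is a minimal sketch (the object name DeserializationSchemaCheck and the sample message are only illustrative); it builds the same row type as the "foo" topic uses later and deserializes one CSV message into a Row:

import com.woople.flink.streaming.connectors.kafka.TestDeserializationSchema
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.types.Row

import java.nio.charset.StandardCharsets

object DeserializationSchemaCheck {
  def main(args: Array[String]): Unit = {
    // Same row type as the "foo" topic: three string fields named imsi, lac and cell
    val rowType = Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING)
    val schema = new TestDeserializationSchema(rowType)

    // One sample message in the CSV format the schema expects
    val row: Row = schema.deserialize("400123456789,88,99".getBytes(StandardCharsets.UTF_8))

    // The Row prints its fields comma-separated: 400123456789,88,99
    println(row)
  }
}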

Full implementation

import java.util.Properties

import com.woople.flink.streaming.connectors.kafka.TestDeserializationSchema
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.types.Row

object KafkaJoin {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    // comma separated list of Kafka brokers
    properties.setProperty("bootstrap.servers", "hostA:6667")
    // id of the consumer group
    properties.setProperty("group.id", "test")
    // Topic "foo" carries rows of (imsi, lac, cell); topic "baz" carries rows of (imsi, phoneNum)
    val s1 = env.addSource(new FlinkKafkaConsumer010[Row](
      "foo",
      new TestDeserializationSchema(Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING)),
      properties))
    val s2 = env.addSource(new FlinkKafkaConsumer010[Row](
      "baz",
      new TestDeserializationSchema(Types.ROW_NAMED(Array("imsi", "phoneNum"), Types.STRING, Types.STRING)),
      properties))

    // Join on imsi (field 0 of both rows); convert the key to String so Flink can
    // treat it as a hashable key type. Records from the two topics only match if
    // they fall into the same 30-second processing-time window.
    val result = s1.join(s2)
      .where(_.getField(0).toString)
      .equalTo(_.getField(0).toString)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
      .apply((l, r) => {
        // Output row: imsi, lac, cell from the left stream plus phoneNum from the right stream
        val row = new Row(4)
        row.setField(0, l.getField(0))
        row.setField(1, l.getField(1))
        row.setField(2, l.getField(2))
        row.setField(3, r.getField(1))
        row
      })

    result.print()
    env.execute("Kafka Window Stream Join")
  }
}
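
Note that this windowed join behaves as an inner join over 30-second processing-time windows: an imsi that shows up in only one of the two topics within a window is dropped silently. If you also want to observe the unmatched records, the coGroup API accepts the same where/equalTo/window building blocks. The fragment below is only a sketch intended to replace the join inside main above (it reuses s1, s2 and the imports of the job; the output format is illustrative):

// Sketch: coGroup invokes the function once per key and window and hands over
// all rows from both sides for that key, including keys present on one side only.
val coGrouped = s1.coGroup(s2)
  .where(_.getField(0).toString)
  .equalTo(_.getField(0).toString)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
  .apply { (lefts: Iterator[Row], rights: Iterator[Row]) =>
    val foo = lefts.toSeq
    val baz = rights.toSeq
    // At least one side is non-empty, so the key can be read from either list
    val imsi = (foo ++ baz).head.getField(0).toString
    s"imsi=$imsi, inFoo=${foo.nonEmpty}, inBaz=${baz.nonEmpty}"
  }

coGrouped.print()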

Test case

Write one record to each topic with the console producer:

kafka-console-producer.sh --broker-list hostA:6667 --topic foo
400123456789,88,99

kafka-console-producer.sh --broker-list hostA:6667 --topic baz
400123456789,19900000000

The joined result printed by the job (a Row prints as comma-separated field values):

400123456789,88,99,19900000000