Learning Apache Flink(API)
This article is based on the official Apache Flink v1.3 documentation and uses the Scala version of the API. For the basic setup, see "Learning Apache Flink(BASIC)".
Business scenario
Flink consumes data from Kafka, applies an initial filter to obtain a result set, "tags" the remaining records, and finally filters on the tag before writing the output back to Kafka. For example, the records in topic foo have the form "imsi,lac,cell". First, select all records whose imsi field starts with 460; then use the lac and cell fields to decide whether the record falls into a specified location, adding a field isSpecifiedLocation whose value is true or false. The fields finally written to Kafka are "imsi,lac,cell,isSpecifiedLocation,timestamp", and only records with isSpecifiedLocation = true are emitted.
Reading Kafka data with Flink
Note: the Kafka version used in this article is 0.10.0.
The Provided TableSources in the official documentation only offer JSON and Avro formats for Kafka, so if the data in the topic is in CSV format, you can model a custom KafkaCsvTableSource and CsvRowDeserializationSchema on Kafka010JsonTableSource and JsonRowDeserializationSchema to parse the CSV data (see the Complete code section for the implementation). A TableSource can then be registered as follows:
// Register a TableSource
val kafkaTableSource = new KafkaCsvTableSource(
  "foo",
  properties,
  new CsvRowDeserializationSchema(typeInfo),
  typeInfo)

tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource)
val kafkaCsvTable = tableEnv.scan("KafkaCsvTable")
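The snippet above assumes that properties (the Kafka consumer configuration) and typeInfo (the row type describing the CSV fields) are already in scope. A minimal sketch of these definitions, taken from the complete code at the end of this article (the broker address is just the one used in my test environment):

// Row type of the CSV records and Kafka consumer settings
val typeInfo = Types.ROW_NAMED(Array("imsi","lac","cell"), Types.STRING, Types.STRING, Types.STRING)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "10.1.236.66:6667")
properties.setProperty("group.id", "test")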
Once you have a Table, you can use the Table API to filter the data:
val filterResult = kafkaCsvTable.where('imsi like "460%").select("imsi,lac,cell")
Adding fields to the DataStream dynamically
- Convert the Table into a DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream(filterResult)
- Add the new fields
val newDsRows = dsRow.map(row => {
  // Copy the original fields and append two new ones
  val ret = new Row(row.getArity() + 2)
  for (i <- 0 until row.getArity()) {
    ret.setField(i, row.getField(i))
  }
  // "Tag" the record: in this demo the check simply compares the lac and cell fields
  val isSpecifiedLocation = ret.getField(1).equals(ret.getField(2))
  ret.setField(row.getArity(), isSpecifiedLocation)
  ret.setField(row.getArity() + 1, System.currentTimeMillis())
  ret
})
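Registering this new DataStream[Row] as a Table requires Flink to know the resulting row type; otherwise the map produces a GenericTypeInfo<Row> (see Troubleshooting below). In the complete code this is handled by declaring an implicit RowTypeInfo before the map, roughly like this:

// Field types and names of the enriched rows; the implicit is picked up by the map above
val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING, Types.BOOLEAN, Types.LONG)
val names = Array("imsi","lac","cell","isSpecifiedLocation","timestamp")
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)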
- Register the newly created DataStream as a Table and apply the final filter
tableEnv.registerDataStream("newTable", newDsRows)
val newKafkaCsvTable = tableEnv.scan("newTable")
val newResult = newKafkaCsvTable.filter('isSpecifiedLocation === true).select("imsi,lac,cell,isSpecifiedLocation,timestamp")
Writing data from Flink to Kafka
This article uses the Kafka09JsonTableSink class provided by Flink to write the result directly as JSON:
val sink = new Kafka09JsonTableSink("bar", properties, new FlinkFixedPartitioner[Row])
newResult.writeToSink(sink)
Test case
Run the job:
./bin/flink run -c com.woople.streaming.scala.examples.kafka.FlinkKafkaDemo /opt/flink-tutorials-1.0-bundle.jar
Write the following record to topic foo:
4601234,1,1
Topic bar then receives:
{"imsi":"4601234","lac":"1","cell":"1","isSpecifiedLocation":true,"timestamp":1511222771896}
If the input is 4601234,2,1 instead, it does not satisfy the condition and nothing is emitted.
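For a quick manual test, records can be produced and consumed with the Kafka console tools (assuming the same broker address as in the code; adjust to your environment):

./bin/kafka-console-producer.sh --broker-list 10.1.236.66:6667 --topic foo
./bin/kafka-console-consumer.sh --bootstrap-server 10.1.236.66:6667 --topic bar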
Troubleshooting
During debugging I ran into the following error:
org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
Searching online turned up FLINK-6500; following the approach described there, the problem was solved by adding the line below. It has to be in scope before the map that produces the new Row stream, as shown in the complete code:
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)
Complete code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.woople</groupId>
<artifactId>flink-tutorials</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
<build>
<defaultGoal>package</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>copy-resources</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>eclipse-add-source</id>
<goals>
<goal>add-source</goal>
</goals>
</execution>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>attach-scaladocs</id>
<phase>verify</phase>
<goals>
<goal>doc-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>2.11.8</scalaVersion>
<recompileMode>incremental</recompileMode>
<useZincServer>true</useZincServer>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<finalName>${project.artifactId}-${project.version}-bundle</finalName>
</configuration>
</plugin>
</plugins>
</build>
</project>
KafkaCsvTableSource.java
package com.woople.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Properties;
public class KafkaCsvTableSource implements StreamTableSource<Row> {
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
/** Deserialization schema to use for Kafka records. */
private final DeserializationSchema<Row> deserializationSchema;
/** Type information describing the result type. */
private final TypeInformation<Row> typeInfo;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param typeInfo Type information describing the result type.
*/
public KafkaCsvTableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
TypeInformation<Row> typeInfo) {
this.topic = Preconditions.checkNotNull(topic, "Topic");
this.properties = Preconditions.checkNotNull(properties, "Properties");
this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information");
}
/**
* NOTE: This method is for internal use only for defining a TableSource.
* Do not use it in Table API programs.
*/
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
// Version-specific Kafka consumer
FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
return env.addSource(kafkaConsumer);
}
@Override
public TypeInformation<Row> getReturnType() {
return typeInfo;
}
/**
* Returns the version-specific Kafka consumer.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @return The version-specific Kafka consumer
*/
private FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<Row>(topic, deserializationSchema, properties);
}
/**
* Returns the deserialization schema.
*
* @return The deserialization schema
*/
protected DeserializationSchema<Row> getDeserializationSchema() {
return deserializationSchema;
}
@Override
public String explainSource() {
return "";
}
}
CsvRowDeserializationSchema.java
package com.woople.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
/** Type information describing the result type. */
private final TypeInformation<Row> typeInfo;
/** Field names to parse. Indices match fieldTypes indices. */
private final String[] fieldNames;
/** Types to parse fields as. Indices match fieldNames indices. */
private final TypeInformation<?>[] fieldTypes;
/** Flag indicating whether to fail on a missing field. */
private boolean failOnMissingField;
/**
* Creates a CSV deserialization schema for the given fields and types.
*
* @param typeInfo Type information describing the result type. The field names are used
* to parse the CSV records and so are the types.
*/
public CsvRowDeserializationSchema(TypeInformation<Row> typeInfo) {
Preconditions.checkNotNull(typeInfo, "Type information");
this.typeInfo = typeInfo;
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
}
@Override
public Row deserialize(byte[] message) throws IOException {
try {
String messages = new String(message);
String[] messagesArray = messages.split(",");
Row row = new Row(fieldNames.length);
for (int i = 0; i < fieldNames.length; i++) {
row.setField(i, messagesArray[i]);
}
return row;
} catch (Throwable t) {
throw new IOException("Failed to deserialize JSON object.", t);
}
}
@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}
@Override
public TypeInformation<Row> getProducedType() {
return typeInfo;
}
/**
* Configures the failure behaviour if a CSV field is missing.
*
* <p>Note: this flag is not evaluated by deserialize() in this simple example.
*
* @param failOnMissingField Flag indicating whether to fail or not on a missing field.
*/
public void setFailOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
}
}
FlinkKafkaDemo.scala
package com.woople.streaming.scala.examples.kafka
import java.util.Properties
import com.woople.flink.streaming.connectors.kafka.{CsvRowDeserializationSchema, KafkaCsvTableSource}
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
object FlinkKafkaDemo {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val typeInfo = Types.ROW_NAMED(Array("imsi","lac","cell"), Types.STRING, Types.STRING, Types.STRING)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "10.1.236.66:6667")
properties.setProperty("group.id", "test")
//Register a TableSource
val kafkaTableSource = new KafkaCsvTableSource(
"foo",
properties,
new CsvRowDeserializationSchema(typeInfo),
typeInfo)
tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource)
val kafkaCsvTable = tableEnv.scan("KafkaCsvTable")
val filterResult = kafkaCsvTable.where('imsi like "460%").select("imsi,lac,cell")
val dsRow: DataStream[Row] = tableEnv.toAppendStream(filterResult)
{ // block scoping the implicit RowTypeInfo used by the map below
val types = Array[TypeInformation[_]](
Types.STRING,
Types.STRING,
Types.STRING,
Types.BOOLEAN,
Types.LONG)
val names = Array("imsi","lac","cell","isSpecifiedLocation","timestamp")
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)
val newDsRows = dsRow.map(row => {
  // Copy the original fields and append two new ones
  val ret = new Row(row.getArity() + 2)
  for (i <- 0 until row.getArity()) {
    ret.setField(i, row.getField(i))
  }
  // "Tag" the record: in this demo the check simply compares the lac and cell fields
  val isSpecifiedLocation = ret.getField(1).equals(ret.getField(2))
  ret.setField(row.getArity(), isSpecifiedLocation)
  ret.setField(row.getArity() + 1, System.currentTimeMillis())
  ret
})
tableEnv.registerDataStream("newTable", newDsRows)
val newKafkaCsvTable = tableEnv.scan("newTable")
val newResult = newKafkaCsvTable.filter('isSpecifiedLocation === true).select("imsi,lac,cell,isSpecifiedLocation,timestamp")
val sink = new Kafka09JsonTableSink("bar", properties, new FlinkFixedPartitioner[Row])
newResult.writeToSink(sink)
env.execute("Flink kafka demo")
}
}
}
Summary
This article is only a simple example; the code does not take performance and other concerns into account. Follow-up articles will look into these topics in more depth.