flink sql使用kafka作为source和sink
程序员文章站
2022-06-16 16:42:08
...
大家都知道sql有着简单,直接,容易上手等优势,所以现在大有用sql去掉api的趋势。那么我们少说废话,下面先上个sql的列子
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000)
env.setParallelism(1)
//注入数据源
var tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog())
tableEnv.sqlUpdate(
s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test`
|select
|fstDeptSet,
|filedName1,
|filedName2,
|userId,
|brandNames
|from kafka.`kafka-k8s`.`pb_internal_test`
| """.stripMargin)
env.execute("Flink SQL Skeleton")
上面是一个查询,插入语句,在flink中会被转为一个任务进行提交
下面我们大概讲一下flink内部kafka的实例化过程
有图可知,主要分为4大步骤,先通过calcite分析sql,转为相应的relnode,在根据用户配置的schema和Java spi,过滤出需要的kafka produce和kafka consumer版本。
kafka consumer对应于select部分
kafka produce对应于insert部分
转载于:https://my.oschina.net/u/1262062/blog/2980659
上一篇: 第25章 JDBC核心技术第2节
下一篇: php删除指定目录的方法_PHP教程