Flink mysql cdc 读取
程序员文章站
2022-03-04 23:02:34
...
- Flink1.11 读取mysql cdc
- 返回DataStream[(Boolean, Row)],可以根据元组第一个值为True or false判定数据是弃用或者更新插入
package com.cdc
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
object MysqlCDC {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance.inStreamingMode.useBlinkPlanner.build
val sTableEnv = StreamTableEnvironment.create(env,bsSettings)
val userDDL =
s"""
|CREATE TABLE t_user (
| uid int,
| name string
|) WITH (
| 'connector' = 'mysql-cdc',
| 'hostname' = 'jeff200',
| 'port' = '3306',
| 'username' = 'root',
| 'password' = 'root',
| 'database-name' = 'test_db',
| 'table-name' = 't_user'
|)
|""".stripMargin
sTableEnv.executeSql(userDDL)
val filterSql =
s"""
|SELECT uid, name
|FROM t_user
|WHERE uid > 0
""".stripMargin
val table: Table = sTableEnv.sqlQuery(filterSql)
// 回撤流方式输出
val kafkaStream:DataStream[(Boolean, Row)] = sTableEnv.toRetractStream(table)
kafkaStream.print()
env.execute("mysql cdc")
}
}
- 开启binlog
#在/etc/my.cnf中的[mysqld]下面直接增加内容
vi /etc/my.cnf
server_id=1
log_bin = mysql-bin
binlog_format = ROW
#退出并保存
- 在navicat或者sql客户端执行,建表,插入数
USE test_db;
CREATE TABLE t_user
(
`uid` INT(11),
`name` VARCHAR(25),
`age` INT(11),
`sex` VARCHAR(25),
`ts` timestamp default CURRENT_TIMESTAMP()
);
use test_db;
INSERT INTO t_user (uid, `name`, age, sex) VALUES (1, "小明", 20, '男');
INSERT INTO t_user (uid, `name`, age, sex) VALUES (2, "小丽", 34, '女');
SELECT * FROM t_user
- 添加mysql cdc 依赖 和table blink
<!-- flink table api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<!-- Flink-CDC -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.0</version>
</dependency>
- 结果