SpringBoot+KafkaConnect功能+监控实践(持续更新)
程序员文章站
2024-03-11 17:06:43
...
前言
实践
环境搭建
作者是在linux环境中搭建的,建议在linux环境测试。
kafka安装运行
-
下载
传送门 -
解压
# 解压
tar -xzf kafka_2.13-3.0.0.tgz
# 进入主目录
cd kafka_2.13-3.0.0
# 启动zk
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh config/server.properties
启动kafka connect
# 第一个properties是启动connect的配置;
# 第二个properties是connector的配置,会根据配置创建一个connector
bin/connect-standalone.sh \
config/connect-standalone.properties \
connect-file-sink.properties
实现mysql数据读取和插入
kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加。
- 直接linux下载
# 需要安装wget工具,如果没有
yum install -y wget
# 下载
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.4/confluentinc-kafka-connect-jdbc-10.2.4.zip
- 安装
# 先解压
tar -xzf confluentinc-kafka-connect-jdbc-10.2.3.zip
# 将该工具里的lib下面的包,复制到kafka/lib下
cp confluentinc-kafka-connect-jdbc-10.2.3/lib/* KAFKA_HOME/lib
-
windows下载JDBC连接包,再上传到Linux
下载地址 -
添加一个MySQL source connector
curl -X POST -H "Content-Type: application/json" \
-d '{
"name":"test_mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"mode":"bulk",
"tables":"`zzf`.`test_zzf`",
"task.class":"io.confluent.connect.jdbc.source.JdbcSourceTask",
"tasks.max":"2",
"topics":"mysql_test",
"name":"test_mysql",
"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
"table.whitelist":"test_zzf"
}
}'
附录
添加额外连接器
kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加
地址:https://www.confluent.io/hub
connect URI介绍
请求方法 | 路径 | 含义 |
---|---|---|
GET | /connectors | 返回所有正在运行的connector名。 |
GET | /connectors/{name} | 获取指定connetor的信息。 |
GET | /connectors/{name}/config | 获取指定connector的配置信息 |
GET | /connectors/{name}/status | 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。 |
GET | /connectors/{name}/tasks | 获取指定connector正在运行的task。 |
GET | /connectors/{name}/tasks/{taskid}/status | 获取指定connector的task的状态信息。 |
PUT | /connectors/{name}/config | 更新指定connector的配置信息 |
PUT | /connectors/{name}/pause | 暂停connector和它的task,停止数据处理知道它被恢复 |
PUT | /connectors/{name}/resume | 恢复一个被暂停的connector。 |
POST | /connectors | 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。 |
POST | /connectors/{name}/restart | 重启一个connector,尤其是在一个connector运行失败的情况下比较常用 |
POST | /connectors/{name}/tasks/{taskId}/restart | 重启一个task,一般是因为它运行失败才这样做。 |
DELETE | /connectors/{name} | 删除一个connector,停止它的所有task并删除配置。 |
上一篇: InfoQ的自动新闻订阅
下一篇: HbuilderX vuex配置