欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot+KafkaConnect功能+监控实践(持续更新)

程序员文章站 2024-03-11 17:06:43
...

前言

实践

环境搭建

  作者是在linux环境中搭建的,建议在linux环境测试。

kafka安装运行

# 解压
tar -xzf kafka_2.13-3.0.0.tgz
# 进入主目录
cd kafka_2.13-3.0.0
# 启动zk
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh config/server.properties

启动kafka connect

# 第一个properties是启动connect的配置;
# 第二个properties是connector的配置,会根据配置创建一个connector
bin/connect-standalone.sh \
	config/connect-standalone.properties \
	connect-file-sink.properties

实现mysql数据读取和插入

  kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加。

  • 直接linux下载
# 需要安装wget工具,如果没有
yum install -y wget

# 下载
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.4/confluentinc-kafka-connect-jdbc-10.2.4.zip
  • 安装
# 先解压
tar -xzf confluentinc-kafka-connect-jdbc-10.2.3.zip
# 将该工具里的lib下面的包,复制到kafka/lib下
cp confluentinc-kafka-connect-jdbc-10.2.3/lib/* KAFKA_HOME/lib
  • windows下载JDBC连接包,再上传到Linux
    下载地址

  • 添加一个MySQL source connector

curl -X POST -H "Content-Type: application/json" \
-d '{
	"name":"test_mysql", 
	"config":{
		"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
		"mode":"bulk",
		"tables":"`zzf`.`test_zzf`",
		"task.class":"io.confluent.connect.jdbc.source.JdbcSourceTask",
		"tasks.max":"2",
		"topics":"mysql_test",
		"name":"test_mysql",
		"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
		"table.whitelist":"test_zzf"
	}
}'

附录

添加额外连接器

  kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加
地址:https://www.confluent.io/hub

connect URI介绍

请求方法 路径 含义
GET /connectors 返回所有正在运行的connector名。
GET /connectors/{name} 获取指定connetor的信息。
GET /connectors/{name}/config 获取指定connector的配置信息
GET /connectors/{name}/status 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status 获取指定connector的task的状态信息。
PUT /connectors/{name}/config 更新指定connector的配置信息
PUT /connectors/{name}/pause 暂停connector和它的task,停止数据处理知道它被恢复
PUT /connectors/{name}/resume 恢复一个被暂停的connector。
POST /connectors 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
POST /connectors/{name}/restart 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} 删除一个connector,停止它的所有task并删除配置。