《Apache Kafka 实战》笔记 - 10.2.2 Connect - 单机
程序员文章站
2024-03-11 17:02:25
...
Kafka connect
Kafka connector 负责将数据从外部系统转移到Kafka或从Kafka中转移到其他系统,比如Kafka Connect能够将文件系统中某些文件的内容全部灌入Kafka topic中或者是把Kafka topic中的消息导出到外部的数据库系统。
Kafka Connect 主要由 source connector 和 sink connector 组成,source connector 负责把输入数据从外部系统中导入到Kafka中,而 sink connector 则负责把输出数据导出到其他外部系统。
目标
下面在一个单节点的Kafka集群上运行 standalone 模式的 Kafka Connect,把输入文件 foo.txt 中的数据通过Kafka传输到输出文件 bar.txt 中。
创建配置文件
test-connect-file-source.properties
name=test-file-source
connector.class=FileStreamSource
tasks.max=1
file=foo.txt
topic=connect-file-test
test-connect-file-sink.properties
name= test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=bar.txt
topics=connect-file-test
启动 connect
bin/connect-standalone.sh config/connect-standalone.properties config/test-connect-file-source.properties config/test-connect-file-sink.properties
启动后会一直提示警告信息:
WARN Couldn't find file foo.txt for ...
这是因为源文件还不存在,向源文件中添加内容:
echo 'hello' >> ./foo.txt
echo 'kafka connect test example' >> ./foo.txt
echo 'this is a file connector test.' >> ./foo.txt
查看下目录,已经出现了 bar.txt,内容就是上面输入的。
使用 consumer 读取 topic 验证一下:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-file-test --from-beginning
导出前修改消息数据
下面实验在将数据导出到目标文件之前为每条消息增加一个IP字段。
如果要插入IP静态字段,我们必须修改source connector的配置文件,增加以下这些行:
transforms=WrapMap,InsertHost
transforms.WrapMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.WrapMap.field=line
transforms.InsertHost.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertHost.static.field=ip
transforms.InsertHost.static.value=com.connector.machine1
之后重启Kafka Connect,然后写入foo.txt文件:
echo "this is a transformation test" >> ./foo.txt
查看bar.txt可以发现这条新增的数据:
Struct{line=this is a transformation test,ip=com.connector.machine1}
上一篇: 彻底理解(k8s)Namespace