替代Flume——Kafka Connect简介
我们知道过去对于kafka的定义是分布式,分区化的,带备份机制的日志提交服务。也就是一个分布式的消息队列,这也是他最常见的用法。但是kafka不止于此,打开最新的官网。
我们看到kafka最新的定义是:apache kafka® is a distributed streaming platform
分布式流处理平台。
这里也清晰的描述了kafka的特点:kafka用于构建实时数据管道和流式应用程序。它具有水平可扩展性、容错性、速度极快,并在数千家公司投入生产。
所以现在的kafka已经不仅是一个分布式的消息队列,更是一个流处理平台。这源于它于0.9.0.0和0.10.0.0引入的两个全新的组件kafka connect与kafka streaming。
kafka connect简介
我们知道消息队列必须存在上下游的系统,对消息进行搬入搬出。比如经典的日志分析系统,通过flume读取日志写入kafka,下游由storm进行实时的数据处理。
kafka connect的作用就是替代flume,让数据传输这部分工作可以由kafka connect来完成。kafka connect是一个用于在apache kafka和其他系统之间可靠且可靠地传输数据的工具。它可以快速地将大量数据集合移入和移出kafka。
kafka connect的导入作业可以将数据库或从应用程序服务器收集的数据传入到kafka,导出作业可以将kafka中的数据传递到查询系统,也可以传输到批处理系统以进行离线分析。
kafka connect功能包括:
- 一个通用的kafka连接的框架 - kafka connect规范化了其他数据系统与kafka的集成,简化了连接器开发,部署和管理
- 分布式和独立模式 - 支持大型分布式的管理服务,也支持小型生产环境的部署
- rest界面 - 通过易用的rest api提交和管理kafka connect
- 自动偏移管理 - 只需从连接器获取一些信息,kafka connect就可以自动管理偏移量提交过程,因此连接器开发人员无需担心连接器开发中偏移量提交这部分的开发
- 默认情况下是分布式和可扩展的 - kafka connect构建在现有的组管理协议之上。可以添加扩展集群
- 流媒体/批处理集成 - 利用kafka现有的功能,kafka connect是桥接流媒体和批处理数据系统的理想解决方案
运行kafka connect
kafka connect目前支持两种运行模式:独立和集群。
独立模式
在独立模式下,只有一个进程,这种更容易设置和使用。但是没有容错功能。
启动:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
独立模式配置
第一个参数config/connect-standalone.properties是一些基本的配置:
这几个在独立和集群模式下都需要设置:
#bootstrap.servers kafka集群列表 bootstrap.servers=localhost:9092 #key.converter key的序列化转换器 比如json的 key.converter=org.apache.kafka.connect.json.jsonconverter #value.converter value的序列化转换器 value.converter=org.apache.kafka.connect.json.jsonconverter #独立模式特有的配置: #offset.storage.file.filename 用于存储偏移量的文件 offset.storage.file.filename =/home/kafka/connect.offsets
独立模式连接器配置(配置文件)
后面的参数connector1.properties [connector2.properties ...] 可以多个,是连接器配置内容
这里我们配置一个从文件读取数据并存入kafka的配置:
connect-file-sink.properties
name
- 连接器的唯一名称。尝试再次使用相同名称注册将失败。connector.class
- 连接器的java类 此连接器的类的全名或别名。这里我们选择filestreamsinktasks.max
- 应为此连接器创建的最大任务数。如果连接器无法达到此级别的并行性,则可能会创建更少的任务。key.converter
- (可选)覆盖worker设置的默认密钥转换器。-
value.converter
- (可选)覆盖worker设置的默认值转换器。下面两个必须设置一个:
-
topics
- 以逗号分隔的主题列表,用作此连接器的输入 -
topics.regex
- 用作此连接器输入的主题的java正则表达式
-
name=local-file-sink connector.class=filestreamsink tasks.max=1 file=test.sink.txt topics=connect-test
可以在连接器中配置转换器
需要指定参数:
-
transforms
- 转换的别名列表,指定将应用转换的顺序。 -
transforms.$alias.type
- 转换的完全限定类名。 -
transforms.$alias.$transformationspecificconfig
转换的配置属性
例如,我们把刚才的文件转换器的内容添加字段
首先设置connect-standalone.properties
key.converter.schemas.enable = false value.converter.schemas.enable = false
设置connect-file-source.properties
name=local-file-source connector.class=filestreamsource tasks.max=1 file=test.txt topic=connect-test transforms=makemap, insertsource transforms.makemap.type=org.apache.kafka.connect.transforms.hoistfield$value transforms.makemap.field=line transforms.insertsource.type=org.apache.kafka.connect.transforms.insertfield$value transforms.insertsource.static.field=data_source transforms.insertsource.static.value=test-file-source
没有转换前的结果:
"foo" "bar" "hello world"
转换后:
{"line":"foo","data_source":"test-file-source"} {"line":"bar","data_source":"test-file-source"} {"line":"hello world","data_source":"test-file-source"}
常用转换类型:
- insertfield - 使用静态数据或记录元数据添加字段
- replacefield - 过滤或重命名字段
- maskfield - 用类型的有效空值替换字段(0,空字符串等)
- valuetokey value转换为key
- hoistfield - 将整个事件作为单个字段包装在struct或map中
- extractfield - 从struct和map中提取特定字段,并在结果中仅包含此字段
- setschemametadata - 修改架构名称或版本
- timestamprouter - 根据原始主题和时间戳修改记录主题
- regexrouter - 根据原始主题,替换字符串和正则表达式修改记录主题
集群模式
集群模式下,可以扩展,容错。
启动:
> bin/connect-distributed.sh config/connect-distributed.properties
在集群模式下,kafka connect在kafka主题中存储偏移量,配置和任务状态。
集群模式配置
connect-distributed.properties
#也需要基本的配置 bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.jsonconverter value.converter=org.apache.kafka.connect.json.jsonconverter #还有一些配置要注意 #group.id(默认connect-cluster) - connect的组id 请注意,这不得与使用者的组id 冲突 group.id=connect-cluster #用于存储偏移的主题; 此主题应具有许多分区 offset.storage.topic=connect-offsets offset.storage.replication.factor=1 #用于存储连接器和任务配置的主题 只能一个分区 config.storage.topic=connect-configs config.storage.replication.factor=1 #用于存储状态的主题; 此主题可以有多个分区 status.storage.topic=connect-status status.storage.replication.factor=1
在集群模式下,配置并不会在命令行传进去,而是需要rest api来创建,修改和销毁连接器。
集群模式连接器配置(rest api)
可以配置rest api服务器,支持http与https
listeners=http://localhost:8080,https://localhost:8443
默认情况下,如果未listeners
指定,则rest服务器使用http协议在端口8083上运行。
以下是当前支持的rest api:
-
get /connectors
- 返回活动连接器列表 -
post /connectors
- 创建一个新的连接器; 请求主体应该是包含字符串name
字段的json对象和包含config
连接器配置参数的对象字段 -
get /connectors/{name}
- 获取有关特定连接器的信息 -
get /connectors/{name}/config
- 获取特定连接器的配置参数 -
put /connectors/{name}/config
- 更新特定连接器的配置参数 -
get /connectors/{name}/status
- 获取连接器的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,错误信息(如果失败)以及所有任务的状态 -
get /connectors/{name}/tasks
- 获取当前为连接器运行的任务列表 -
get /connectors/{name}/tasks/{taskid}/status
- 获取任务的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,以及错误信息是否失败 -
put /connectors/{name}/pause
- 暂停连接器及其任务,这将停止消息处理,直到恢复连接器 -
put /connectors/{name}/resume
- 恢复暂停的连接器(如果连接器未暂停,则不执行任何操作) -
post /connectors/{name}/restart
- 重新启动连接器(通常是因为它已经失败) -
post /connectors/{name}/tasks/{taskid}/restart
- 重启个别任务(通常因为失败) -
delete /connectors/{name}
- 删除连接器,暂停所有任务并删除其配置
连接器开发指南
kakfa允许开发人员自己去开发一个连接器。
核心概念
要在kafka和其他系统之间复制数据,用户需要创建一个connector
connector有两种形式:
sourceconnectors
从另一个系统导入数据,例如,jdbcsourceconnector
将关系数据库导入kafka
sinkconnectors
导出数据,例如,hdfssinkconnector
将kafka主题的内容导出到hdfs文件
和对应的task:
sourcetask
和sinktask
task形成输入输出流,开发task要注意偏移量的问题。
每个流应该是一系列键值记录。还需要定期提交已处理的数据的偏移量,以便在发生故障时,处理可以从上次提交的偏移量恢复。connector还需要是动态的,实现还负责监视外部系统是否存在任何更改。
开发一个简单的连接器
开发连接器只需要实现两个接口,即connector
和task
。
这里我们简单开发一个filestreamconnector。
此连接器是为在独立模式下使用,sourceconnector/
sourcetask读取文件的每一行,
sinkconnector/
sinktask每个记录写入一个文件。
连接器示例:
继承sourceconnector,添加字段(要读取的文件名和要将数据发送到的主题)
public class filestreamsourceconnector extends sourceconnector { private string filename; private string topic;
定义实际读取数据的类
@override public class<? extends task> taskclass() { return filestreamsourcetask.class; }
在filestreamsourcetask
下面定义该类。接下来,我们添加一些标准的生命周期方法,start()
和stop()
@override public void start(map<string, string> props) { // the complete version includes error handling as well. filename = props.get(file_config); topic = props.get(topic_config); } @override public void stop() { // nothing to do since no background monitoring is required. }
最后,实施的真正核心在于taskconfigs()
@override public list<map<string, string>> taskconfigs(int maxtasks) { arraylist<map<string, string>> configs = new arraylist<>(); // only one input stream makes sense. map<string, string> config = new hashmap<>(); if (filename != null) config.put(file_config, filename); config.put(topic_config, topic); configs.add(config); return configs; }
任务示例:
源任务
实现sourcetask
创建filestreamsourcetask继承sourcetask
public class filestreamsourcetask extends sourcetask { string filename; inputstream stream; string topic; @override public void start(map<string, string> props) { filename = props.get(filestreamsourceconnector.file_config); stream = openorthrowerror(filename); topic = props.get(filestreamsourceconnector.topic_config); } @override public synchronized void stop() { stream.close(); }
接下来,我们实现任务的主要功能,即poll()
从输入系统获取事件并返回以下内容的方法list
:
@override public list<sourcerecord> poll() throws interruptedexception { try { arraylist<sourcerecord> records = new arraylist<>(); while (streamvalid(stream) && records.isempty()) { lineandoffset line = readtonextline(stream); if (line != null) { map<string, object> sourcepartition = collections.singletonmap("filename", filename); map<string, object> sourceoffset = collections.singletonmap("position", streamoffset); records.add(new sourcerecord(sourcepartition, sourceoffset, topic, schema.string_schema, line)); } else { thread.sleep(1); } } return records; } catch (ioexception e) { // underlying stream was killed, probably as a result of calling stop. allow to return // null, and driving thread will handle any shutdown if necessary. } return null; }
接收任务
不像sourceconnector
和sinkconnector
,sourcetask
并sinktask
有非常不同的接口,因为sourcetask
采用的是拉接口,并sinktask
使用推接口。两者共享公共生命周期方法,但sinktask
完全不同:
public abstract class sinktask implements task { public void initialize(sinktaskcontext context) { this.context = context; } public abstract void put(collection<sinkrecord> records); public void flush(map<topicpartition, offsetandmetadata> currentoffsets) { }
这是一个简单的例子,它们有简单的结构化数据 - 每一行只是一个字符串。几乎所有实用的连接器都需要具有更复杂数据格式的模式。要创建更复杂的数据,您需要使用kafka connect data
api。
schema schema = schemabuilder.struct().name(name) .field("name", schema.string_schema) .field("age", schema.int_schema) .field("admin", new schemabuilder.boolean().defaultvalue(false).build()) .build(); struct struct = new struct(schema) .put("name", "barbara liskov") .put("age", 75);
更多kafka相关技术文章:
什么是kafka?
kafka监控工具汇总
kafka快速入门
kafka核心之consumer
kafka核心之producer
更多实时计算,flink,kafka等相关技术博文,欢迎关注实时流式计算