kafka-connect-jdbc-sink配置属性
kafka-connect-jdbc-sink配置属性
要使用此连接器,请在connector.class
配置属性中指定连接器类的名称。
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
连接器特定的配置属性如下所述。
数据库连接安全
在连接器配置中,您会注意到没有安全参数。这是因为SSL不属于JDBC标准,而是取决于所使用的JDBC驱动程序。通常,您需要通过connection.url
参数配置SSL 。例如,对于MySQL,它看起来像:
connection.url="jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true"
请与您的特定JDBC驱动程序文档联系以获取有关支持和配置的信息。
连接(Connection)
-
connection.url
JDBC连接URL。例如:
jdbc:oracle:thin:@localhost:1521:orclpdb1
,jdbc:mysql://localhost/db_name
,jdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name
- 类型:字符串
- 重要性:高
-
connection.user
JDBC连接用户。
- 类型:字符串
- 默认值:空
- 重要性:高
-
connection.password
JDBC连接密码。
- 类型:密码
- 默认值:空
- 重要性:高
-
dialect.name
该连接器应使用的数据库方言的名称。默认情况下为空,并且连接器根据JDBC连接URL自动确定方言。如果要覆盖该行为并使用特定的方言,请使用此选项。可以使用JDBC连接器插件中所有正确包装的方言。
- 类型:字符串
- 默认值:“”
- 有效值:[,Db2DatabaseDialect,MySqlDatabaseDialect,SybaseDatabaseDialect,GenericDatabaseDialect,OracleDatabaseDialect,SqlServerDatabaseDialect,PostgreSqlDatabaseDialect,SqliteDatabaseDialect,DerbyDatabaseDialect,SapHanaDatabaseDialect,MockDatabaseDialect,MockDatabaseDialect
- 重要性:低
写(Writes)
-
insert.mode
要使用的插入模式。支持的模式有:
-
insert
使用标准的SQL
INSERT
语句。 -
upsert
如果连接器支持目标数据库,请使用适当的upsert语义,例如。
INSERT OR IGNORE
-
update
如果连接器支持目标数据库,请使用适当的更新语义,例如
UPDATE
。 -
类型:字符串
-
默认值:插入
-
有效值:[insert, upsert, update]
-
重要性:高
-
-
batch.size
指定在可能的情况下尝试尝试将多少记录一起批处理以插入到目标表中。
- 类型:int
- 默认值:3000
- 有效值:[0,…]
- 重要性:中等
-
delete.enabled
是否将
null
记录值视为删除。要求pk.mode
是record_key
。- 类型:布尔值
- 默认值:false
- 重要性:中等
数据映射(Data Mapping)
-
table.name.format
目标表名称的格式字符串,其中可以包含“ $ {topic}”作为源主题名称的占位符。
例如,
kafka_${topic}
对于主题“订单”将映射到表名称“ kafka_orders”。- 类型:字符串
- 默认值:$ {topic}
- 重要性:中等
-
pk.mode
主键模式,也请参考
pk.fields
文档以了解相互之间的相互作用。支持的模式有:-
none
没有使用键。
-
kafka
Apache Kafka®坐标用作PK。
-
record_key
使用了来自记录键的字段,该字段可以是基元或结构。
-
record_value
使用记录值中的字段,该字段必须是结构。
-
类型:字符串
-
默认值:无
-
有效值:[none,kafka,record_key,record_value]
-
重要性:高
-
-
pk.fields
逗号分隔的主键字段名称列表。此配置的运行时解释取决于
pk.mode
:-
none
在此模式下,由于没有字段被用作主键,因此将被忽略。
-
kafka
必须是代表Kafka坐标的三项配置,
__connect_topic,__connect_partition,__connect_offset
如果为空则默认为。 -
record_key
如果为空,则将使用键结构中的所有字段,否则将用于提取所需的字段-对于原始键,仅必须配置一个字段名称。
-
record_value
如果为空,则将使用值struct中的所有字段,否则将用于提取所需的字段。
-
类型:list
-
默认值:无
-
重要性:中等
-
-
fields.whitelist
以逗号分隔的记录值字段名称的列表。如果为空,将使用记录值中的所有字段,否则将用于过滤到所需的字段。
请注意,
pk.fields
是在目标数据库中哪些字段构成主键列的上下文中独立应用的,而此配置适用于其他列。- 类型:list
- 默认值:“”
- 重要性:中等
-
db.timezone
插入基于时间的值时,连接器应使用的JDBC时区的名称。默认为UTC。
- 类型:字符串
- 默认值:“ UTC”
- 有效值:任何有效的JDK时区
- 重要性:中等
DDL支持
-
auto.create
如果发现缺少目标表,将基于记录模式自动创建目标表
CREATE
。- 类型:布尔值
- 默认值:false
- 重要性:中等
-
auto.evolve
如果发现缺少字段,将基于记录架构,自动添加列
ALTER
。- 类型:布尔值
- 默认值:false
- 重要性:中等
-
quote.sql.identifiers
何时在SQL语句中引用表名,列名和其他标识符。为了向后兼容,默认值为
always
。- 类型:字符串
- 默认值:始终
- 重要性:中等
重试(Retries)
-
max.retries
任务失败前重试错误的最大次数。
- 类型:int
- 默认值:10
- 有效值:[0,…]
- 重要性:中等
-
retry.backoff.ms
错误尝试重试之前等待的时间(以毫秒为单位)。
- 类型:int
- 默认值:3000
- 有效值:[0,…]
- 重要性:中等
原文
https://docs.confluent.io/5.4.0/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html
上一篇: Tensorflow2.0教程-自定义层
下一篇: k8s学习(二十) helm的基本使用