Problems Encountered When Sinking from Flume to Kafka

程序员文章站 2022-06-14 09:08:18

1. Version mismatch

2018-12-02 15:23:06,334 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:427)] Sink k1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
        at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
        at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
        at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:217)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

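I had copied the Kafka sink configuration from the official documentation for the latest Flume release (1.8.0). The installed Flume is an older release whose Kafka sink still expects the brokerList property, so it failed with the error above and no data was written to Kafka.
The fix is to use the configuration that matches the installed Flume version.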

In the configuration, replace a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 with a1.sinks.k1.brokerList = master:9092,slave1:9092,slave2:9092

The Flume 1.8.0 documentation lists the following properties as deprecated:
Deprecated Properties

Property Name   Default               Description
brokerList      -                     Use kafka.bootstrap.servers
topic           default-flume-topic   Use kafka.topic
batchSize       100                   Use kafka.flumeBatchSize
requiredAcks    1                     Use kafka.producer.acks
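
As a rough illustration of this mapping, the same sink settings could be written in either naming style. This is only a sketch: the agent name a1, sink name k1, and broker list come from the configuration above, the batch size and acks values are the defaults from the table, and the sink type line is the standard Kafka sink class name, which the original post does not show. Which style works depends on the Flume version that is actually installed.

```properties
# Older property names (listed as deprecated in the Flume 1.8.0 documentation)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.batchSize = 100
a1.sinks.k1.requiredAcks = 1

# Newer property names for the same settings (commented out here;
# pick the style that matches the installed Flume version)
# a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
# a1.sinks.k1.kafka.flumeBatchSize = 100
# a1.sinks.k1.kafka.producer.acks = 1
```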

Other warnings that may appear because of the version mismatch:

WARN kafka.KafkaSink: The Property 'topic' is not set. Using the default topic name: default-flume-topic
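
This warning most likely has the same cause: the older sink reads the topic property, so a topic configured only as kafka.topic is not picked up and events go to default-flume-topic. A minimal sketch of the fix, assuming the older property names apply to the installed version; my-topic is a placeholder topic name, not one from the original configuration:

```properties
# Older sink versions read 'topic'; 'kafka.topic' is the newer name.
# 'my-topic' is a hypothetical topic name; replace it with your own.
a1.sinks.k1.topic = my-topic
```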