欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

spark读取kafka数据(两种方式比较及flume配置文件)

程序员文章站 2022-03-14 08:34:54
spark读取kafka数据(两种方式比较及flume配置文件)。 Kafka topic及partition设计 1、对于银行应用日志,一个系统建一个topic,每台主机对应一个part...

spark读取kafka数据(两种方式比较及flume配置文件)。

Kafka topic及partition设计

1、对于银行应用日志,一个系统建一个topic,每台主机对应一个partition,规则为,flume采集时,同一个应用,数据送到同一个topic,一个主机,送一个partition,这样做是为了同一个日志的数据在一个partition中,顺序不会乱。另,flume配置文件可以配置sink的topic和partition id(xxx.kafka.topic = xxx;dafaultPartitionId = x)。

2、flume送数据到kafka,flume配置文件配置文件

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 150000

a1.channels.c1.transactionCapacity = 1500

##自定义source,见前一篇博客(flume 1.7 TailDir source重复获取数据集不释放资源解决办法)

a1.sources.r1.type = TaildirSourceSelf

a1.sources.r1.batchSize = 500

a1.sources.r1.skipToEnd = true

a1.sources.r1.ignorePathChange = true

a1.sources.r1.positionFile = test/logConf/xxx_taildir_position.json

a1.sources.r1.writePosInterval = 5000

a1.sources.r1.filegroups = f0

a1.sources.r1.filegroups.f0 = xxx/xxx/*.*

a1.sources.r1.file.header.enabled = true

a1.sources.r1.file.name.key = tag.default.filename

a1.sources.r1.dir.header.enabled = true

a1.sources.r1.dir.name.key = tag.default.dirname

a1.sources.r1.inode.header.enabled = false

a1.sources.r1.inode.key = tag.default.inode

# For Kafka Sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = mn01:9092

a1.sinks.k1.kafka.topic = test-topic

a1.sinks.k1.defaultPartitionId = 0

a1.sinks.k1.flumeBatchSize = 500

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.max.request.size = 5120000

3、SparkStreaming从kafka获取数据

Spark提供两种方法从kafka拿数据:

1)Receiver方式:

a、其实是通过zookeeper连接kafka队列;

b、kafka中topic的partition数目与sparkrdd中partition数目没有关系,增加topic的partition数,不会增加spark的处理并行度,仅仅是增加获取数据的Receiver;

c、Receiver方式,据都存储在Spark Executor的内存中,一旦spark停止运行(如机器崩溃),数据将无法恢复,只有则不开启WAL(write ahead log)机制,及预写日志的方式同步将数据写到分布式系统中,虽然可以恢复,但是效率低,每份数据需要复制两份;

2)Director方式:

a、这是spark1.3引进的新方法,直接从kafka的broker分区中读数据,跳过zookeeper,也没有receiver;

b、kafka中topic的partition与spark rdd的partition一一对应,也就是说增加kafka中topic的partition数目,就增加了spark的并行处理度;

c、该方式不会复制两份数据,因为kafka本身就有高可用,kafka会做数据备份,宕机后,可以利用kafka副本恢复;

补充:这种方式数据的offset存在spark的checkpoint中,不然每次重启spark,就会从kafka的最新offset位置读数据,会丢数据;所以,spark需要设置checkpoint,在创建JavaStreamingContext时,建议使用

JavaStreamingContext.getOrCreate(sparkChkDir,newStreamContextFunction());//第一个参数:checkpoint路径;第二个参数,返回新的JavaStreamingContext;

及如果checkpoint存在就从checkpoint得到sparkStreamingContext,不存在就创建sparkStreamingContext;

下面是我从spark获取数据的代码(java版):

public static JavaDStream getJavaDStreamFromKafka(String _Brokers, String _Topics, String _Metadata,

JavaStreamingContext _JavaStreamingContext) {

//返回该“,”分隔的所有topic的数据

HashSet _TopicsSet = new HashSet(Arrays.asList(_Topics.split(",")));

Map _KafkaParams = new HashMap();

_KafkaParams.put(_Metadata, _Brokers);

_KafkaParams.put("group.id", "xxxx");

_KafkaParams.put("fetch.message.max.bytes", "5120000");

// Create direct kafka stream with brokers and topics

JavaDStream _MessagesLines = KafkaUtils

.createDirectStream(_JavaStreamingContext, String.class, String.class, StringDecoder.class,

StringDecoder.class, _KafkaParams, _TopicsSet)

.map(new Function, String>() {

public String call(Tuple2 tuple2) {

return tuple2._2();

}

});

return _MessagesLines;

}