spark读取kafka数据(两种方式比较及flume配置文件)
spark读取kafka数据(两种方式比较及flume配置文件)。
Kafka topic及partition设计
1、对于银行应用日志,一个系统建一个topic,每台主机对应一个partition,规则为,flume采集时,同一个应用,数据送到同一个topic,一个主机,送一个partition,这样做是为了同一个日志的数据在一个partition中,顺序不会乱。另,flume配置文件可以配置sink的topic和partition id(xxx.kafka.topic = xxx;dafaultPartitionId = x)。
2、flume送数据到kafka,flume配置文件配置文件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 150000
a1.channels.c1.transactionCapacity = 1500
##自定义source,见前一篇博客(flume 1.7 TailDir source重复获取数据集不释放资源解决办法)
a1.sources.r1.type = TaildirSourceSelf
a1.sources.r1.batchSize = 500
a1.sources.r1.skipToEnd = true
a1.sources.r1.ignorePathChange = true
a1.sources.r1.positionFile = test/logConf/xxx_taildir_position.json
a1.sources.r1.writePosInterval = 5000
a1.sources.r1.filegroups = f0
a1.sources.r1.filegroups.f0 = xxx/xxx/*.*
a1.sources.r1.file.header.enabled = true
a1.sources.r1.file.name.key = tag.default.filename
a1.sources.r1.dir.header.enabled = true
a1.sources.r1.dir.name.key = tag.default.dirname
a1.sources.r1.inode.header.enabled = false
a1.sources.r1.inode.key = tag.default.inode
# For Kafka Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = mn01:9092
a1.sinks.k1.kafka.topic = test-topic
a1.sinks.k1.defaultPartitionId = 0
a1.sinks.k1.flumeBatchSize = 500
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.max.request.size = 5120000
3、SparkStreaming从kafka获取数据
Spark提供两种方法从kafka拿数据:
1)Receiver方式:
a、其实是通过zookeeper连接kafka队列;
b、kafka中topic的partition数目与sparkrdd中partition数目没有关系,增加topic的partition数,不会增加spark的处理并行度,仅仅是增加获取数据的Receiver;
c、Receiver方式,据都存储在Spark Executor的内存中,一旦spark停止运行(如机器崩溃),数据将无法恢复,只有则不开启WAL(write ahead log)机制,及预写日志的方式同步将数据写到分布式系统中,虽然可以恢复,但是效率低,每份数据需要复制两份;
2)Director方式:
a、这是spark1.3引进的新方法,直接从kafka的broker分区中读数据,跳过zookeeper,也没有receiver;
b、kafka中topic的partition与spark rdd的partition一一对应,也就是说增加kafka中topic的partition数目,就增加了spark的并行处理度;
c、该方式不会复制两份数据,因为kafka本身就有高可用,kafka会做数据备份,宕机后,可以利用kafka副本恢复;
补充:这种方式数据的offset存在spark的checkpoint中,不然每次重启spark,就会从kafka的最新offset位置读数据,会丢数据;所以,spark需要设置checkpoint,在创建JavaStreamingContext时,建议使用
JavaStreamingContext.getOrCreate(sparkChkDir,newStreamContextFunction());//第一个参数:checkpoint路径;第二个参数,返回新的JavaStreamingContext;
及如果checkpoint存在就从checkpoint得到sparkStreamingContext,不存在就创建sparkStreamingContext;
下面是我从spark获取数据的代码(java版):
public static JavaDStream getJavaDStreamFromKafka(String _Brokers, String _Topics, String _Metadata,
JavaStreamingContext _JavaStreamingContext) {
//返回该“,”分隔的所有topic的数据
HashSet _TopicsSet = new HashSet(Arrays.asList(_Topics.split(",")));
Map _KafkaParams = new HashMap();
_KafkaParams.put(_Metadata, _Brokers);
_KafkaParams.put("group.id", "xxxx");
_KafkaParams.put("fetch.message.max.bytes", "5120000");
// Create direct kafka stream with brokers and topics
JavaDStream _MessagesLines = KafkaUtils
.createDirectStream(_JavaStreamingContext, String.class, String.class, StringDecoder.class,
StringDecoder.class, _KafkaParams, _TopicsSet)
.map(new Function, String>() {
public String call(Tuple2 tuple2) {
return tuple2._2();
}
});
return _MessagesLines;
}
上一篇: Oracle数据库锁机制Lock的介绍
下一篇: oracle的5个体系结构概念讲解