Spark Streaming使用pull模式接收Flume传送的数据并进行WordCount统计
根据前文 spark streaming使用push模式读取flume数据 官网中认为push模式是不可靠的,而认为pull模式可靠,故推荐大家使用pull模式,本文将测试spark streaming使用pull模式读取flume过来的数据,并统计WordCount。
测试环境:
- Spark 2.4.4
- Flume 1.6.0
- Scala 2.11.8
- JDk 1.8.0_121
- maven 3.5.0
flume安装在redhat服务器,IP是 172.16.13.143
测试在windows10上,使用idea 2017.2
使用pull模式的好处是:只有当spark streaming使用数据或者备份后,flume 才把这份数据删除,所以这样相比push模式就更能可靠的保存数据了。
一、测试前解决依赖问题
1.idea依赖
在maven工程中的pom.xml文件加入如下依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
2.flume端依赖
由于flume1.6.0的版本本身已经很旧,里面的很多jar包也跟不上spark 2.4.4的版本,所以需要替换某些jar,不然运行 flume agent 过程中会出现错误。
- 把 spark-streaming-flume-sink_2.11-2.4.4.jar 放进flume安装目录的lib文件夹中
- flume安装目录lib文件中 avro-1.7.4.jar 和 avro-ipc-1.7.4.jar 版本太旧,换成 avro-1.8.2.jar 和 avro-ipc-1.8.2.jar
- flume安装目录lib文件中 scala-library-2.10.1.jar 版本太旧,换成 scala-library-2.11.8.jar
二、flume端配置文件
stream-agent.sources = netcat-source
stream-agent.channels = memory-channel
stream-agent.sinks = avro-sink
## sources
stream-agent.sources.netcat-source.type=netcat
stream-agent.sources.netcat-source.bind=172.16.13.143
stream-agent.sources.netcat-source.port=8888
## channels
stream-agent.channels.memory-channel.type=memory
stream-agent.channels.memory-channel.capacity=1000
stream-agent.channels.memory-channel.transactionCapacity=100
## sinks,注意这里sink的IP和push模式不一样,要写自身服务器的IP
stream-agent.sinks.avro-sink.type=org.apache.spark.streaming.flume.sink.SparkSink
stream-agent.sinks.avro-sink.hostname=172.16.13.143
stream-agent.sinks.avro-sink.port=8989
## bind sources、channels、sinks
stream-agent.sources.netcat-source.channels=memory-channel
stream-agent.sinks.avro-sink.channel=memory-channel
三、spark streaming代码
object FlumePullWordCount {
def main(args: Array[String]): Unit = {
if(args.length != 2) {
System.err.println("Usage: FlumePullWordCount <hostname> <port>")
System.exit(1)
}
val Array(hostname,port) = args
val sparkconf = new SparkConf().setMaster("local[*]").setAppName("FlumePullWordCount")
val ssc = new StreamingContext(sparkconf,Seconds(5))
// TODO... 使用SparkStreaming 整合Flume
val flumeStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt)
flumeStream.map(x => new String(x.event.getBody.array()).trim) // 获取flume上发送过来的信息
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) // 对接收到的信息进行wordCount
.print() // 打印结果
ssc.start()
ssc.awaitTermination()
}
}
和push模式相比,只是把 FlumeUtils.createPollingStream 改为了 FlumeUtils.createStream
四、启动顺序
启动顺序和push模式也不一样,pull模式是先启动 flume agent 再启动 spark streaming。
1.flume启动
./bin/flume-ng agent --name stream-agent --conf ./conf --conf-file ./conf/flume-pull-streaming.conf -Dflume.root.logger=INFO,console
启动后窗口放着,后续查看日志
2. netcat启动
telnet 172.16.13.143 8888
3.spark streaming启动
注意,还是要添加两个参数,这时IP写的是 172.16.13.143 ,
五、测试
三步启动后看控制台日志,没有报错后开始测试。
先在telnet客户端发送数据
数据以空格分隔,enter键发送
可以看看flume控制台有没有报错,没有报错后看idea中输出
可以看到输出没问题,测试成功!
可靠性测试
根据pull模式的概述,可以知道只有streaming使用或者备份数据后,flume上的数据才会删除。所以可以做如下测试来验证:
- 先启动flume agent和netcat
- 不启动spark streaming程序,直接在netcat上发送数据
- 启动spark streaming程序,观察能否读取到streaming启动前的数据
还可以把idea 的程序打jar包,放到spark集群上运行,具体步骤见上一篇push模式的,这里不再写步骤了。
上一篇: MySQL自增列插入0值的解决方案
下一篇: 实用的JS小技巧