欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming使用pull模式接收Flume传送的数据并进行WordCount统计

程序员文章站 2024-02-21 21:54:58
...

      根据前文 spark streaming使用push模式读取flume数据 官网中认为push模式是不可靠的,而认为pull模式可靠,故推荐大家使用pull模式,本文将测试spark streaming使用pull模式读取flume过来的数据,并统计WordCount。
测试环境:

  • Spark 2.4.4
  • Flume 1.6.0
  • Scala 2.11.8
  • JDk 1.8.0_121
  • maven 3.5.0
    flume安装在redhat服务器,IP是 172.16.13.143
    测试在windows10上,使用idea 2017.2

使用pull模式的好处是:只有当spark streaming使用数据或者备份后,flume 才把这份数据删除,所以这样相比push模式就更能可靠的保存数据了。

一、测试前解决依赖问题

1.idea依赖

在maven工程中的pom.xml文件加入如下依赖

  <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume-sink_2.11</artifactId>
       <version>2.4.4</version>
</dependency>
<dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
</dependency>

2.flume端依赖

由于flume1.6.0的版本本身已经很旧,里面的很多jar包也跟不上spark 2.4.4的版本,所以需要替换某些jar,不然运行 flume agent 过程中会出现错误。

  • spark-streaming-flume-sink_2.11-2.4.4.jar 放进flume安装目录的lib文件夹中
  • flume安装目录lib文件中 avro-1.7.4.jaravro-ipc-1.7.4.jar 版本太旧,换成 avro-1.8.2.jaravro-ipc-1.8.2.jar
  • flume安装目录lib文件中 scala-library-2.10.1.jar 版本太旧,换成 scala-library-2.11.8.jar

二、flume端配置文件

stream-agent.sources = netcat-source
stream-agent.channels = memory-channel
stream-agent.sinks = avro-sink

## sources
stream-agent.sources.netcat-source.type=netcat
stream-agent.sources.netcat-source.bind=172.16.13.143
stream-agent.sources.netcat-source.port=8888

## channels
stream-agent.channels.memory-channel.type=memory
stream-agent.channels.memory-channel.capacity=1000
stream-agent.channels.memory-channel.transactionCapacity=100

## sinks,注意这里sink的IP和push模式不一样,要写自身服务器的IP
stream-agent.sinks.avro-sink.type=org.apache.spark.streaming.flume.sink.SparkSink
stream-agent.sinks.avro-sink.hostname=172.16.13.143
stream-agent.sinks.avro-sink.port=8989

## bind sources、channels、sinks
stream-agent.sources.netcat-source.channels=memory-channel
stream-agent.sinks.avro-sink.channel=memory-channel

三、spark streaming代码

object FlumePullWordCount {
  def main(args: Array[String]): Unit = {
    if(args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname,port) = args
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkconf,Seconds(5))
    // TODO... 使用SparkStreaming 整合Flume
    val flumeStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)   // 获取flume上发送过来的信息
      .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)    // 对接收到的信息进行wordCount
      .print()      // 打印结果
    ssc.start()
    ssc.awaitTermination()
  }
}

和push模式相比,只是把 FlumeUtils.createPollingStream 改为了 FlumeUtils.createStream

四、启动顺序

启动顺序和push模式也不一样,pull模式是先启动 flume agent 再启动 spark streaming。

1.flume启动

./bin/flume-ng agent --name stream-agent --conf ./conf --conf-file ./conf/flume-pull-streaming.conf -Dflume.root.logger=INFO,console

启动后窗口放着,后续查看日志

2. netcat启动

telnet 172.16.13.143 8888

3.spark streaming启动

注意,还是要添加两个参数,这时IP写的是 172.16.13.143
Spark Streaming使用pull模式接收Flume传送的数据并进行WordCount统计

五、测试

三步启动后看控制台日志,没有报错后开始测试。
先在telnet客户端发送数据
Spark Streaming使用pull模式接收Flume传送的数据并进行WordCount统计
数据以空格分隔,enter键发送
可以看看flume控制台有没有报错,没有报错后看idea中输出
Spark Streaming使用pull模式接收Flume传送的数据并进行WordCount统计
可以看到输出没问题,测试成功!

可靠性测试

根据pull模式的概述,可以知道只有streaming使用或者备份数据后,flume上的数据才会删除。所以可以做如下测试来验证:

  1. 先启动flume agent和netcat
  2. 不启动spark streaming程序,直接在netcat上发送数据
  3. 启动spark streaming程序,观察能否读取到streaming启动前的数据

还可以把idea 的程序打jar包,放到spark集群上运行,具体步骤见上一篇push模式的,这里不再写步骤了。

相关标签: BigData