
Spark Streaming Real-Time Stream Processing Project Notes — Push-Mode Integration, Local Environment Testing

程序员文章站 2022-06-15 14:11:13

Spark Streaming integrates with Flume in two ways: push (Flume's avro sink pushes events to a Spark receiver) and pull (Spark pulls from a custom Flume sink). This post covers the push approach.


Official description of the push process

(screenshot of the push-mode description from the official Spark Streaming + Flume integration guide)

Flume configuration file

more netcat-memory-avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 44444

a1.sinks.k1.type = avro
# IP address of the local machine running the Spark Streaming receiver
a1.sinks.k1.hostname = 192.168.206.1
a1.sinks.k1.port = 55555

a1.channels.c1.type = memory


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Add the dependency to the pom file

 <!-- Spark Streaming / Flume integration dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
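The `${spark.version}` property is expected to be defined in the pom's `<properties>` block. Since the artifact is built for Scala 2.11, it should be a Spark 2.x release; the exact values below are an assumption for illustration, not taken from the post:

```xml
<properties>
    <!-- assumed versions: any Spark 2.x release with a _2.11 artifact works -->
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
```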

Configure the default args in the IDEA run configuration

(screenshots of the IDEA Run/Debug configuration; the Program arguments hold the receiver's bind address and port, e.g. 192.168.206.1 55555, matching the avro sink configured above)

IDEA code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingFlume extends App {

  if (args.length != 2) {
    System.err.println("Usage: SparkStreamingFlume <hostname> <port>")
    System.exit(1)
  }

  val Array(hostname, port) = args

  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(5))

  // TODO: integrate Spark Streaming with Flume (push mode);
  // createStream starts an avro receiver listening on hostname:port
  val flume = FlumeUtils.createStream(ssc, hostname, port.toInt)
  flume.map(x => new String(x.event.getBody.array()).trim)
    .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

  ssc.start()
  ssc.awaitTermination()
}

Start-up order

In push mode the Spark Streaming application is the avro server, so it must be running before the Flume agent starts; otherwise the avro sink cannot connect. Start the IDEA application first, then the Flume agent.

(screenshot of the start-up sequence from the original post)
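A typical local test session can be sketched as follows. The `$FLUME_HOME` variable and config-file path are assumptions based on a standard Flume install; adjust them for your environment:

```shell
# 1. Run the Spark Streaming application in IDEA first,
#    so the avro receiver is listening on 192.168.206.1:55555.

# 2. Start the Flume agent with the configuration above.
flume-ng agent \
  --name a1 \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/netcat-memory-avro.conf \
  -Dflume.root.logger=INFO,console

# 3. Send test data to the netcat source; each line flows
#    netcat source -> memory channel -> avro sink -> Spark receiver.
telnet hadoop 44444
```

Typing a few words into the telnet session should produce (word, count) pairs in the IDEA console on the next 5-second batch.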