Spark Streaming Real-Time Stream Processing Project Notes — Push-Based Flume Integration, Local Environment Testing
程序员文章站
2022-06-15 14:11:13
Two ways to integrate Spark Streaming with Flume
The Push approach as described in the official documentation
Flume configuration file
$ more netcat-memory-avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 44444
a1.sinks.k1.type = avro
# IP address of the local machine where the Spark Streaming receiver runs
a1.sinks.k1.hostname = 192.168.206.1
a1.sinks.k1.port = 55555
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
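With the configuration saved, the agent can be launched roughly as follows (a sketch assuming a standard Flume installation with `$FLUME_HOME` set; the agent name `a1` and the file name match the config above):

```shell
# Start the Flume agent defined in netcat-memory-avro.conf,
# logging to the console so incoming events are visible.
flume-ng agent \
  --name a1 \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/netcat-memory-avro.conf \
  -Dflume.root.logger=INFO,console
```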
Add the dependency to the pom file
<!-- Spark Streaming + Flume integration dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
Configure the default program arguments (args) in IDEA: the receiver hostname and port, which must match the Avro sink configured above (e.g. 192.168.206.1 55555)
Code in IDEA
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingFlume extends App {

  if (args.length != 2) {
    System.err.println("Usage: SparkStreamingFlume <hostname> <port>")
    System.exit(1)
  }

  val Array(hostname, port) = args

  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(5))

  // TODO: integrate Spark Streaming with Flume (push-based approach).
  // This starts an Avro receiver on <hostname>:<port>; the Flume avro sink
  // pushes events to it.
  val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

  // Each Flume event body is a line of text: trim it, split into words, count.
  flumeStream.map(x => new String(x.event.getBody.array()).trim)
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .print()

  ssc.start()
  ssc.awaitTermination()
}
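The per-batch transformation above (trim, split, count) can be sketched with plain Scala collections standing in for each micro-batch's RDD; this is a standalone illustration with hypothetical sample data, not part of the streaming job:

```scala
// Standalone sketch of the word count each 5-second batch performs.
// Plain Seq[String] stands in for the batch of Flume event bodies.
object WordCountSketch {
  def countWords(eventBodies: Seq[String]): Map[String, Int] =
    eventBodies
      .map(_.trim)                 // mirrors: new String(x.event.getBody.array()).trim
      .flatMap(_.split(" "))       // mirrors: flatMap(_.split(" "))
      .filter(_.nonEmpty)          // drop empty tokens from blank lines
      .groupBy(identity)           // equivalent of map((_, 1)).reduceByKey(_ + _)
      .map { case (word, occs) => (word, occs.size) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("hello spark ", "hello flume") // hypothetical sample input
    println(countWords(batch)) // counts: hello twice, spark and flume once
  }
}
```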
Startup order: in push mode, start the Spark Streaming application first, so that its Avro receiver is already listening on port 55555; the Flume avro sink fails to connect if the receiver is not up yet. Then start the Flume agent, and finally send test data to the netcat source.
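Once both the Spark application and the Flume agent are running, data can be fed into the netcat source like this (host and port come from the config above):

```shell
# Connect to the netcat source and type a few space-separated words;
# the word counts should appear in the Spark console every 5 seconds.
telnet hadoop 44444
```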