Spark Streaming实时处理TCP Sockets数据流
程序员文章站
2022-07-12 19:40:48
...
1.构建模拟器,模拟网络环境下的数据流;
2.编辑Spark Streaming应用程序,在node01提交以集群模式运行,获取node02上端口9999中的文本数据流,并每隔5s对数据流中各单词的个数进行统计。
演示文档
//*******************模拟器******************
package spark
import java.io.{PrintWriter}
import java.net.ServerSocket
import java.util.Random
import scala.io.Source
object SocketSimulation {
//随机抓取文档中的数据,在设置的端口输出
def index(length: Int)={
val rdm = new Random
rdm.nextInt(length)
}
def main(args:Array[String]): Unit ={
if(args.length!=3){
System.err.println("Usage:<filename> <port> <millisecond>")
System.exit(1)
}
val filename = args(0)
val lines = Source.fromFile(filename).getLines().toList
val filerow=lines.length
val listener =new ServerSocket(args(1).toInt)
while(true){
val socket = listener.accept()
new Thread(){
override def run={
println("Got client connected from:"+socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(),true)
while(true){
Thread.sleep(args(2).toLong)
val content= lines(index(filerow))
println(content)
out.write(content+'\n')
out.flush()
}
socket.close()
}
}.start()
}
}
}
打出模拟器的JAR包
打包时注意需要在CLASSPATH中添加以下几个JAR包:
/home/hduser/scala-2.10.4/lib/scala-swing.jar
/home/hduser/scala-2.10.4/lib/scala-library.jar
/home/hduser/scala-2.10.4/lib/scala-actors.jar
//**************分析器***************
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
def main (args:Array[String]) ={
val conf=new SparkConf().setAppName("NetworkWordCount").setMaster("spark://192.168.71.129:7077") //集群模式设置
val sc=new SparkContext(conf)
val ssc=new StreamingContext(sc, Seconds(5)) //每隔5s统计数据流中各单词个数
val lines=ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER) //这里两个参数为监听的节点和端口
val words= lines.flatMap(_.split(","))
val wordcounts = words.map(x=>(x,1)).reduceByKey(_+_)
wordcounts.print()
ssc.start()
ssc.awaitTermination()
}
}
将分析器也打成JAR包,备用。
将打包好的模拟器和数据源通过scp命令复制到node02节点上,准备工作结束。
1.在node02节点上执行模拟器的命令
java -cp SocketSimulation.jar spark.SocketSimulation ~/Streamingtext/file.txt 9999 1000
9999为端口号,1000为毫秒,即每隔1s在node02的9999端口输出模拟器在file.txt中随机抓取的数据。
2.在node01中执行执行分析器的命令
在spark文件夹下执行:
bin/spark-submit ~/NetworkWordCount.jar node02 9999
后面给出的两个参数,正是模拟器所在的节点和端口,程序每5s处理一次数据。
上一篇: 单向数据流和双向数据流及双向数据绑定
下一篇: C++:命名空间namespace