欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming实时处理TCP Sockets数据流

程序员文章站 2022-07-12 19:40:48
...

1.构建模拟器,模拟网络环境下的数据流;
2.编辑Spark Streaming应用程序,在node01提交以集群模式运行,获取node02上端口9999中的文本数据流,并每隔5s对数据流中各单词的个数进行统计。

演示文档

//*******************模拟器******************
package spark

import java.io.{PrintWriter}
import java.net.ServerSocket
import java.util.Random
import scala.io.Source

object SocketSimulation {
  //随机抓取文档中的数据,在设置的端口输出
  def index(length: Int)={
    val rdm = new Random
    rdm.nextInt(length)
  }

  def main(args:Array[String]): Unit ={
    if(args.length!=3){
      System.err.println("Usage:<filename> <port> <millisecond>")
      System.exit(1)
    }

    val filename = args(0)
    val lines = Source.fromFile(filename).getLines().toList
    val filerow=lines.length

    val listener =new ServerSocket(args(1).toInt)
    while(true){
      val socket = listener.accept()
      new Thread(){
        override def run={
          println("Got client connected from:"+socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(),true)
          while(true){
            Thread.sleep(args(2).toLong)
            val content= lines(index(filerow))
            println(content)
            out.write(content+'\n')
            out.flush()
          }
          socket.close()
        }
      }.start()

    }

  }

}

打出模拟器的JAR包
Spark Streaming实时处理TCP Sockets数据流

打包时注意需要在CLASSPATH中添加以下几个JAR包:

/home/hduser/scala-2.10.4/lib/scala-swing.jar
/home/hduser/scala-2.10.4/lib/scala-library.jar
/home/hduser/scala-2.10.4/lib/scala-actors.jar

Spark Streaming实时处理TCP Sockets数据流

//**************分析器***************

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  def main (args:Array[String]) ={
    val conf=new SparkConf().setAppName("NetworkWordCount").setMaster("spark://192.168.71.129:7077") //集群模式设置
    val sc=new SparkContext(conf)
    val ssc=new StreamingContext(sc, Seconds(5)) //每隔5s统计数据流中各单词个数

    val lines=ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER) //这里两个参数为监听的节点和端口
    val words= lines.flatMap(_.split(","))
    val wordcounts = words.map(x=>(x,1)).reduceByKey(_+_)

    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

将分析器也打成JAR包,备用。
将打包好的模拟器和数据源通过scp命令复制到node02节点上,准备工作结束。

1.在node02节点上执行模拟器的命令

java -cp SocketSimulation.jar spark.SocketSimulation ~/Streamingtext/file.txt 9999 1000

9999为端口号,1000为毫秒,即每隔1s在node02的9999端口输出模拟器在file.txt中随机抓取的数据。
Spark Streaming实时处理TCP Sockets数据流

2.在node01中执行执行分析器的命令
在spark文件夹下执行:

bin/spark-submit ~/NetworkWordCount.jar node02 9999

后面给出的两个参数,正是模拟器所在的节点和端口,程序每5s处理一次数据。

Spark Streaming实时处理TCP Sockets数据流

相关标签: spark