
Spark Learning (6): Spark Streaming


SparkStreaming

This is the last post in the PySpark learning series. If you are interested, you can follow my GitHub: https://github.com/daijiangtian

  • The Example at the bottom uses Spark Streaming to receive a socket stream and analyze it in real time.

  • How it works

    • Once the ssc is started, the driver runs a long-running task.
    • Executors acting as receivers accept the incoming data, split it into blocks, and keep them in memory.
    • These blocks are also replicated to other executors so that no data is lost.
    • At each batch interval (typically every 1 second), the driver launches Spark tasks to process the blocks; the results are then persisted to any number of targets.
    • When Spark Streaming reads from a TCP source, it requests the connection about once per second and pulls the data back, rather than keeping one long-lived connection.
    • Target data stores include cloud storage (e.g. S3, WASB), relational databases (e.g. MySQL, PostgreSQL) and NoSQL stores; a minimal write-out sketch follows this list.
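
      A minimal sketch of that last step, assuming a DStream named `dstream` whose records are ready to be written out; the `save_partition` helper and its connection handling are placeholders for whichever store you target:

          def save_partition(records):
              # Open one connection per partition, write the records, then close it.
              # The actual client (MySQL, MongoDB, S3, ...) is up to you.
              for record in records:
                  pass  # e.g. cursor.execute(...) or collection.insert_one(...)

          def save_rdd(rdd):
              # Called once per batch with that batch's RDD
              rdd.foreachPartition(save_partition)

          dstream.foreachRDD(save_rdd)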
  • DStreams

    • A series of data arriving over time: each batch interval produces one RDD, so a DStream represents a sequence of RDDs; see the sketch below.
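
      For illustration (the `dstream` name is a placeholder), every DStream operation is ultimately applied batch by batch to the underlying RDDs; `transform` makes that explicit:

          # transform() applies an arbitrary RDD-to-RDD function to each batch's RDD
          cleaned = dstream.transform(lambda rdd: rdd.filter(lambda line: line.strip() != ""))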
  • Windowed transformations

    • Transformations computed over a sliding window of data that spans several batch intervals, as sketched below.
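
      A minimal sketch, assuming a DStream of (word, 1) pairs named `pairs` (as in the word-count example below) and a 1-second batch interval: keep a running count over the last 30 seconds, recomputed every 10 seconds. The inverse function lets Spark update the window incrementally and requires checkpointing to be enabled.

          windowedCounts = pairs.reduceByKeyAndWindow(
              lambda x, y: x + y,   # counts entering the window
              lambda x, y: x - y,   # counts leaving the window
              30,                   # window length in seconds
              10                    # sliding interval in seconds
          )
          windowedCounts.pprint()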
  • checkpointing

    • Stores streaming metadata and state in a reliable file system such as HDFS, so that the job can recover after a failure:
          ssc.checkpoint("hdfs://...")
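
      A common companion pattern, sketched here with a hypothetical create_context function: pair the checkpoint directory with StreamingContext.getOrCreate so that a restarted driver recovers its state instead of starting from scratch.

          from pyspark import SparkContext
          from pyspark.streaming import StreamingContext

          def create_context():
              # Only called when no checkpoint exists yet: build the context
              # and define the complete DStream graph here.
              sc = SparkContext("local[2]", "CheckpointedApp")
              ssc = StreamingContext(sc, 1)
              ssc.checkpoint("hdfs://...")   # same directory as passed to getOrCreate
              # ... define DStreams and their transformations ...
              return ssc

          # Recover from the checkpoint if present, otherwise build a fresh context
          ssc = StreamingContext.getOrCreate("hdfs://...", create_context)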
      
  • Load

    • SparkContext

          # Create a local SparkContext and Streaming Contexts
          from pyspark import SparkContext
          from pyspark.streaming import StreamingContext
      
          # Create sc with two working threads
          sc = SparkContext("local[2]", "NetworkWordCount")     # must be local[2] (or more), not local: one thread receives, the other processes
      
          # Create local StreamingContext with batch interval of 1 second
          ssc = StreamingContext(sc, 1)
      
          # Create checkpoint for local StreamingContext
          # store the checkpoint data directly under a local 'checkpoint' directory
          # ssc.checkpoint("checkpoint") 
      
          # Create DStream that connects to localhost:9999
          # connect to the remote server over TCP and receive its data
          lines = ssc.socketTextStream("localhost", 9999)  
          # Split lines into words
          words = lines.flatMap(lambda line: line.split(" "))
      
          # Count each word in each batch
          pairs = words.map(lambda word: (word, 1))
          wordCounts = pairs.reduceByKey(lambda x, y: x + y)
      
          # Print the first ten elements of each RDD in this DStream
          wordCounts.pprint()
      
      
      
          # Start the computation
          ssc.start()
          # Wait for the computation to terminate
          ssc.awaitTermination() 
      
      
          # In the console, use `nc -l -p 9999` when `nc -lk 9999` is not available
    • SparkSession

          """
              Unlike the previous script, this one follows the more familiar
              DataFrame workflow: create a SparkSession first; there is no
              need to create a separate StreamingContext.
          """
          # Import the necessary classes and create a local SparkSession
          from pyspark.sql import SparkSession
          from pyspark.sql.functions import explode
          from pyspark.sql.functions import split
          spark = SparkSession \
              .builder \
              .appName("StructuredNetworkWordCount") \
              .getOrCreate()

          """
              The stream is initiated by calling readStream on the SparkSession.
          """
          # Create DataFrame representing the stream of input lines
          # from connection to localhost:9999
          lines = spark\
              .readStream\
              .format('socket')\
              .option('host', 'localhost')\
              .option('port', 9999)\
              .load()

          """
              There is no need for low-level RDD operations here;
              the DataFrame/SQL API is enough.
          """
          # Split the lines into words
          words = lines.select(
              explode(
                  split(lines.value, ' ')
              ).alias('word')
          )

          # Generate running word count
          wordCounts = words.groupBy('word').count()

          """
              Instead of pprint(), the stream is written out by explicitly
              calling writeStream and defining the format and the output mode.
              It takes a little longer to write, but these methods and
              properties are syntactically similar to other DataFrame calls;
              to save to a database, a file system, the console, etc. you
              only need to change the outputMode and format properties.
          """
          # Start running the query that prints the
          # running counts to the console
          query = wordCounts\
              .writeStream\
              .outputMode('complete')\
              .format('console')\
              .start()

          """
              Finally, run and wait for the streaming query to terminate
              (or be cancelled).
          """
          # Await Spark Streaming termination
          query.awaitTermination()
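
      As a variation on the console sink (the paths below are placeholders), the same pipeline can stream its output to files by swapping the format and adding a checkpoint location. The file sink uses 'append' mode, so the non-aggregated `words` stream is written here; appending an aggregate such as `wordCounts` would additionally require a watermark.

          # Stream the split-out words to Parquet files instead of the console
          fileQuery = words\
              .writeStream\
              .outputMode('append')\
              .format('parquet')\
              .option('path', '/tmp/words_out')\
              .option('checkpointLocation', '/tmp/words_checkpoint')\
              .start()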
      
      
  • Save to text

        ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
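
    saveAsTextFiles writes one output directory per batch, named prefix-TIME_IN_MS.suffix. A small sketch, assuming the wordCounts DStream from the word-count example above:

        # Each batch interval produces a directory such as counts-1500000000000.txt,
        # containing one part file per partition
        wordCounts.saveAsTextFiles("counts", "txt")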
  • Example

      # Create a local SparkContext and Streaming Contexts
      from pyspark import SparkContext
      from pyspark import SparkConf
    
    
      from pyspark.streaming import StreamingContext
      import datetime
      # from pyspark.sql import SQLContext
      # from pyspark.sql.types import *
      import numpy as np
    
      # Create sc with four working threads
    
      sconf = SparkConf()
      sconf.setMaster("local[4]")
    
      sc = SparkContext(appName="act_analysis",conf = sconf)
    
      # Create local StreamingContext with batch interval of 3 seconds
      ssc = StreamingContext(sc, 3)
    
    
      # Create DStream that connects to 192.168.14.2:1234
      lines = ssc.socketTextStream("192.168.14.2", 1234)
    
      def get_rows(x):
          res = x.split(" ")
          return (res[0],res[1])
    
      rows = lines.map(get_rows)
    
      roles = rows.groupByKey()
    
      # Standard deviation of the gaps between consecutive action timestamps
      def get_speed_std(action):
          temp_list = []
          temp = 0
          for i in action:
              if temp == 0:
                  temp_list.append(0)
                  temp = i
                  continue
              temp_list.append((i - temp))
              temp = i
          res = np.std(np.array(temp_list), ddof=1)
          return res
    
      # Average action rate and interval std for one role's timestamps
      def get_role_action_feature(action):
          try:
              action = sorted(action)
          except Exception as e:
              print(e)
              return None
          count = len(action)
          stay_time = int(action[-1] - action[0])
          try:
              ave_speed = count / stay_time
              std_speed = get_speed_std(action)
          except:
              ave_speed = 0
              std_speed = 0
          return ave_speed, std_speed
    
      # Turn one (role, timestamps) pair into (role, ave_speed, std_speed, now)
      def feature(x):
          role = x[0]
          action = [float(i) for i in list(x[1])]
          res = get_role_action_feature(action)
          if res is None:  # sorting failed above; fall back to zeros
              res = (0, 0)

          return (role, res[0], res[1], datetime.datetime.now())
    
      res = roles.map(feature)
    
      # res.pprint()
    
      from pymongo import MongoClient
    
      MONGODB = {
          'MONGO_HOST': '',
          'MONGO_PORT': '27017',
          'MONGO_USERNAME': '',
          'MONGO_PASSWORD': ''
      }
    
    
      mongo_uri = 'mongodb://{account}{host}:{port}/'.format(
                  account='{username}:{password}@'.format(
                      username=MONGODB['MONGO_USERNAME'],
                      password=MONGODB['MONGO_PASSWORD']) if MONGODB['MONGO_USERNAME'] else '',
                  host=MONGODB['MONGO_HOST'],
                  port=MONGODB['MONGO_PORT'])
      conn = MongoClient(mongo_uri)
      db = conn['104']
    
      # Write one batch's collected results into MongoDB
      def save(x):
    
          col = db['act_live_res']
          res = x.collect()
          for i in res:
              item = {
                  'role_id' : i[0],
                  'ave':i[1],
                  'std':i[2],
                  'date':i[3]
              }
    
              col.insert_one(item)
    
      #     conn.close()    
    
      res.foreachRDD(save)
    
      ssc.start()
      # Wait for the computation to terminate
      ssc.awaitTermination()