Spark Learning (6): Spark Streaming
Spark Streaming
This is the last article in the PySpark learning series. If you are interested, you can follow my GitHub: https://github.com/daijiangtian
The Example at the bottom uses Spark Streaming to receive a socket stream and analyze it in real time.
-
Logic
- Once the ssc (StreamingContext) starts, the driver runs a long-running task
- The executors acting as receivers accept the incoming data and split it into blocks kept in memory
- These blocks are also copied to other executors so that no data is lost (see the sketch after this list)
- At each batch interval (typically every 1 second), the driver launches Spark tasks to process the blocks; the results are then persisted to any number of targets
- When Spark Streaming is hooked up to a TCP source, it requests the connection and pulls data once per second rather than holding one long-lived connection
- Those targets include cloud storage (e.g. S3, WASB), relational data stores (e.g. MySQL, PostgreSQL) and NoSQL stores.
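The replication mentioned above comes from the receiver's block storage level. A minimal sketch (not from the original post), assuming the same NetworkWordCount setup as below: the storageLevel argument of socketTextStream controls where received blocks are kept and how many copies exist; the "_2" levels keep a second copy on another executor.

# Sketch only: make the receiver's block replication explicit.
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "ReceiverStorageDemo")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# _2 storage levels keep a copy of each received block on a second executor
lines = ssc.socketTextStream("localhost", 9999,
                             storageLevel=StorageLevel.MEMORY_AND_DISK_2)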
-
DStreams
- A sequence of data arriving over time, represented as a series of RDDs, one per time interval (see the sketch below)
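To make "a series of RDDs" concrete, a small sketch, assuming a lines DStream like the ones created in the scripts below: foreachRDD exposes the single RDD produced for each batch interval.

# Sketch only: each batch interval yields one RDD inside the DStream.
# Assumes `lines` is a DStream such as ssc.socketTextStream("localhost", 9999).
def inspect_batch(time, rdd):
    # `time` is the batch time; `rdd` holds the records received in that interval
    print("batch %s: %d records" % (time, rdd.count()))

lines.foreachRDD(inspect_batch)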
-
Windowed transformations
- The window of data over which each computation runs (see the sketch below)
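A hedged sketch of one windowed transformation, assuming the pairs DStream of (word, 1) tuples and the ssc from the NetworkWordCount script below; reduceByKeyAndWindow with an inverse reduce function also requires a checkpoint directory.

# Sketch only: running word counts over a sliding window.
ssc.checkpoint("checkpoint")  # needed because an inverse reduce function is used

windowedCounts = pairs.reduceByKeyAndWindow(
    lambda x, y: x + y,  # add counts that enter the window
    lambda x, y: x - y,  # subtract counts that leave the window
    30,                  # window length: the last 30 seconds
    10)                  # slide interval: recompute every 10 seconds
windowedCounts.pprint()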
-
checkpointing
- The goal is to store streaming state in a reliable file system such as HDFS
ssc.checkpoint("hdfs://...")
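Besides persisting state, the same checkpoint directory lets a restarted driver pick up where it left off. A minimal sketch (not from the original post), keeping the elided "hdfs://..." path as-is: StreamingContext.getOrCreate rebuilds the context from the checkpoint if one exists, otherwise it calls the setup function.

# Sketch only: recover a StreamingContext from its checkpoint on restart.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def create_context():
    sc = SparkContext("local[2]", "CheckpointedApp")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("hdfs://...")   # same reliable directory as above
    # ... define the DStream pipeline here ...
    return ssc

ssc = StreamingContext.getOrCreate("hdfs://...", create_context)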
-
Load
-
SparkContext
# Create a local SparkContext and Streaming Contexts
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create sc with two working threads
# Be sure to use local[2], not local: one thread is taken by the receiver
sc = SparkContext("local[2]", "NetworkWordCount")

# Create local StreamingContext with batch interval of 1 second
ssc = StreamingContext(sc, 1)

# Create checkpoint for local StreamingContext
# (stores the data directly under a "checkpoint" folder)
# ssc.checkpoint("checkpoint")

# Create DStream that connects to localhost:9999
# (connects to the remote server over TCP and receives its data)
lines = ssc.socketTextStream("localhost", 9999)

# Split lines into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD in this DStream
wordCounts.pprint()

# Populate `meetup_stream` table
# sqlContext.sql("insert into meetup_stream select * from meetup_stream_json")

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()

# In the console use `nc -l -p 9999` when `nc -lk 9999` does not work
-
SparkSession
""" 与之前的脚本不同,现在使用的是更熟悉的脚本 先创建一个session 这里不需要再去创建一个SparkStreaming """ # Import the necessary classes and create a local SparkSession from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate() """ SparkStream 通过在第4行调用readStream来发起的 """ # Create DataFrame representing the stream of input lines # from connection to localhost:9999 lines = spark\ .readStream\ .format('socket')\ .option('host', 'localhost')\ .option('port', 9999)\ .load() """ 在这里就不需要使用RDD的复杂操作, 直接使用SQL便可以 """ # Split the lines into words words = lines.select( explode( split(lines.value, ' ') ).alias('word') ) # Generate running word count wordCounts = words.groupBy('word').count() """ 没有使用pprint(),而是显式地调用writeStream来编写 流,并定义格式和输出模式。虽然写的时间要长一些, 这些方法和属性在语法上与其他DataFrame调用相似 你只需要改变outputMode和格式属性来保存它 对于数据库、文件系统、控制台等等。 """ # Start running the query that prints the # running counts to the console query = wordCounts\ .writeStream\ .outputMode('complete')\ .format('console')\ .start() """ 最后,运行等待取消这个流媒体工作。 """ # Await Spark Streaming termination query.awaitTermination()
-
-
Save to text
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
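ipAddressRequestCount is a DStream from another example; applied to the NetworkWordCount script above, the equivalent (a sketch assuming that script's wordCounts DStream) would be:

# Sketch only: persist each batch of wordCounts as text.
# Spark writes one output directory per batch, named "outputDir-<batch time in ms>.txt".
wordCounts.saveAsTextFiles("outputDir", "txt")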
-
Example
# Create a local SparkContext and Streaming Contexts
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
import datetime
# from pyspark.sql import SQLContext
# from pyspark.sql.types import *
import numpy as np

# Create sc with four working threads
sconf = SparkConf()
sconf.setMaster("local[4]")
sc = SparkContext(appName="act_analysis", conf=sconf)

# Create local StreamingContext with batch interval of 3 seconds
ssc = StreamingContext(sc, 3)

# Create DStream that connects to 192.168.14.2:1234
lines = ssc.socketTextStream("192.168.14.2", 1234)


def get_rows(x):
    # Each line is "role_id timestamp"; keep the two fields as a pair
    res = x.split(" ")
    return (res[0], res[1])


rows = lines.map(get_rows)
roles = rows.groupByKey()


def get_speed_std(action):
    # Standard deviation of the gaps between consecutive action timestamps
    temp_list = []
    temp = 0
    for i in action:
        if temp == 0:
            temp_list.append(0)
            temp = i
            continue
        temp_list.append(i - temp)
        temp = i
    res = np.std(np.array(temp_list), ddof=1)
    return res


def get_role_action_feature(action):
    try:
        action = sorted(action)
    except Exception as e:
        print(e)
        return None
    count = len(action)
    stay_time = int(action[-1] - action[0])
    try:
        ave_speed = count / stay_time
        std_speed = get_speed_std(action)
    except:
        ave_speed = 0
        std_speed = 0
    return ave_speed, std_speed


def feature(x):
    role = x[0]
    action = [float(i) for i in list(x[1])]
    res = get_role_action_feature(action)
    return (role, res[0], res[1], datetime.datetime.now())


res = roles.map(feature)
# res.pprint()

from pymongo import MongoClient

MONGODB = {
    'MONGO_HOST': '',
    'MONGO_PORT': '27017',
    'MONGO_USERNAME': '',
    'MONGO_PASSWORD': ''
}

mongo_uri = 'mongodb://{account}{host}:{port}/'.format(
    account='{username}:{password}@'.format(
        username=MONGODB['MONGO_USERNAME'],
        password=MONGODB['MONGO_PASSWORD']) if MONGODB['MONGO_USERNAME'] else '',
    host=MONGODB['MONGO_HOST'],
    port=MONGODB['MONGO_PORT'])

conn = MongoClient(mongo_uri)
db = conn['104']


def save(x):
    # Write each batch's per-role features to MongoDB
    col = db['act_live_res']
    res = x.collect()
    for i in res:
        item = {
            'role_id': i[0],
            'ave': i[1],
            'std': i[2],
            'date': i[3]
        }
        col.insert_one(item)
    # conn.close()


res.foreachRDD(save)

ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
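The job above dials out to 192.168.14.2:1234, so something must be listening there and emitting "role_id timestamp" lines, which is the format get_rows() splits on. Below is a toy feeder for local testing only; it is not part of the original code, and the bind address, port, role id, and send rate are placeholder choices.

# Sketch only: a tiny TCP server that feeds the example with test data.
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("0.0.0.0", 1234))   # must match the socketTextStream() host/port
server.listen(1)

conn, addr = server.accept()
while True:
    # one action per line: a role id and a unix timestamp, space separated
    line = "role_1 %f\n" % time.time()
    conn.sendall(line.encode("utf-8"))
    time.sleep(0.5)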