Getting Started with Structured Streaming (Part 1)
1. Preparation
Import the following dependencies:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.76</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
2. Reading data from Kafka
Notes:
1. Structured Streaming does not need a consumer group id when reading from Kafka; Spark manages offsets itself through its checkpoint.
2. df.selectExpr() can also select the Kafka key and the other metadata columns, such as topic, partition, and offset (see the sketch after the code below).
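The code below reads the broker list through a PropertiesUtils helper that the post does not show. A minimal hypothetical sketch of such a helper, assuming a config.properties file on the classpath containing a kafka.broker.list entry:

import java.util.Properties

// Hypothetical stand-in for the author's (unshown) PropertiesUtils helper:
// loads config.properties from the classpath and looks up a key.
object PropertiesUtils {
  private val props = new Properties()
  props.load(getClass.getClassLoader.getResourceAsStream("config.properties"))

  def loadProperties(key: String): String = props.getProperty(key)
}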
The code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setMaster("local[*]")
val spark = SparkSession
  .builder()
  .config(conf)
  .appName(getClass.getName)
  .getOrCreate()
import spark.implicits._

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
  .option("subscribe", "cache_tmp")      // topic to subscribe to
  .option("startingOffsets", "earliest") // used only when no checkpoint exists yet
  .load()

// key and value arrive as binary, so cast value to a readable string
val re = df.selectExpr("CAST(value AS STRING)")
  .as[String]
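Besides value, the Kafka source exposes key, topic, partition, offset, and timestamp columns, and they can be picked the same way. A minimal sketch of note 2 above:

// key/value are binary and need a CAST; the metadata columns can be
// selected as-is.
val withMeta = df.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic",
  "partition",
  "offset",
  "timestamp"
)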
3. Writing to Kafka
Notes:
1. Writing to Kafka requires a checkpointLocation; the checkpoint stores the consumed offsets (among other progress information).
2. If the target topic does not exist, it is created automatically, as long as the brokers allow automatic topic creation.
The code is as follows:
val query = re
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
  .option("topic", "tmp_t")
  .option("checkpointLocation", "E:/t_check_2") // in production, put this on HDFS
  .start()

query.awaitTermination()
The checkpoint directory here stores the consumed offsets and other progress information, so a restarted query resumes from where it left off.
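One detail worth knowing when writing: the Kafka sink takes its payload from column names. A value column (string or binary) is required; an optional key column, if present, determines the partition in the target topic. A minimal sketch that forwards the source key along with the value (the checkpoint path here is just an illustrative placeholder):

// "value" is required by the Kafka sink; "key" is optional and controls
// partitioning in the target topic.
val withKey = df.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value"
)

val keyedQuery = withKey
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
  .option("topic", "tmp_t")
  .option("checkpointLocation", "E:/t_check_keyed") // placeholder path
  .start()

keyedQuery.awaitTermination()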