Structured Streaming: a beginner's first steps (Part 1)

程序员文章站 2022-07-14 21:56:00

1. Preparation

Add the following dependencies

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>

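2. Reading data from Kafka

Notes:

1. Structured Streaming does not need a group id to read from Kafka; the Kafka source generates a unique group id for each query.
2. df.selectExpr() can also select other Kafka fields, such as the key.

The code is as follows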

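    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf().setMaster("local[*]") // local mode, for testing
    val spark = SparkSession
      .builder()
      .config(conf)
      .appName(getClass.getName)
      .getOrCreate()
    import spark.implicits._ // provides the encoder used by .as[String]

    // Streaming DataFrame backed by the Kafka topic "cache_tmp"
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("subscribe", "cache_tmp")
      .option("startingOffsets", "earliest") // only applies when the query starts without a checkpoint
      .load()
    // The Kafka value column is binary, so cast it to a string
    val re = df.selectExpr("CAST(value AS STRING)")
      .as[String]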
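
Note 2 above can be sketched as follows. This is a minimal example, assuming `df` is the Kafka source DataFrame built above; the function name `withKafkaMetadata` is made up for illustration. The Kafka source exposes the columns `key`, `value`, `topic`, `partition`, `offset`, `timestamp` and `timestampType`, so any of them can be selected alongside the value.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical helper: keeps the key and some metadata next to the value.
// `df` is assumed to come from spark.readStream.format("kafka")...load().
def withKafkaMetadata(
    spark: SparkSession,
    df: DataFrame): Dataset[(String, String, String, Int, Long)] = {
  import spark.implicits._ // tuple encoder for .as[...]
  df.selectExpr(
    "CAST(key AS STRING)",   // binary in the source schema; null if the record has no key
    "CAST(value AS STRING)", // binary in the source schema
    "topic",                 // string
    "partition",             // int
    "offset"                 // long
  ).as[(String, String, String, Int, Long)]
}
```

Calling `withKafkaMetadata(spark, df)` gives a typed Dataset in which each record carries its topic, partition and offset, which can help when tracing where a message came from.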

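3. Writing to Kafka

Notes:

1. Writing to Kafka requires a checkpoint, configured through the checkpointLocation option; the checkpoint stores the offsets.
2. The target topic is created automatically if it does not exist, provided the broker allows automatic topic creation (auto.create.topics.enable).

The code is as follows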

    val query = re
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("topic", "tmp_t")
      .option("checkpointLocation", "E:/t_check_2") // in production, put this on HDFS
      .start()
    // Block until the streaming query stops or fails
    query.awaitTermination()
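
Both snippets call `PropertiesUtils.loadProperties`, which this article does not show. Below is one possible sketch of such a helper, not the author's implementation: the object name `PropertiesUtilsSketch`, the file name `application.properties` and the `parse` method are all assumptions made for illustration.

```scala
import java.io.{InputStream, StringReader}
import java.util.Properties

// Hypothetical stand-in for the article's PropertiesUtils (the real one is not shown).
object PropertiesUtilsSketch {
  // Assumed resource name on the classpath; adjust to your project.
  private val ResourceName = "application.properties"

  // Parses properties from text; separate so it can be used without a file.
  def parse(text: String): Properties = {
    val p = new Properties()
    p.load(new StringReader(text))
    p
  }

  // Loaded once, on first use; fails loudly if the resource is missing.
  private lazy val props: Properties = {
    val in: InputStream = getClass.getClassLoader.getResourceAsStream(ResourceName)
    require(in != null, s"$ResourceName not found on classpath")
    try {
      val p = new Properties()
      p.load(in)
      p
    } finally in.close()
  }

  // Same call shape as in the article: look up one key, error if it is absent.
  def loadProperties(key: String): String =
    Option(props.getProperty(key)).getOrElse(
      throw new NoSuchElementException(s"missing property: $key"))
}
```

With a line such as `kafka.broker.list=localhost:9092` in the properties file (the address here is only an example), `PropertiesUtilsSketch.loadProperties("kafka.broker.list")` would return the broker list passed to `kafka.bootstrap.servers`.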

The checkpoint directory here stores the consumed offsets and related information.