欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

在 Spark SQL 和 Spark Structured Streaming 中使用 Pulsar

程序员文章站 2022-07-14 21:54:35
...

**????️大纲提要:**你可以使用 Pulsar Spark Connector 读取 Pulsar 的数据,并将结果写回 Pulsar。本文介绍 Pulsar Spark Connector 的使用方法。

????Pulsar Spark Connector 在 2019 年 7 月 9 日开源,源代码与用户指南参见这里

配置环境

以下示例使用 Homebrew 包管理器在 macOS 下载和安装软件,你可以根据自身需求和操作系统选择其他包管理器。

  1. 安装 Homebrew。
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
  1. 安装 Java 8 或更高版本。
    本示例使用 Homebrew 安装 JDK8。
brew tap adoptopenjdk/openjdk
brew cask install adoptopenjdk8
  1. 安装 Apache Spark 2.4.0 或更高版本。
    从官网下载 Spark 2.4.3 并解压。
tar xvfz spark-2.4.3-bin-hadoop2.7.tgz
  1. 下载 Apache Pulsar 2.4.0。
    从官网下载 Pulsar 2.4.0。
wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
tar xvfz apache-pulsar-2.4.0-bin.tar.gz
  1. 安装 Apache Maven。
brew install maven
  1. 设置开发环境。
    本示例创建一个名为 connector-test 的 Maven 工程。

(1)使用 Scala Maven Plugin 提供的 archetype 构建一个 Scala 项目的框架。

mvn archetype:generate

在出现的列表中选择 net.alchim31.maven:scala-archetype-simple 的最新版本,当前为 1.7,并为新工程指定 groupId、artifactId 和 version。

本示例使用的是:

groupId: com.example
artifactId: connector-test
version: 1.0-SNAPSHOT

经过以上步骤,一个 Maven 的 Scala 项目框架就基本搭建好了。

(2)在项目根目录下的 _pom.xml_ 中引入 Spark、Pulsar Spark Connector 依赖, 并使用 _maven_shade_plugin_ 进行项目打包。

a. 定义依赖包的版本信息。
  <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.4.3</spark.version>
        <pulsar-spark-connector.version>2.4.0</pulsar-spark-connector.version>
        <spec2.version>4.2.0</spec2.version>
        <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
  </properties>
b. 引入 Spark、Pulsar Spark Connector 依赖。
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-spark-connector_${scala.compat.version}</artifactId>
        <version>${pulsar-spark-connector.version}</version>
    </dependency>
    ```
    c. 添加包含 _pulsar-spark-connector_ 的 Maven 仓库。
    ```xml
    <repositories>
      <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
      </repository>
      <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
      </repository>
    </repositories>
    ```
      d. 使用 _maven_shade_plugin_ 将示例类与 _pulsar-spark-connector_ 一同打包。
```xml
    <plugin>
          <!-- Shade all the dependencies to avoid conflicts -->
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>${maven-shade-plugin.version}</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
                <minimizeJar>false</minimizeJar>

                <artifactSet>
                  <includes>
                    <include>io.streamnative.connectors:*</include>
                  </includes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>META-INF/*.SF</exclude>
                      <exclude>META-INF/*.DSA</exclude>
                      <exclude>META-INF/*.RSA</exclude>
                    </excludes>
                  </filter>
                </filters>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>

Spark 读写 Pulsar

示例中的工程包括以下程序:

  1. 从 Pulsar 中读取数据(将该 App 命名为 StreamRead)。
  2. 将数据写入 Pulsar(将该 App 命名为 BatchWrite)。

构建流处理作业,从 Pulsar 读取数据

  1. StreamRead 中,创建 SparkSession
val spark = SparkSession
  .builder()
  .appName("data-read")
  .config("spark.cores.max", 2)
  .getOrCreate()
  1. 为了连接至 Pulsar, 需要在构建 DataFrame 时指定 service.urladmin.url ,并指定待读取的 topic
val ds = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8088")
  .option("topic", "topic-test")
  .load()
ds.printSchema()  // 打印 topic-test 的 schema 信息,验证读取成功
  1. ds 输出至控制台,启动作业执行。
val query = ds.writeStream
    .outputMode("append")
    .format("console")
    .start()
query.awaitTermination()

将数据写入 Pulsar

  1. 同理,在 BatchWrite 中,首先创建 SparkSession
val spark = SparkSession
    .builder()
    .appName("data-sink")
    .config("spark.cores.max", 2)
    .getOrCreate()
  1. 创建 1-10 的列表,并将其转化为 Spark Dataset,写入 Pulsar。
import spark.implicits._
spark.createDataset(1 to 10)
    .write
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8088")
    .option("topic", "topic-test")
    .save()

运行程序

首先配置、启动 Spark 和 Pulsar 的单节点集群,再将示例项目打包,并通过 spark-submit 分别提交两个作业,最后观察程序的执行结果。

  1. 修改 Spark 的日志级别 (可选)。
cd ${spark.dir}/conf
cp log4j.properties.template log4j.properties

在文本编辑器中,将日志级别改为 WARN

log4j.rootCategory=WARN, console
  1. 启动 Spark 集群。
cd ${spark.dir}
sbin/start-all.sh
  1. 修改 Pulsar WebService 端口为 8088(编辑 ${pulsar.dir}/conf/standalone.conf),避免和 Spark 端口冲突。

webServicePort=8088

4. 启动 Pulsar 集群。
```bash
bin/pulsar standalone
  1. 打包示例项目。
cd ${connector_test.dir}
mvn package
  1. 启动 StreamRead 监控 topic-test 中的数据变化。
${spark.dir}/bin/spark-submit --class com.example.StreamRead --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  1. 在另一个终端窗口中,启动 BatchWritetopic-test 一次性写入 1-10 的数字。
${spark.dir}/bin/spark-submit --class com.example.BatchWrite --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  1. 这时,可以在 StreamRead 所在的终端中得到类似的输出。
root
 |-- value: integer (nullable = false)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)

Batch: 0
+-----+-----+-------+-----------+-------------+-----------+
|value|__key|__topic|__messageId|__publishTime|__eventTime|
+-----+-----+-------+-----------+-------------+-----------+
+-----+-----+-------+-----------+-------------+-----------+

Batch: 1
+-----+-----+--------------------+--------------------+--------------------+-----------+
|value|__key|             __topic|         __messageId|       __publishTime|__eventTime|
+-----+-----+--------------------+--------------------+--------------------+-----------+
|    6| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    7| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    8| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    9| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|   10| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    1| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    2| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    3| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    4| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    5| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
+-----+-----+--------------------+--------------------+--------------------+-----------+

至此,我们搭建了 Pulsar 和 Spark 集群,构建了示例项目的框架,使用 Pulsar Spark Connector 完成了从 Spark 读取 Pulsar 数据和向 Pulsar 写入 Spark 数据的操作,提交了最终程序测试。

???? 程序的完整示例,可参阅这里