欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink实时处理Socket数据

程序员文章站 2022-07-14 19:41:15
...

博客首发链接 : https://mhuig.github.io/posts/fc610c2d.html

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,提供支持流处理和批处理两种类型应用的功能 Flink实时处理Socket数据

Flink Socket 源码GitHub

通过 Maven Archetype 创建项目

创建项目

mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.9.0

通过以上Maven 命令进行项目创建的过程中,命令会交互式地提示用户对项目的 groupId、artifactId、version、package 等信息进行定义,且部分选项有默认值,直接回车即可。如图如果创建项目成功之后,客户端会有相应提示。

这里我们分别指定 groupId、artifactId 的信息分别如下,其余参数使用默认值

  • groupId:com.qst
  • artifactId:flink-socket

Flink实时处理Socket数据

检查项目

对于使用 Maven 创建的项目,我们可以看到的项目结构如下所示

Flink实时处理Socket数据

以上项目结构可以看出,该项目是一个 Scala 代码的项目,分别是 BatchJob.java 和StreamingJob.java 两个文件,分别赌赢 Flink 批量接口 DataSet 的实例代码和流式接口的实例代码。

将项目导入IDE

项目经过上述步骤创建后,Flink 官网推荐使用 Intellij IDEA 进行后续项目开发。

编译项目

项目经过上述步骤创建后,可以使用 Maven Command 命令 mvn clean package 对项目进行编译,编译完成后会在项目同级目录下生成 target/-.jar 文件,此jar文件就可以通过 Web 客户端提交到集群上运行。

开发环境配置

这里我们使用官网推荐的 IntelliJ IDEA 作为应用的开发的 IDE。

下载 IntelliJ IDEA

用户可以通过 IntelliJ IDEA 官方地址下载安装程序,根据操作系统选择相应的程序包进行安装。

安装 Scala Plugins

安装完 IntelliJ IDEA 默认是不支持 Scala 开发环境的,需要安装 Scala 插件进行支持。一下说明在 IDEA 中进行 Scala 插件的安装。  打开 IDEA IDE 后,在 IntelliJ IDEA 菜单栏中选择 Preferences选项,然后选择 Plugins 子选项,最后在页面中选择 Marketplace,在搜索框中输入 Scala 进行搜索  在检索出来的选项列表中选择和安装 Scala插件

 点击安装后重启IDE,Scala 编程环境即可生效

Flink实时处理Socket数据

导入 Flink 项目

 启动 IntelliJ IDEA,选择 Import Project,在文件选项框中选择创建好的项目,点击确定。  导入项目中选择 Import project from external mode 中的 Maven 后续选项使用默认值即可。

Flink Socket 应用程序

编写 Flink Socket 应用程序代码

import org.apache.flink.streaming.api.scala._

object StreamingJob {
  def main(args: Array[String]) {
    //设置环境变量
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //指定数据源,读取socket
    val socketStream = env.socketTextStream("localhost", 9000, '\n')
    //对数据集指定转换操作逻辑
    val count = socketStream
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    //将计算结果打印到控制台
    count.print()
    //指定任务名称并触发流式任务
    env.execute("Socket Stream")
  }
}

在 IDE 中测试代码

在代码文件中右键运行程序

此时会报如下错误

Flink实时处理Socket数据

这时我们需要在 IDEA 的 Run/Debug Configuration 中将 “Include dependencies with "Provided" scope ”选项勾选,这时我们就可以在本地 IDE 运行了

Flink实时处理Socket数据

在本地测试代码

首先在命令行我们现在终端开启监听端口9000,在命令行中执行如下命令

nc -l 9000

Flink实时处理Socket数据

然后在 IDE 中 右键运行 StreamingJob 类的 main 方法,运行结果如下

Flink实时处理Socket数据

在 Web 客户端中运行 Job

首先在项目所在目录执行 mvn clean package 进行打包,在项目的 target 目录下生成一个 flink-socket-1.0-SNAPSHOT.jar 文件 在命令行我们现在终端开启监听端口9000,在命令行中执行如下命令

nc -l 9000

Flink实时处理Socket数据

在浏览器中打开 Flink Web 监控页面,在左侧选择 Submit New Job 选项,点击 右上角的 Add New 选择我们编译好的 flink-socket-1.0-SNAPSHOT.jar 文件,点击 Submit 按钮提交Job

Flink实时处理Socket数据

选择 Task Managers 选择列表中的对应 Job 点击 Stdout选项查看执行结果

Flink实时处理Socket数据