欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink入门之WordCount(Scala语言)

程序员文章站 2022-06-17 10:20:47
...

Flink入门之WordCount(Scala语言)

流式处理

一.创建一个Maven工程

Flink入门之WordCount(Scala语言)

1.pom.xml文件依赖
<dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

    </dependencies>
2.加入scala依赖

右击工程FlinkDemo–>Open Module Settings -->点击“+”号添加
Flink入门之WordCount(Scala语言)
Flink入门之WordCount(Scala语言)

3.在scala包下创建包为com.xxxx.xxxx,在包下创建scala工程FlinkDemo选为Object

Flink入门之WordCount(Scala语言)

Flink入门之WordCount(Scala语言)
Flink入门之WordCount(Scala语言)

4.代码如下
package com.smxy.flinkdemo

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkDemo {

  def main(args: Array[String]): Unit = {

//    获取socket端口号
    val port: Int = try{
      ParameterTool.fromArgs(args).getInt("port")
    }catch {
      case e => {
        System.err.println("no port set. use default 9000")
      }
        9000
    }

//    获取运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

//    连接socket获取输入数据,IP改为自己的虚拟机IP
    val text = env.socketTextStream("192.168.1.15",port,'\n')

//    解析数据(把数据打平),分组,窗口计算,聚合
    val windowCounts = text.flatMap(line => line.split("\\s"))//把一行单词切分
      .map(w => WordwithCount(w,1))// 把单词转成(a,1)的形式
      .keyBy("word")  //分组
      .timeWindow(Time.seconds(2),Time.seconds(1))  //指定窗口大小,指定时间间隔
      .sum("count") //统计

    windowCounts.print().setParallelism(1)  //打印到控制台
//  执行任务
    env.execute("Socket window count")

  }

  case class WordwithCount(word: String,count: Long)

}

5.若运行程序报如下错误

Flink入门之WordCount(Scala语言)

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
#将上述导入依赖修改如下
import org.apache.flink.streaming.api.scala._
二.虚拟机开启端口测试
1.虚拟机安装nc
yum install nc -y
2.开启测试端口

Flink入门之WordCount(Scala语言)

3.运行程序

Flink入门之WordCount(Scala语言)

4.虚拟机输入数据

Flink入门之WordCount(Scala语言)

5.程序运行结果为

Flink入门之WordCount(Scala语言)