Flink Basics by Example: Simple WordCount in Scala, Using Windows, and Selecting the Max/Min Value from the Input

程序员文章站 2022-03-14 19:09:20

【Usage Overview】

I. Simple WordCount in Scala

  1. On host 56.196, start a listening port: nc -lp 9999
  2. Write the client program that processes the stream
  3. Run the program. Type data into the listening console on 56.196; the client console prints the word counts.
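
The three steps above can be tried out without a cluster. The following is a minimal plain-Scala model, not Flink code, of what a streaming word count prints: one updated (word, count) pair for every incoming word, which is how a keyed running sum behaves in the DataStream API. The names RunningWordCount and runningCounts and the sample lines are hypothetical and only serve the illustration.

```scala
object RunningWordCount {

  // Split each line on whitespace and emit an updated (word, count)
  // pair for every word, in arrival order.
  def runningCounts(lines: Seq[String]): Seq[(String, Long)] = {
    val words = lines.flatMap(_.split("\\s+").toSeq).filter(_.nonEmpty)
    // State carried through the scan: (counts so far, pair emitted last).
    val start = (Map.empty[String, Long], Option.empty[(String, Long)])
    words
      .scanLeft(start) { case ((counts, _), word) =>
        val n = counts.getOrElse(word, 0L) + 1L
        (counts.updated(word, n), Some(word -> n))
      }
      .collect { case (_, Some(pair)) => pair }
  }

  def main(args: Array[String]): Unit =
    // Stand-in for two lines typed into the nc console.
    runningCounts(Seq("hello flink", "hello scala")).foreach(println)
}
```

The point of the model is that a repeated word produces a new output line with a higher count each time, rather than one final total.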


II. Using Windows
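
As a rough aid for this section, here is a plain-Scala model, not Flink code, of the two count-window calls that appear commented out in Test.scala further down: countWindowAll(5) and countWindow(5,2). It assumes, to the best of my understanding of the DataStream API, that a tumbling count window fires once per full group of elements and that a sliding count window fires after every slide elements over at most the last size elements. In Flink, countWindow applies per key; the model looks at the stream of a single key. CountWindowModel, tumbling, and sliding are hypothetical names.

```scala
object CountWindowModel {

  // Tumbling count window: one window per `size` consecutive elements.
  // An incomplete trailing group never fires, so it is dropped.
  def tumbling[A](xs: Seq[A], size: Int): Seq[Seq[A]] =
    xs.grouped(size).filter(_.size == size).toList

  // Sliding count window: fires after every `slide` elements and
  // covers at most the last `size` elements seen so far.
  def sliding[A](xs: Seq[A], size: Int, slide: Int): Seq[Seq[A]] =
    (slide to xs.size by slide).map(end => xs.slice(math.max(0, end - size), end))

  def main(args: Array[String]): Unit = {
    val counts = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L)
    println(tumbling(counts, 5).map(_.sum))   // sum per tumbling window
    println(sliding(counts, 5, 2).map(_.sum)) // sum per sliding window
  }
}
```

With seven inputs, the tumbling window of size 5 fires once, while the sliding window of size 5 and slide 2 fires three times, the first two times on fewer than five elements.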


III. Selecting the Max/Min Value from the Input
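
To make the behaviour of this section concrete, here is a plain-Scala model, not Flink code, of what keyBy("word").max("count") in Test.scala emits for input lines of the form word,count: for every incoming record, the largest count seen so far for that word. RunningMaxModel, parse, runningMax, and the sample input are hypothetical; skipping malformed lines is an assumption of this sketch (Test.scala itself does not guard against them).

```scala
import scala.util.Try

object RunningMaxModel {

  final case class WordCount(word: String, count: Long)

  // Parse a "word,count" line; malformed lines yield None.
  def parse(line: String): Option[WordCount] =
    line.split(",") match {
      case Array(w, c) => Try(WordCount(w.trim, c.trim.toLong)).toOption
      case _           => None
    }

  // For every record, emit the largest count seen so far for its word.
  def runningMax(records: Seq[WordCount]): Seq[WordCount] = {
    val start = (Map.empty[String, Long], Option.empty[WordCount])
    records
      .scanLeft(start) { case ((best, _), r) =>
        val m = math.max(best.getOrElse(r.word, Long.MinValue), r.count)
        (best.updated(r.word, m), Some(WordCount(r.word, m)))
      }
      .collect { case (_, Some(out)) => out }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for lines typed into the nc console.
    val input = Seq("a,3", "b,7", "a,1", "a,9", "b,2")
    runningMax(input.flatMap(parse(_).toList)).foreach(println)
  }
}
```

Selecting the minimum is symmetric: replace math.max with math.min and Long.MinValue with Long.MaxValue.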


【Code Walkthrough】

Create a project named flink-test-bd and add the following pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jhy</groupId>
    <artifactId>flink-test-bd</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.21.0</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>
</project>

Add a Test.scala object.

The following import is required to run Flink locally in IDEA; it brings the implicit TypeInformation that the Scala DataStream API needs into scope:

import org.apache.flink.streaming.api.scala._

Run the code above in the Scala shell.

Meanwhile, open a new terminal on the Flink server, run nc -lp 9999, and type some input.

The Scala shell then prints the counts for that input.

On the master node:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999

On slave3, run nc -lk 9999 or nc -lp 9999 (whichever form your nc variant accepts).

 

On YARN:

Show the help text, which lists the available options:

./bin/yarn-session.sh  -h

Start a session:

./bin/yarn-session.sh  -n 2 -s 2 -jm 1024 -tm 1024

YARN test run:

Start the session first:

./bin/yarn-session.sh  -n 2 -s 2 -jm 1024 -tm 1024

Then submit the job:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999

 

【Source Code】

The Test.scala object

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Test {

  def main(args: Array[String]): Unit = {

    /*
    Run locally, and this is a local environment;
    submit to YARN, and it is a YARN environment;
    submit to a standalone cluster, and it is a standalone environment.
    Nothing needs to be specified: the current environment is detected.
     */
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    // Host and port of the machine running nc
    val data = senv.socketTextStream("192.168.56.196", 9999)

    val wordCounts = data
//      .flatMap{w=>w.split("\\s")}
      // Each input line is expected to look like "word,count"
      .map { w =>
        val fields = w.split(",")
        WordCount(fields(0), fields(1).toLong)
      }
      .keyBy("word")
      // Running maximum of count per word; swap in one of the
      // commented-out windows and sum to aggregate per window instead
      .max("count")
//      .countWindowAll(5)
//      .countWindow(5,2)
//      .timeWindow(Time.seconds(3), Time.seconds(1))
//      .sum("count")

    wordCounts.print().setParallelism(1)
    senv.execute("Stream")

    // Batch variant, left disabled:
    /*val benv = ExecutionEnvironment.getExecutionEnvironment
    val data = benv.readTextFile("")*/
  }

  case class WordCount(word: String, count: Long)

}

pom.xml (in this copy the maven-scala-plugin block is commented out)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jhy</groupId>
    <artifactId>flink-test-bd</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!--<plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>-->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.21.0</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>

</project>

【Usage Notes】