Flink基础入门实例【Scala简单统计WordCount、Window使用、选取输入中最大/小的值】
程序员文章站
2022-03-14 19:09:20
...
【使用简介】
一、Scala简单统计WordCount
- 在56.196上启动端口监听服务nc -lp 9999。
- 编写客户端处理
3. 运行程序,在56.196监听控制台数数据、客户端控制台输出统计结果
二、Window使用
三、选取输入中最大/小的值
【代码说明】
创建一个flink-test-bd工程,添加如下pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jhy</groupId>
<artifactId>flink-test-bd</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.21.0</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
<defaultGoal>compile</defaultGoal>
</build>
</project>
新增Test.scala对象。
需要加入下面这个import才能在idea中跑本地flink:import org.apache.flink.streaming.api.scala._
在Scala shell中执行上述命令:
同时在Flink服务器开启一个新的终端输入nc -lp 9999,然后输入内容如:
在Scala shell中会统计输入结果如:
master节点:
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
slave3:nc -lk/lp 9999
yarn形式:
查看帮助:具体参数
./bin/yarn-session.sh -h
./bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024
yarn调试结果:
先跑session:
./bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024
,然后
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
【源代码】
Test.scala对象
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object Test {
def main(args: Array[String]): Unit = {
/*
如果本地跑就是local的模式
如果提交到yarn上,就是yarn模式
如果提交到standalone形式,就是standalone形式
不需要指定,它会获取当前的环境
*/
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val data = senv.socketTextStream("192.168.56.196",9999)
val wordCounts = data
// .flatMap{w=>w.split("\\s")}
.map(w=>WordCount(w.split(",")(0),w.split(",")(1).toLong))
.keyBy("word")
.max("count")
// .countWindowAll(5)
// .countWindow(5,2)
// .timeWindow(Time.seconds(3), Time.seconds(1))
// .sum("count")
wordCounts.print().setParallelism(1)
senv.execute("Stream")
/*val benv = ExecutionEnvironment.getExecutionEnvironment
val data = benv.readTextFile("")*/
}
case class WordCount(word:String,count:Long)
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jhy</groupId>
<artifactId>flink-test-bd</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>-->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.21.0</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
<defaultGoal>compile</defaultGoal>
</build>
</project>
【使用说明】
上一篇: php优惠券的实现方式是什么
下一篇: php中echo不出来怎么办