
4 Flink 1.10.1: Batch and Stream Processing of WordCount


1 Batch processing of WordCount with Flink 1.10.1

1.1 pom.xml configuration

Package: com.hik.myFlink.Flink74

<groupId>com.hik.myFlink</groupId>
  <artifactId>Flink74</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>Flink74</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.10.1</flink.version>
  </properties>

  <dependencies>
    <!-- Flink dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-wikiedits_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector dependency -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- log4j and slf4j packages; comment out the dependencies below if you do not want
         log output in the console. Note that only one SLF4J binding (log4j12, nop, or simple)
         should end up on the classpath at runtime. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <!-- Setting appendAssemblyId to false strips the "-jar-with-dependencies" suffix
               from e.g. Flink74-1.0-SNAPSHOT-jar-with-dependencies.jar -->
          <!--<appendAssemblyId>false</appendAssemblyId>-->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.hik.myFlink.FlinkStreamTest</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal><!-- "single" is the non-deprecated assembly goal -->
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
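
With this pom in place, a regular Maven build produces a fat jar bundling all dependencies (the jar name below follows from the artifactId and version above):

mvn clean package
# produces target/Flink74-1.0-SNAPSHOT-jar-with-dependencies.jar

Since the manifest's mainClass points at com.hik.myFlink.FlinkStreamTest, that class is what runs if the jar is executed directly.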

1.2 Class FlinkBatchTest, used for batch processing

package com.hik.myFlink;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

/**
 * @Author lyz
 * @Date 2020/7/15
 */

/**
 * Flink can process batch data as well as streaming data.
 *  Flink vs. Spark Streaming vs. Storm:
 *  1. Storm: low throughput, strong real-time behavior.
 *  2. Spark Streaming: micro-batch processing, high throughput, weaker real-time behavior (batches of e.g. 5s).
 *  3. Flink combines the strengths of Storm and Spark Streaming: high throughput and strong real-time behavior.
 *
 *  Flink distinguishes bounded and unbounded data:
 *   bounded data: batch data
 *   unbounded data: streaming data
 *
 * =======================
 *
 *  For batch processing, the underlying abstraction is DataSet; the environment must be created with
 *  ExecutionEnvironment.getExecutionEnvironment().
 *  For stream processing, the underlying abstraction is DataStream; the environment must be created with
 *  StreamExecutionEnvironment.getExecutionEnvironment().
 *  Code flow:
 *      1. Create the execution environment.
 *      2. Source -> transformation -> Sink
 *      3. env.execute() must be called to trigger the transformations; in batch jobs,
 *         count(), collect(), or print() trigger execution as well.
 *
 *  Flink has no global sort; it can only sort within each partition
 *  (hence the setParallelism(1) below, which makes the single partition's sort effectively global).
 */
public class FlinkBatchTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the input file line by line
        DataSource<String> dataSource = env.readTextFile("./data/input");

        // Split each line into words
        FlatMapOperator<String, String> flatMap = dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] split = line.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });

        // Map each word to a (word, 1) tuple
        MapOperator<String, Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Group by the word (tuple field 0) and sum the counts (tuple field 1)
        UnsortedGrouping<Tuple2<String, Integer>> groupBy = map.groupBy(0);
        DataSet<Tuple2<String, Integer>> sum = groupBy.sum(1).setParallelism(1);

        // sortPartition sorts within each partition; with parallelism 1 the sort is effectively global
        SortPartitionOperator<Tuple2<String, Integer>> sorted = sum.sortPartition(1, Order.DESCENDING);

        // Sink: write the result, then trigger the job with env.execute()
        DataSink<Tuple2<String, Integer>> tuple2DataSink = sorted.writeAsText("./data/result", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute("myflink");
    }
}
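
As the comments above note, a batch job can also be triggered by a sink such as count(), collect(), or print() instead of env.execute(). A minimal sketch of that variant (the class name FlinkBatchPrintTest is my own; input path and package as above):

package com.hik.myFlink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkBatchPrintTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.readTextFile("./data/input")
            // tokenize and emit (word, 1) in a single step
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            .groupBy(0)
            .sum(1)
            // print() is itself a sink and triggers execution; calling env.execute()
            // afterwards would complain that no new sinks have been defined
            .print();
    }
}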

1.3 Results

The input file ./data/input contains:

[screenshot in the original post]

The output in ./data/result is:

[screenshot in the original post]

2 Stream processing of WordCount with Flink 1.10.1

2.1 The pom.xml in IDEA is the same as shown above

2.2 Deploy a Flink cluster on the server; I used flink-1.10.1-bin-scala_2.11.tgz
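
After unpacking the archive, the cluster can be started and the fat jar submitted with the standard Flink scripts. A sketch, assuming Flink was unpacked to /opt/flink-1.10.1 (both paths below are my own examples):

cd /opt/flink-1.10.1
./bin/start-cluster.sh
# submit the streaming job; -c selects the main class explicitly
./bin/flink run -c com.hik.myFlink.FlinkStreamTest /path/to/Flink74-1.0-SNAPSHOT-jar-with-dependencies.jar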

2.3 Class FlinkStreamTest

package com.hik.myFlink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * @Author lyz
 * @Date 2020/7/8
 */
public class FlinkStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read lines from a socket on host Hlink161, port 9999
        DataStreamSource<String> ds = env.socketTextStream("Hlink161", 9999);

        // Split each line into words
        SingleOutputStreamOperator<String> flatMap = ds.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] split = line.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });

        // Map each word to a (word, word, 1) tuple
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = flatMap.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String word) throws Exception {
                return new Tuple3<>(word, word, 1);
            }
        });

        // Key by the word (tuple field 0) and keep a running sum of field 2
        KeyedStream<Tuple3<String, String, Integer>, Tuple> keyBy = map.keyBy(0);
        DataStream<Tuple3<String, String, Integer>> sum = keyBy.sum(2).setParallelism(1);

        //sum.writeAsText("./data/result1", FileSystem.WriteMode.OVERWRITE);
        sum.print();

        // Streaming jobs always require env.execute() to start running
        env.execute("my first flink stream");
    }
}
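
Positional keys like keyBy(0) work in Flink 1.10, but a KeySelector names the key explicitly and is the form later Flink versions require. A minimal sketch of the same job written that way (the class name FlinkStreamKeySelectorTest is my own; since the key is the word itself, a Tuple2 suffices instead of the Tuple3 above):

package com.hik.myFlink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkStreamKeySelectorTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("Hlink161", 9999)
            // tokenize and emit (word, 1) in a single step
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            // key by the word itself rather than a positional index
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> t) {
                    return t.f0;
                }
            })
            .sum(1)
            .print();

        env.execute("wordcount with KeySelector");
    }
}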

2.4 Open a port on the server and type input data in real time:

nc -lk 9999

[screenshot in the original post]

2.5 The console output is as follows:

[screenshot in the original post]
