欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Beam系列一 Beam介绍及简单使用.md

程序员文章站 2022-05-18 20:26:57
...

1.简介

简单地说,Apache Beam是一个实时处理、流处理的大数据框架,由Google DataFlow贡献给 Apache 基金会孵化而来。

2.应用场景

以下为应用场景的几个例子:
1.Beam 可以用于 ETL Job 任务
Beam 的数据可以通过 SDKs 的 IO 接入,通过管道可以用后面的 Runners 做清洗。

2.Beam 数据仓库快速切换、跨仓库
由于 Beam 的数据源是多样 IO,所以用 Beam 可以快速切换任何数据仓库。

3.Beam 计算处理平台切换、跨平台
Runners 目前提供了 3-4 种可以切换的平台,如Hadoop,Spark,Flink

3.数据处理流程

3.1Modes

Modes 是 Beam 的模型或叫数据来源的 IO,它是由多种数据源或仓库的 IO 组成,数据源支持批处理和流处理。
Modes要处理的问题有两个,一个是数据源类型,大致分为两类,有界的数据集(如数据库数据)和*的数据流(如消息中间件)。
第二个是时间处理,有两种,一种是全量计算,另一种是部分增量计算。Beam Model 处理的目标数据是*的时间乱序数据流,不考虑时间顺序或有界的数据集可看做是*乱序数据流的一个特例。

3.2 Pipeline

Pipeline 是 Beam 的管道,所有的批处理或流处理都要通过这个管道把数据传输到后端的计算平台。这个管道现在是唯一的。数据源可以切换多种,计算平台或处理平台也支持多种。需要注意的是,管道只有一条,它的作用是连接数据和 Runtimes 平台。

3.3 Runtimes

Runtimes 是大数据计算或处理平台,目前支持 Apache Flink、Apache Spark、Direct Pipeline 和 Google Clound Dataflow 四种。其中 Apache Flink 和 Apache Spark 同时支持本地和云端。Direct Pipeline 仅支持本地,Google Clound Dataflow 仅支持云端。

4. 第一个Beam 程序

4.1 新建一个Maven 项目

<dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.8.0</version>
    </dependency>

4.2

static final List<String> LINES = Arrays.asList(
              "To be, or not to be: that is the question: ",
              "Whether 'tis nobler in the mind to suffer ",
              "The slings and arrows of outrageous fortune, ",
              "Or to take arms against a sea of troubles, ");
    public static void main(String[] args) {     
        wordLenth();
    }
    
    public static void wordLenth() {
         PipelineOptions options =  PipelineOptionsFactory.create();
         Pipeline p = Pipeline.create(options);
         PCollection<String> words = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
         PCollection<Integer> wordLengths = words.apply(
                  "ComputeWordLengths",  ParDo.of(new ComputeWordLengthFn()));
         p.run().waitUntilFinish();
    }
    
    /**
     * 传递给ParDo的DoFn对象中包含对输入集合中的元素的进行处理,DoFn从输入的PCollection一次处理一个元素
     */
    static class ComputeWordLengthFn extends DoFn<String, Integer> {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Get the input element from ProcessContext.
            String word = c.element();
            // Use ProcessContext.output to emit the output element.
            System.out.println(word.length());
            c.output(word.length());
          }
        }

解释一下上面的代码
1.第一步创建一个管道Pipeline
2.PCollection words = p.apply(Create.of(LINES)) 使用原始数据创建数据集
3.
words.apply(“ComputeWordLengths”, ParDo.of(new ComputeWordLengthFn())),统计每一个字符串的长度。
4.p.run().waitUntilFinish();运行管道实例

5.解释程序出现的方法

1.options = PipelineOptionsFactory.create();创建管道参数,设置Runtime类型,当我们不指定的时候,会默认使用DirectRunner这种类型
// pipe.setRunner(DirectRunner.class);

2.PCollection表示Beam中任何大小的输入和输出数据。pipeLine读取数据输入,生成PCollection作为输出。
3.p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
使用Beam提供的Create从内存中的Java集合创建PCollection,Create接受Java Collection和一个Coder对象作为参数,在Coder指定的Collection中的元素如何编码。
4.apply方法转化管道中的数据,转换采用PCollection(或多个PCollection)作为输入,在该集合中的每个元素上执行指定的操作,并生成新的输出PCollection。下面是转换格式
[Output PCollection] = [Input PCollection].apply([Transform])

5.ParDo是用于通用并行处理的Beam转换。ParDo的处理范例类似于map/shuffle/reduce形式的算法中的“Map”操作:一个ParDo转换考虑到了输入PCollection中的每个元素,在该元素上执行一些处理函数(用户代码),并发送0个,1个或多个元素到输出PCollection