欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

NiFi数据流实例二

程序员文章站 2022-05-01 14:24:49
...

NiFi数据流实例系列文章,是将我个人做过的一些数据流项目和探索整理成数据流实例,旨在提供NiFi实用方法。

示例说明

本实例构建一条数据流,以小时为间隔获取天气数据,截取部分数据字段,经由Phoenix写入到HBase中。

流程图

NiFi数据流实例二

数据样例

我使用的天气数据是和风天气提供的实况天气。个人感觉,在免费的天气数据接口中,这个数据还是很良心的。

话不多说,直接看数据样例:

{
    "HeWeather6": [
        {
            "basic": {
                "cid": "CN101010100",
                "location": "北京",
                "parent_city": "北京",
                "admin_area": "北京",
                "cnty": "中国",
                "lat": "39.90498734",
                "lon": "116.4052887",
                "tz": "+8.00"
            },
            "update": {
                "loc": "2018-11-27 08:46",
                "utc": "2018-11-27 00:46"
            },
            "status": "ok",
            "now": {
                "cloud": "0",
                "cond_code": "100",
                "cond_txt": "晴",
                "fl": "-3",
                "hum": "9",
                "pcpn": "0.0",
                "pres": "1028",
                "tmp": "6",
                "vis": "10",
                "wind_deg": "351",
                "wind_dir": "北风",
                "wind_sc": "5",
                "wind_spd": "36"
            }
        }
    ]
}

上面是我获取的北京实况天气数据,根据官方说明,数据每小时更新多次。

数据表设计

要设计数据表,首先需要确定要存储哪些数据,由于并没有项目实际的需求,我就随便选取数据一些字段进行存储,选取字段以及数据描述如下:

字段 描述
location 地区/城市名称
loc 当地时间,24小时制,格式yyyy-MM-dd HH:mm
cloud 云量
cond_txt 实况天气状况描述
tmp 温度,默认单位:摄氏度
wind_dir 风向

根据选取字段,设计表结构,创建表语句如下:

CREATE TABLE IF NOT EXISTS XWD.TIANQI (
location VARCHAR NOT NULL,
loc VARCHAR NOT NULL,
cloud VARCHAR,
cond_txt VARCHAR,
tmp VARCHAR,
wind_dir VARCHAR
CONSTRAINT PK PRIMARY KEY (location, loc)
) SALT_BUCKETS=32;

为了方便,这里的字段全部使用字符类型。不过我也尝试了针对个别字段使用其他数据类型,不过会有一些问题,这里不做赘述。

根据数据样例,测试数据插入,语句如下:

UPSERT INTO XWD.TIANQI VALUES('北京','2018-11-27 08:46','0','晴','6','北风');

数据插入测试完成后,记得清空表数据。

注意,这里的插入语句,将为下面的NiFi ReplaceText处理器的配置提供参考。

数据流配置

控制器服务配置

Phoenix-DBCPConnectionPool

类型:DBCPConnectionPool

SETTINGS:

Name Value
Name Phoenix-DBCPConnectionPool

PROPERTIES:

Name Value
Database Connection URL jdbc:phoenix:xwd01,xwd02,xwd03:2181
Database Driver Class Name org.apache.phoenix.jdbc.PhoenixDriver
Database Driver Location(s) /usr/lib/phoenix/phoenix-4.13.1-HBase-1.2-client.jar

这里的Phoenix-DBCPConnectionPool是我创建的DBCPConnectionPool类型的Phoenix数据库连接池。在这里是提供给PutSQL处理器使用的。

处理器配置

说明

每个处理器的配置窗口,有四个Tab,说明如下:

Tab 描述
SETTINGS 处理器信息的配置,例如处理器名称。可以说是一个处理器类型的实例。
SCHEDULING 处理器的调度配置,可以进行定时调度和调整分配给此处理器的资源,例如可以使用的线程数。
PROPERTIES 定义处理器的行为,是处理器配置的重点部分。
COMMENTS 一个文本区,可以写一些描述信息或者其他信息,也可以不写任何内容。

详情可参考 Configuring a Processor

下面的处理器配置说明,我只写改动的和添加的配置部分。

InvokeHTTP

类型:InvokeHTTP

SCHEDULING:

Name Value
Scheduling Strategy CRON driven
Run Schedule 0 0 * * * ?

这里,我选择每小时获取一次数据。如果想要更快的看到效果,可以自行调整定时调度。

PROPERTIES:

Name Value
HTTP Method GET
Remote URL https://free-api.heweather.com/s6/weather/now?key=xxxxxxxxxxxxxx&location=CN101010100

注意这里的Remote URL属性值,是根据和风天气提供的API URL格式拼接的,URL中的key的参数值部分填写你的用户认证key。

EvaluateJsonPath

类型:EvaluateJsonPath

PROPERTIES:

Name Value
Destination flowfile-attribute
location $.HeWeather6[0].basic.location
loc $.HeWeather6[0].update.loc
cloud $.HeWeather6[0].now.cloud
cond_txt $.HeWeather6[0].now.cond_txt
tmp $.HeWeather6[0].now.tmp
wind_dir $.HeWeather6[0].now.wind_dir

上面的属性,除了Destination,其他都是动态属性,是我额外添加的,用来提供给下游处理器使用。

ReplaceText

类型:ReplaceText

PROPERTIES:

Name Value
Replacement Value UPSERT INTO XWD.TIANQI VALUES('${'location'}','${'loc'}','${'cloud'}','${'cond_txt'}','${'tmp'}','${'wind_dir'}')

PutSQL

类型:PutSQL

PROPERTIES:

Name Value
JDBC Connection Pool Phoenix-DBCPConnectionPool

测试运行

开启所有处理器。

等待一段时间后,进入Phoenix的sqlline,查询数据:

select * from xwd.tianqi order by loc;

结果如下:

+-----------+-------------------+--------+-----------+------+-----------+
| LOCATION  |        LOC        | CLOUD  | COND_TXT  | TMP  | WIND_DIR  |
+-----------+-------------------+--------+-----------+------+-----------+
| 北京        | 2018-11-27 11:46  | 0      | 晴         | 9    | 北风        |
| 北京        | 2018-11-27 12:45  | 0      | 晴         | 9    | 北风        |
+-----------+-------------------+--------+-----------+------+-----------+

这个结果证明整个数据流运行通畅。

其他

在本文中,数据流的周期是一小时,而在许多的生产环境中,数据流的周期是微秒级的。这里只描述了数据流的流程构建,在具体的应用中,需要对处理器的配置进行适当的优化,以达到满足需求的目的。

试想,如果在本文的数据流程中流转的数据流是微秒级的,那么最可能出问题的是哪一环节?

我做的微秒级流数据处理项目中,曾遇到这样一个问题:数据积压在队列中,从而导致整条数据流崩溃。

经过调试和分析后,我发现瓶颈是数据入库环节,这一环节处理一条数据的速度跟不上上游处理器生产数据的速度,也就是说无法及时消费掉数据队列中的数据,造成数据积压在队列中。

问题根源找到后,解决问题的思路有两个方向:

  1. 降低数据生产速度(延长处理器运行周期)
  2. 提高数据入库环节的数据消费速度(增加处理器线程数)

一般来说,在一个生产环境中,降低数据生产速度,对业务影响极大,所以大多数情况下,不会选择降低数据生产速度。于是我选择提高数据入库环节的数据消费速度。

具体实现,就是增加PutSQL处理器的线程数,示例如下:

SCHEDULING:

Name Value
Concurrent Tasks 30

这个属性值,表示分配给当前处理器的线程数,也就是说当前处理器最多可用的线程数。根据具体的服务器环境做调整即可。