欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

NiFi数据流实例三

程序员文章站 2022-05-01 14:37:30
...

NiFi数据流实例系列文章,是将我个人做过的一些数据流项目和探索整理成数据流实例,旨在提供NiFi实用方法。

示例说明

本实例构建一条数据流,以小时为间隔获取天气数据,截取部分数据字段,经由Phoenix写入到HBase中。其中全部数据都写入到TIANQI表中,过滤一部分数据写入到TIANQI_FILTER表中。

本实例是在NiFi数据流实例二的基础上进行的升级,虽然阅读的时候不需要参考实例二,但是建议不熟悉NiFi的朋友先阅读NiFi数据流实例二

流程图

NiFi数据流实例三

数据样例

我使用的天气数据是和风天气提供的实况天气。个人感觉,在免费的天气数据接口中,这个数据还是很良心的。

话不多说,直接看数据样例:

{
    "HeWeather6": [
        {
            "basic": {
                "cid": "CN101010100",
                "location": "北京",
                "parent_city": "北京",
                "admin_area": "北京",
                "cnty": "中国",
                "lat": "39.90498734",
                "lon": "116.4052887",
                "tz": "+8.00"
            },
            "update": {
                "loc": "2018-11-27 08:46",
                "utc": "2018-11-27 00:46"
            },
            "status": "ok",
            "now": {
                "cloud": "0",
                "cond_code": "100",
                "cond_txt": "晴",
                "fl": "-3",
                "hum": "9",
                "pcpn": "0.0",
                "pres": "1028",
                "tmp": "6",
                "vis": "10",
                "wind_deg": "351",
                "wind_dir": "北风",
                "wind_sc": "5",
                "wind_spd": "36"
            }
        }
    ]
}

上面是我获取的北京实况天气数据,根据官方说明,数据每小时更新多次。

数据表设计

要设计数据表,首先需要确定要存储哪些数据,由于并没有项目实际的需求,我就随便选取数据一些字段进行存储,选取字段以及数据描述如下:

字段 描述
location 地区/城市名称
loc 当地时间,24小时制,格式yyyy-MM-dd HH:mm
cloud 云量
cond_txt 实况天气状况描述
tmp 温度,默认单位:摄氏度
wind_dir 风向

根据选取字段,设计表结构,创建表语句如下:

TIANQI

CREATE TABLE IF NOT EXISTS XWD.TIANQI (
location VARCHAR NOT NULL,
loc VARCHAR NOT NULL,
cloud VARCHAR,
cond_txt VARCHAR,
tmp VARCHAR,
wind_dir VARCHAR
CONSTRAINT PK PRIMARY KEY (location, loc)
) SALT_BUCKETS=32;

TIANQI_FILTER

CREATE TABLE IF NOT EXISTS XWD.TIANQI_FILTER (
location VARCHAR NOT NULL,
loc VARCHAR NOT NULL,
cloud VARCHAR,
cond_txt VARCHAR,
tmp VARCHAR,
wind_dir VARCHAR
CONSTRAINT PK PRIMARY KEY (location, loc)
) SALT_BUCKETS=32;

两张表,除了表名不同,其他都是一样的。为了方便,这里的字段全部使用字符类型。不过我也尝试了针对个别字段使用其他数据类型,不过会有一些问题,这里不做赘述。

根据数据样例,测试数据插入,语句如下:

UPSERT INTO XWD.TIANQI VALUES('北京','2018-11-27 08:46','0','晴','6','北风');
UPSERT INTO XWD.TIANQI_FILTER VALUES('北京','2018-11-27 08:46','0','晴','6','北风');

数据插入测试完成后,记得清空表数据。

注意,这里的插入语句,将为下面的NiFi ReplaceText处理器的配置提供参考。

数据流配置

控制器服务配置

Phoenix-DBCPConnectionPool

类型:DBCPConnectionPool

SETTINGS:

Name Value
Name Phoenix-DBCPConnectionPool

PROPERTIES:

Name Value
Database Connection URL jdbc:phoenix:xwd01,xwd02,xwd03:2181
Database Driver Class Name org.apache.phoenix.jdbc.PhoenixDriver
Database Driver Location(s) /usr/lib/phoenix/phoenix-4.13.1-HBase-1.2-client.jar

这里的Phoenix-DBCPConnectionPool是我创建的DBCPConnectionPool类型的Phoenix数据库连接池。在这里是提供给PutSQL处理器使用的。

处理器配置

说明

每个处理器的配置窗口,有四个Tab,说明如下:

Tab 描述
SETTINGS 处理器信息的配置,例如处理器名称。可以说是一个处理器类型的实例。
SCHEDULING 处理器的调度配置,可以进行定时调度和调整分配给此处理器的资源,例如可以使用的线程数。
PROPERTIES 定义处理器的行为,是处理器配置的重点部分。
COMMENTS 一个文本区,可以写一些描述信息或者其他信息,也可以不写任何内容。

详情可参考 Configuring a Processor

下面的处理器配置说明,我只写改动的和添加的配置部分。

InvokeHTTP

类型:InvokeHTTP

SCHEDULING:

Name Value
Scheduling Strategy CRON driven
Run Schedule 0 0 * * * ?

这里,我选择每小时获取一次数据。如果想要更快的看到效果,可以自行调整定时调度。

PROPERTIES:

Name Value
HTTP Method GET
Remote URL https://free-api.heweather.com/s6/weather/now?key=xxxxxxxxxxxxxx&location=CN101010100

注意这里的Remote URL属性值,是根据和风天气提供的API URL格式拼接的,URL中的key的参数值部分填写你的用户认证key。

EvaluateJsonPath

类型:EvaluateJsonPath

PROPERTIES:

Name Value
Destination flowfile-attribute
location $.HeWeather6[0].basic.location
loc $.HeWeather6[0].update.loc
cloud $.HeWeather6[0].now.cloud
cond_txt $.HeWeather6[0].now.cond_txt
tmp $.HeWeather6[0].now.tmp
wind_dir $.HeWeather6[0].now.wind_dir

上面的属性,除了Destination,其他都是动态属性,是我额外添加的,用来提供给下游处理器使用。

RouteTianQi

类型:RouteOnAttribute

SETTINGS:

Name Value
Name RouteTianQi

PROPERTIES:

Name Value
Routing Strategy Route to ‘matched’ if all match
cloud_eq ${cloud:equals(0)}
tmp_gt ${tmp:gt(2)}
tmp_lt ${tmp:lt(7)}

上面的属性,除了Routing Strategy,其他都是动态属性,是我额外添加的,用来分流数据。

逻辑:数据指标满足cloud值等于0,tmp值大于2小于7的时候,数据将被路由到matched。

TianQi

类型:ReplaceText

SETTINGS:

Name Value
Name TianQi

PROPERTIES:

Name Value
Replacement Value UPSERT INTO XWD.TIANQI VALUES('${'location'}','${'loc'}','${'cloud'}','${'cond_txt'}','${'tmp'}','${'wind_dir'}')

TianQiFilter

类型:ReplaceText

SETTINGS:

Name Value
Name TianQiFilter

PROPERTIES:

Name Value
Replacement Value UPSERT INTO XWD.TIANQI_FILTER VALUES('${'location'}','${'loc'}','${'cloud'}','${'cond_txt'}','${'tmp'}','${'wind_dir'}')

UpsertToPhoenix

类型:PutSQL

SETTINGS:

Name Value
Name UpsertToPhoenix

PROPERTIES:

Name Value
JDBC Connection Pool Phoenix-DBCPConnectionPool

测试运行

开启所有处理器。

等待一段时间后,进入Phoenix的sqlline,查询数据:

TIANQI

select * from xwd.tianqi order by loc;

部分结果如下:

+-----------+-------------------+--------+-----------+------+-----------+
| LOCATION  |        LOC        | CLOUD  | COND_TXT  | TMP  | WIND_DIR  |
+-----------+-------------------+--------+-----------+------+-----------+
| 北京        | 2018-11-29 18:45  | 0      | 晴         | 3    | 南风        |
| 北京        | 2018-11-29 19:46  | 0      | 晴         | 1    | 西南风       |
| 北京        | 2018-11-29 20:45  | 0      | 晴         | 0    | 南风        |
| 北京        | 2018-11-29 21:45  | 0      | 晴         | 0    | 东南风       |
| 北京        | 2018-11-29 22:46  | 0      | 晴         | 0    | 东南风       |
| 北京        | 2018-11-29 23:46  | 0      | 晴         | 0    | 东南风       |
| 北京        | 2018-11-30 00:46  | 0      | 晴         | -1   | 西风        |
| 北京        | 2018-11-30 01:45  | 0      | 晴         | -2   | 东北风       |
| 北京        | 2018-11-30 02:46  | 0      | 晴         | -1   | 南风        |
| 北京        | 2018-11-30 03:45  | 0      | 晴         | -1   | 东风        |
| 北京        | 2018-11-30 04:45  | 0      | 晴         | -1   | 东风        |
| 北京        | 2018-11-30 05:45  | 0      | 晴         | -1   | 东北风       |
| 北京        | 2018-11-30 06:45  | 0      | 晴         | -1   | 东风        |
| 北京        | 2018-11-30 07:45  | 0      | 晴         | -1   | 东北风       |
| 北京        | 2018-11-30 08:45  | 0      | 晴         | 0    | 东风        |
| 北京        | 2018-11-30 09:46  | 0      | 晴         | 2    | 东风        |
| 北京        | 2018-11-30 10:45  | 0      | 晴         | 4    | 东风        |
| 北京        | 2018-11-30 11:46  | 0      | 晴         | 6    | 南风        |
+-----------+-------------------+--------+-----------+------+-----------+

TIANQI_FILTER

select * from xwd.tianqi_filter order by loc;

部分结果如下:

+-----------+-------------------+--------+-----------+------+-----------+
| LOCATION  |        LOC        | CLOUD  | COND_TXT  | TMP  | WIND_DIR  |
+-----------+-------------------+--------+-----------+------+-----------+
| 北京        | 2018-11-29 10:45  | 0      | 晴         | 5    | 南风        |
| 北京        | 2018-11-29 11:46  | 0      | 晴         | 5    | 西南风       |
| 北京        | 2018-11-29 12:46  | 0      | 晴         | 6    | 南风        |
| 北京        | 2018-11-29 13:45  | 0      | 晴         | 6    | 西南风       |
| 北京        | 2018-11-29 14:45  | 0      | 晴         | 6    | 西南风       |
| 北京        | 2018-11-29 15:45  | 0      | 晴         | 6    | 南风        |
| 北京        | 2018-11-29 16:45  | 0      | 晴         | 5    | 南风        |
| 北京        | 2018-11-29 17:45  | 0      | 晴         | 4    | 西南风       |
| 北京        | 2018-11-29 18:45  | 0      | 晴         | 3    | 南风        |
| 北京        | 2018-11-30 10:45  | 0      | 晴         | 4    | 东风        |
| 北京        | 2018-11-30 11:46  | 0      | 晴         | 6    | 南风        |
+-----------+-------------------+--------+-----------+------+-----------+

这个结果证明整个数据流运行通畅,并且符合预期。