NiFi数据流实例二
NiFi数据流实例系列文章,是将我个人做过的一些数据流项目和探索整理成数据流实例,旨在提供NiFi实用方法。
示例说明
本实例构建一条数据流,以小时为间隔获取天气数据,截取部分数据字段,经由Phoenix写入到HBase中。
流程图
数据样例
我使用的天气数据是和风天气提供的实况天气。个人感觉,在免费的天气数据接口中,这个数据还是很良心的。
话不多说,直接看数据样例:
{
"HeWeather6": [
{
"basic": {
"cid": "CN101010100",
"location": "北京",
"parent_city": "北京",
"admin_area": "北京",
"cnty": "中国",
"lat": "39.90498734",
"lon": "116.4052887",
"tz": "+8.00"
},
"update": {
"loc": "2018-11-27 08:46",
"utc": "2018-11-27 00:46"
},
"status": "ok",
"now": {
"cloud": "0",
"cond_code": "100",
"cond_txt": "晴",
"fl": "-3",
"hum": "9",
"pcpn": "0.0",
"pres": "1028",
"tmp": "6",
"vis": "10",
"wind_deg": "351",
"wind_dir": "北风",
"wind_sc": "5",
"wind_spd": "36"
}
}
]
}
上面是我获取的北京实况天气数据,根据官方说明,数据每小时更新多次。
数据表设计
要设计数据表,首先需要确定要存储哪些数据,由于并没有项目实际的需求,我就随便选取数据一些字段进行存储,选取字段以及数据描述如下:
字段 | 描述 |
---|---|
location | 地区/城市名称 |
loc | 当地时间,24小时制,格式yyyy-MM-dd HH:mm |
cloud | 云量 |
cond_txt | 实况天气状况描述 |
tmp | 温度,默认单位:摄氏度 |
wind_dir | 风向 |
根据选取字段,设计表结构,创建表语句如下:
CREATE TABLE IF NOT EXISTS XWD.TIANQI (
location VARCHAR NOT NULL,
loc VARCHAR NOT NULL,
cloud VARCHAR,
cond_txt VARCHAR,
tmp VARCHAR,
wind_dir VARCHAR
CONSTRAINT PK PRIMARY KEY (location, loc)
) SALT_BUCKETS=32;
为了方便,这里的字段全部使用字符类型。不过我也尝试了针对个别字段使用其他数据类型,不过会有一些问题,这里不做赘述。
根据数据样例,测试数据插入,语句如下:
UPSERT INTO XWD.TIANQI VALUES('北京','2018-11-27 08:46','0','晴','6','北风');
数据插入测试完成后,记得清空表数据。
注意,这里的插入语句,将为下面的NiFi ReplaceText处理器的配置提供参考。
数据流配置
控制器服务配置
Phoenix-DBCPConnectionPool
类型:DBCPConnectionPool
SETTINGS:
Name | Value |
---|---|
Name | Phoenix-DBCPConnectionPool |
PROPERTIES:
Name | Value |
---|---|
Database Connection URL | jdbc:phoenix:xwd01,xwd02,xwd03:2181 |
Database Driver Class Name | org.apache.phoenix.jdbc.PhoenixDriver |
Database Driver Location(s) | /usr/lib/phoenix/phoenix-4.13.1-HBase-1.2-client.jar |
这里的Phoenix-DBCPConnectionPool是我创建的DBCPConnectionPool类型的Phoenix数据库连接池。在这里是提供给PutSQL处理器使用的。
处理器配置
说明
每个处理器的配置窗口,有四个Tab,说明如下:
Tab | 描述 |
---|---|
SETTINGS | 处理器信息的配置,例如处理器名称。可以说是一个处理器类型的实例。 |
SCHEDULING | 处理器的调度配置,可以进行定时调度和调整分配给此处理器的资源,例如可以使用的线程数。 |
PROPERTIES | 定义处理器的行为,是处理器配置的重点部分。 |
COMMENTS | 一个文本区,可以写一些描述信息或者其他信息,也可以不写任何内容。 |
详情可参考 Configuring a Processor。
下面的处理器配置说明,我只写改动的和添加的配置部分。
InvokeHTTP
类型:InvokeHTTP
SCHEDULING:
Name | Value |
---|---|
Scheduling Strategy | CRON driven |
Run Schedule | 0 0 * * * ? |
这里,我选择每小时获取一次数据。如果想要更快的看到效果,可以自行调整定时调度。
PROPERTIES:
Name | Value |
---|---|
HTTP Method | GET |
Remote URL | https://free-api.heweather.com/s6/weather/now?key=xxxxxxxxxxxxxx&location=CN101010100 |
注意这里的Remote URL属性值,是根据和风天气提供的API URL格式拼接的,URL中的key的参数值部分填写你的用户认证key。
EvaluateJsonPath
类型:EvaluateJsonPath
PROPERTIES:
Name | Value |
---|---|
Destination | flowfile-attribute |
location | $.HeWeather6[0].basic.location |
loc | $.HeWeather6[0].update.loc |
cloud | $.HeWeather6[0].now.cloud |
cond_txt | $.HeWeather6[0].now.cond_txt |
tmp | $.HeWeather6[0].now.tmp |
wind_dir | $.HeWeather6[0].now.wind_dir |
上面的属性,除了Destination,其他都是动态属性,是我额外添加的,用来提供给下游处理器使用。
ReplaceText
类型:ReplaceText
PROPERTIES:
Name | Value |
---|---|
Replacement Value | UPSERT INTO XWD.TIANQI VALUES('${'location'}','${'loc'}','${'cloud'}','${'cond_txt'}','${'tmp'}','${'wind_dir'}') |
PutSQL
类型:PutSQL
PROPERTIES:
Name | Value |
---|---|
JDBC Connection Pool | Phoenix-DBCPConnectionPool |
测试运行
开启所有处理器。
等待一段时间后,进入Phoenix的sqlline,查询数据:
select * from xwd.tianqi order by loc;
结果如下:
+-----------+-------------------+--------+-----------+------+-----------+
| LOCATION | LOC | CLOUD | COND_TXT | TMP | WIND_DIR |
+-----------+-------------------+--------+-----------+------+-----------+
| 北京 | 2018-11-27 11:46 | 0 | 晴 | 9 | 北风 |
| 北京 | 2018-11-27 12:45 | 0 | 晴 | 9 | 北风 |
+-----------+-------------------+--------+-----------+------+-----------+
这个结果证明整个数据流运行通畅。
其他
在本文中,数据流的周期是一小时,而在许多的生产环境中,数据流的周期是微秒级的。这里只描述了数据流的流程构建,在具体的应用中,需要对处理器的配置进行适当的优化,以达到满足需求的目的。
试想,如果在本文的数据流程中流转的数据流是微秒级的,那么最可能出问题的是哪一环节?
我做的微秒级流数据处理项目中,曾遇到这样一个问题:数据积压在队列中,从而导致整条数据流崩溃。
经过调试和分析后,我发现瓶颈是数据入库环节,这一环节处理一条数据的速度跟不上上游处理器生产数据的速度,也就是说无法及时消费掉数据队列中的数据,造成数据积压在队列中。
问题根源找到后,解决问题的思路有两个方向:
- 降低数据生产速度(延长处理器运行周期)
- 提高数据入库环节的数据消费速度(增加处理器线程数)
一般来说,在一个生产环境中,降低数据生产速度,对业务影响极大,所以大多数情况下,不会选择降低数据生产速度。于是我选择提高数据入库环节的数据消费速度。
具体实现,就是增加PutSQL处理器的线程数,示例如下:
SCHEDULING:
Name | Value |
---|---|
Concurrent Tasks | 30 |
这个属性值,表示分配给当前处理器的线程数,也就是说当前处理器最多可用的线程数。根据具体的服务器环境做调整即可。
上一篇: 畜生!这是谁干的?站出来我保证不打死你!
下一篇: 你俩心可真大
推荐阅读
-
php实现与erlang的二进制通讯实例解析
-
从零学python系列之数据处理编程实例(二)
-
解析二进制流接口应用实例 pack、unpack、ord 函数使用方法
-
php中判断数组是一维,二维,还是多维的解决方法_php实例
-
Javascript实现简单二级下拉菜单实例_javascript技巧
-
PHP基于phpqrcode生成带LOGO图像的二维码实例,phpqrcodelogo
-
php生成二维码的几种方式整理及使用实例_PHP
-
一起talk C栗子吧(第一百八十三回:C语言实例--在printf函数中设置输出宽度二)
-
Drupal7 form表单二次开发要点与实例
-
怎样给PHP源代码加密?PHP二进制加密与解密的解决办法_php实例