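ELK in Practice (3): Analyzing Beijing's Historical Air Quality Data
I. Understanding the Data and Modeling
Beijing air quality data: download link
Data modeling: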
PUT air_quality
{
  "mappings": {
    "doc": {
      "dynamic": false,
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "city": {
          "type": "keyword",
          "ignore_above": 256
        },
        "parameter": {
          "type": "keyword",
          "ignore_above": 256
        },
        "status": {
          "type": "keyword",
          "ignore_above": 256
        },
        "value": {
          "type": "long"
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0"
    }
  }
}
II. Importing the Data
This experiment uses a Filebeat + Ingest Node architecture.
Filebeat configuration:
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /home/wfs/data/*.csv
  exclude_lines: ["^A ","^The","^Site","^,"]
output.elasticsearch:
  hosts: ["192.168.20.20:9200"]
  pipeline: "airquality"
  index: "air_quality"
  username: "elastic"
  password: "123456"
setup.template.name: "airquality"
setup.template.pattern: "airquality*"
setup.template.enabled: false
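The exclude_lines setting drops the header and note lines of each CSV file before they reach Elasticsearch. As a rough illustration of which lines those four regular expressions filter out, here is a small Python sketch. The sample lines are made up for the example and are not copied from the real files, and Python's re module only approximates Filebeat's own regular expression engine.

```python
import re

# Same regular expressions as exclude_lines in the Filebeat configuration.
EXCLUDE_PATTERNS = [re.compile(p) for p in ("^A ", "^The", "^Site", "^,")]


def is_excluded(line):
    """Return True if any exclude pattern matches the line."""
    return any(p.search(line) for p in EXCLUDE_PATTERNS)


# Hypothetical sample lines, made up for illustration.
sample_lines = [
    "The data in this file is provided for illustration only",
    "Site,Parameter,Date (LST),Year,Month,Day,Hour,Value,Unit,Duration,QC Name",
    ",,,,,,,,,,",
    "Beijing,PM2.5,2008-04-08 15:00,2008,4,8,15,207,ug/m3,1 Hr,Valid",
]

# Only lines that match none of the patterns are shipped.
kept = [line for line in sample_lines if not is_excluded(line)]
print(kept)
```

Only the last sample line survives, which is the kind of line the grok pattern in the pipeline expects.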
Data:
# ls /home/wfs/data/
Beijing_2008_HourlyPM2.5_created20140325.csv Beijing_2014_HourlyPM25_created20150203.csv
Beijing_2009_HourlyPM25_created20140709.csv Beijing_2015_HourlyPM25_created20160201.csv
Beijing_2010_HourlyPM25_created20140709.csv Beijing_2016_HourlyPM25_created20170201.csv
Beijing_2011_HourlyPM25_created20140709.csv Beijing_2017_HourlyPM25_created20170803.csv
Beijing_2012_HourlyPM2.5_created20140325.csv
Beijing_2013_HourlyPM2.5_created20140325.csv
Pipeline configuration:
PUT /_ingest/pipeline/airquality
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{DATA:city},%{DATA:parameter},%{DATA:date},%{NUMBER:year},%{NUMBER:month},%{NUMBER:day},%{NUMBER:hour},%{NUMBER:value},%{DATA:unit},%{DATA:duration},%{WORD:status}"
        ]
      }
    },
    { "set": { "field": "_id", "value": "{{city}}-{{date}}" } },
    {
      "date": {
        "field": "date",
        "target_field": "@timestamp",
        "formats": [
          "MM/dd/yyyy HH:mm",
          "yyyy-MM-dd HH:mm"
        ],
        "timezone": "Asia/Shanghai"
      }
    },
    { "remove": { "field": "message" } },
    { "remove": { "field": "beat" } },
    { "remove": { "field": "offset" } },
    { "remove": { "field": "source" } },
    { "remove": { "field": "date" } },
    { "convert": { "field": "year", "type": "integer" } },
    { "convert": { "field": "month", "type": "integer" } },
    { "convert": { "field": "day", "type": "integer" } },
    { "convert": { "field": "hour", "type": "integer" } },
    { "remove": { "field": "duration" } },
    { "remove": { "field": "unit" } },
    { "convert": { "field": "value", "type": "integer" } }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}
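To make the pipeline's steps easier to follow, here is a minimal Python sketch of roughly what it does to one CSV line: split the fields, build the document ID from city and date, parse the date in UTC+8, convert the numeric fields, and drop unit and duration. It is only an approximation. A plain split(",") stands in for the grok pattern, a fixed +08:00 offset stands in for Asia/Shanghai, the exact @timestamp string Elasticsearch stores may differ, and the sample line is made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset standing in for the pipeline's "Asia/Shanghai" timezone.
CST = timezone(timedelta(hours=8))
# Python equivalents of "MM/dd/yyyy HH:mm" and "yyyy-MM-dd HH:mm".
DATE_FORMATS = ("%m/%d/%Y %H:%M", "%Y-%m-%d %H:%M")


def parse_date(text):
    """Try each format in order, like the date processor's "formats" list."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).replace(tzinfo=CST)
        except ValueError:
            continue
    raise ValueError("unparseable date: " + text)


def process_line(message):
    """Turn one CSV line into (doc_id, document), roughly as the pipeline does."""
    (city, parameter, date, year, month, day, hour,
     value, _unit, _duration, status) = message.split(",")
    doc = {
        "@timestamp": parse_date(date).isoformat(),
        "city": city,
        "parameter": parameter,
        "year": int(year),
        "month": int(month),
        "day": int(day),
        "hour": int(hour),
        "value": int(value),
        "status": status,
    }
    # Same template as the set processor: "{{city}}-{{date}}".
    return "{}-{}".format(city, date), doc


# Hypothetical sample line, made up for illustration.
doc_id, doc = process_line(
    "Beijing,PM2.5,2008-04-08 15:00,2008,4,8,15,207,ug/m3,1 Hr,Valid"
)
print(doc_id, doc)
```

Because the ID is derived from city and date, re-importing the same file overwrites existing documents instead of creating duplicates.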
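After the import completes, the data looks as shown in the figure below:
As you can see, the data is collected at hourly intervals. To make the analysis easier, we can use Python to aggregate the hourly data to the day level.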
from datetime import datetime

from elasticsearch import Elasticsearch

es = Elasticsearch(['192.168.20.20:9200'])

# Bucket valid readings (value >= 1) by day in UTC+8 and compute stats per day.
search_query = {
    "query": {
        "range": {
            "value": {
                "gte": 1
            }
        }
    },
    "aggs": {
        "days": {
            "date_histogram": {
                "field": "@timestamp",
                "interval": "day",
                "time_zone": "+08:00"
            },
            "aggs": {
                "pm25": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    },
    "size": 0
}
res = es.search(index='air_quality', body=search_query)

index_name = 'air_quality_days'
index_type = 'doc'
# Drop any previous copy of the daily index; ignore "not found" errors.
es.indices.delete(index=index_name, ignore=[400, 404])

for info in res['aggregations']['days']['buckets']:
    # Days with no valid readings have null stats, so skip them.
    if info['doc_count'] == 0:
        continue
    cur_date = datetime.strptime(info['key_as_string'], '%Y-%m-%dT%H:%M:%S.%f+08:00')
    new_doc = {
        "@timestamp": info['key_as_string'],
        'year': cur_date.year,
        'month': cur_date.month,
        'day': cur_date.day,
        "value_max": info['pm25']['max'],
        "value_avg": info['pm25']['avg'],
        "value_min": info['pm25']['min'],
    }
    # Use the day's timestamp as the document ID so reruns stay idempotent.
    es.index(index=index_name, doc_type=index_type, id=new_doc['@timestamp'], body=new_doc)
    print(new_doc)
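As you can see, the search_query above essentially buckets the documents whose value is at least 1 by day and uses the stats aggregation to return that day's PM2.5 statistics (count, min, max, avg, and sum). It is equivalent to the following DSL:
GET air_quality/_search
{
  "size": 0,
  "query": {
    "range": {
      "value": {
        "gte": 1
      }
    }
  },
  "aggs": {
    "days": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "day",
        "time_zone": "+08:00"
      },
      "aggs": {
        "pm25": {
          "stats": {
            "field": "value"
          }
        }
      }
    }
  }
}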
The script then loops over the aggregation buckets and writes each one into a new day-level index, air_quality_days.
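For readers who want to see the aggregation logic without a running cluster, the following is a minimal pure-Python sketch of the same idea: filter out readings below 1, group the rest by calendar day, and compute min, max, and avg for each day. The function name daily_stats and the sample readings are made up for illustration, and the timestamps are assumed to already be in local (UTC+8) time.

```python
from collections import defaultdict
from datetime import datetime


def daily_stats(readings):
    """Group (timestamp, value) pairs by calendar day and compute min/max/avg.

    Readings with value < 1 are dropped, mirroring the range query on value.
    """
    by_day = defaultdict(list)
    for ts, value in readings:
        if value >= 1:
            by_day[ts.date()].append(value)
    return {
        day: {"min": min(vals), "max": max(vals), "avg": sum(vals) / len(vals)}
        for day, vals in sorted(by_day.items())
    }


# Hypothetical hourly readings (local time), made up for illustration.
readings = [
    (datetime(2016, 1, 1, 0), 80),
    (datetime(2016, 1, 1, 1), 120),
    (datetime(2016, 1, 1, 2), -999),  # invalid reading, filtered out
    (datetime(2016, 1, 2, 0), 30),
    (datetime(2016, 1, 2, 1), 50),
]

stats = daily_stats(readings)
for day, s in stats.items():
    print(day, s)
```

Unlike date_histogram, this sketch never produces empty buckets, because a day only appears once it has at least one valid reading.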
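III. Hands-On Data Analysis
With this data, we can first look at the overall picture: has air quality improved over the past ten years?
1. Air quality analysis - share of blue-sky days per year (pie chart):
This chart uses a scripted field, rate_level, whose value is generated dynamically by a script. Configure it under Management -> Index Patterns:
Script content: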
double val = doc['value_max'].value;
String rtn = "";
if (val < 50) {
  rtn = "1-Good";
} else if (val < 100) {
  rtn = "2-Moderate";
} else if (val < 150) {
  rtn = "3-Unhealthy for Sensitive Groups";
} else if (val < 200) {
  rtn = "4-Unhealthy";
} else if (val < 300) {
  rtn = "5-Very Unhealthy";
} else {
  rtn = "6-Hazardous";
}
return rtn;
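The same bucketing can be checked outside Kibana. The following Python sketch mirrors the thresholds of the scripted field so the boundary cases are easy to verify. The function name rate_level is borrowed from the field name; the table-driven layout is just one way to write it.

```python
# Upper bounds (exclusive) and labels, in the same order as the scripted field.
THRESHOLDS = [
    (50, "1-Good"),
    (100, "2-Moderate"),
    (150, "3-Unhealthy for Sensitive Groups"),
    (200, "4-Unhealthy"),
    (300, "5-Very Unhealthy"),
]


def rate_level(value_max):
    """Map a day's maximum PM2.5 reading to its rate_level label."""
    for upper, label in THRESHOLDS:
        if value_max < upper:
            return label
    return "6-Hazardous"


for sample in (35, 50, 149.9, 150, 299, 300):
    print(sample, rate_level(sample))
```

The numeric prefix in each label keeps the levels sorted by severity when Kibana orders the terms alphabetically.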
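2. Air quality analysis - share of each AQI level over time:
The configuration is simple. In Options, set Chart Type to Bar and Stacked to Percent.
3. Air quality analysis - share of blue-sky days per year (VB):
Use a value_max of 100 as the threshold: days below 100 count as Good. In Panel Options, set the time interval (Interval) to 1y.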
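The number this panel plots can be sketched in a few lines of Python: for each year, the percentage of days whose value_max is below 100. The year and value_max keys match the fields of the air_quality_days documents; the function name and the sample documents are made up for illustration.

```python
from collections import Counter


def good_day_share_by_year(days, threshold=100):
    """Return {year: percentage of days with value_max below threshold}."""
    total = Counter()
    good = Counter()
    for d in days:
        total[d["year"]] += 1
        if d["value_max"] < threshold:
            good[d["year"]] += 1
    return {year: 100.0 * good[year] / total[year] for year in sorted(total)}


# Hypothetical daily documents, made up for illustration.
days = [
    {"year": 2015, "value_max": 80},
    {"year": 2015, "value_max": 180},
    {"year": 2016, "value_max": 60},
    {"year": 2016, "value_max": 90},
    {"year": 2016, "value_max": 99},
    {"year": 2016, "value_max": 250},
]

share = good_day_share_by_year(days)
print(share)
```

Switching from a yearly to a monthly view only changes the grouping key, for example (year, month) instead of year.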
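4. Air quality analysis - share of blue-sky days per month (VB):
Likewise, change the time interval to 1M.
Next, we narrow the view to a specific period to see whether air quality improved. For example, we can compare the winter of 2016 with the same period in 2015.
1. Air quality analysis - 2016 vs 2015 share of smog days in winter (TL)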
.es(index=air_quality_days,q='value_max:>150',offset=-1y).divide(.es(index=air_quality_days,offset=-1y)).multiply(100).label(2015).lines(fill=1,width=1),.es(index=air_quality_days,q='value_max:>150').divide(.es(index=air_quality_days)).multiply(100).label(2016).lines(fill=3,width=1).title("2016 vs 2015 share of smog days")
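The Timelion expression divides the count of days with value_max above 150 by the count of all days in each bucket, multiplies by 100, and uses offset=-1y to draw last year's series on this year's time axis. The Python sketch below reproduces that arithmetic on a few made-up days. Monthly buckets are an assumption chosen for the example (Timelion buckets by the chart interval), and the function names are hypothetical.

```python
from collections import Counter
from datetime import date


def smog_share_by_month(days, threshold=150):
    """Percentage of days per (year, month) whose value_max exceeds threshold."""
    total, smog = Counter(), Counter()
    for day, value_max in days:
        key = (day.year, day.month)
        total[key] += 1
        if value_max > threshold:
            smog[key] += 1
    return {key: 100.0 * smog[key] / total[key] for key in sorted(total)}


def offset_one_year(series):
    """Shift each bucket forward one year so it lines up with the next year's
    buckets, mimicking how offset=-1y overlays the earlier data."""
    return {(year + 1, month): pct for (year, month), pct in series.items()}


# Hypothetical (date, value_max) pairs, made up for illustration.
days = [
    (date(2015, 12, 1), 200),
    (date(2015, 12, 2), 100),
    (date(2016, 12, 1), 160),
    (date(2016, 12, 2), 90),
    (date(2016, 12, 3), 40),
    (date(2016, 12, 4), 30),
]

share = smog_share_by_month(days)
last_year = offset_one_year({k: v for k, v in share.items() if k[0] == 2015})
for key in sorted(last_year):
    print(key, "2015:", last_year[key], "2016:", share[key])
```

After the shift, both series share the same keys, which is what lets the two lines sit on top of each other in the chart.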
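2. Air quality analysis - 2016 vs 2015 PM2.5 maximum comparison (VB)
Set Offset series time to 1y to get the 2015 data. Use Fill (0-1) and Line Width to control the fill opacity and the line style.
3. Air quality analysis - number of smog days in winter, 2016 and 2015 (VB Metric)
4. Air quality analysis - daily air quality in winter, 2016 and 2015
Finally, here are the two dashboards: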