欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ELK实践(三)北京历年空气质量数据分析

程序员文章站 2024-03-22 11:55:04
...

一、了解数据及建模

北京空气质量数据,下载地址
ELK实践(三)北京历年空气质量数据分析
数据建模:

PUT air_quality
{
  "mappings": {
    "doc": {
      "dynamic": false,
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "city": {
          "type": "keyword",
          "ignore_above": 256
        },
        "parameter": {
          "type": "keyword",
          "ignore_above": 256
        },
        "status": {
          "type": "keyword",
          "ignore_above": 256
        },
        "value": {
          "type": "long"
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0"
    }
  }
}

二、导入数据

本次实验使用Filebeat + Ingest Nodde架构。
ELK实践(三)北京历年空气质量数据分析
filebeat 配置:


filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /home/wfs/data/*.csv
  exclude_lines: ["^A ","^The","^Site","^,"]

output.elasticsearch:
  hosts: ["192.168.20.20:9200"]
  pipeline: "airquality"
  index: "air_quality"
  username: "elastic"
  password: "123456"

setup.template.name: "airquality"
setup.template.pattern: "airquality*"
setup.template.enabled: false

数据:

# ls /home/wfs/data/
Beijing_2008_HourlyPM2.5_created20140325.csv  Beijing_2014_HourlyPM25_created20150203.csv
Beijing_2009_HourlyPM25_created20140709.csv   Beijing_2015_HourlyPM25_created20160201.csv
Beijing_2010_HourlyPM25_created20140709.csv   Beijing_2016_HourlyPM25_created20170201.csv
Beijing_2011_HourlyPM25_created20140709.csv   Beijing_2017_HourlyPM25_created20170803.csv
Beijing_2012_HourlyPM2.5_created20140325.csv  
Beijing_2013_HourlyPM2.5_created20140325.csv

Pipeline配置:

PUT /_ingest/pipeline/airquality
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{DATA:city},%{DATA:parameter},%{DATA:date},%{NUMBER:year},%{NUMBER:month},%{NUMBER:day},%{NUMBER:hour},%{NUMBER:value},%{DATA:unit},%{DATA:duration},%{WORD:status}"
        ]
      }
    },
    {
      "set": {
        "field": "_id",
        "value": "{{city}}-{{date}}"
      }
    },
    {
      "date": {
        "field": "date",
        "target_field": "@timestamp",
        "formats": [
          "MM/dd/yyyy HH:mm",
          "yyyy-MM-dd HH:mm"
        ],
        "timezone": "Asia/Shanghai"
      }
    },
    {
      "remove": {
        "field": "message"
      }
    },
    {
      "remove": {
        "field": "beat"
      }
    },
    {
      "remove": {
        "field": "offset"
      }
    },
    {
      "remove": {
        "field": "source"
      }
    },
    {
      "remove": {
        "field": "date"
      }
    },
    {
      "convert": {
        "field": "year",
        "type": "integer"
      }
    },
    {
      "convert": {
        "field": "month",
        "type": "integer"
      }
    },
    {
      "convert": {
        "field": "day",
        "type": "integer"
      }
    },
    {
      "convert": {
        "field": "hour",
        "type": "integer"
      }
    },
    {
      "remove": {
        "field": "duration"
      }
    },
    {
      "remove": {
        "field": "unit"
      }
    },
    {
      "convert": {
        "field": "value",
        "type": "integer"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

数据导入完成后诶下图所示:
ELK实践(三)北京历年空气质量数据分析
可以看到数据是以小时为间隔采集的,为了便于分析,可以借助python,将小时数据聚合到天的维度。

from datetime import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch(['192.168.20.20:9200'])

search_query = {
    "query": {
        "range": {
            "value": {
                "gte": 1
            }
        }
    },
    "aggs": {
        "days": {
            "date_histogram": {
                "field": "@timestamp",
                "interval": "day",
                "time_zone": "+08:00"
            },
            "aggs": {
                "pm25": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    },
    "size": 0
}
res = es.search(index='air_quality', body=search_query)

index_name = 'air_quality_days'
index_type = 'doc'
es.indices.delete(index=index_name, ignore=[400, 404])

for info in res['aggregations']['days']['buckets']:
    cur_date = datetime.strptime(info['key_as_string'], '%Y-%m-%dT%H:%M:%S.%f+08:00')
    new_doc = {
        "@timestamp": info['key_as_string'],
        'year': cur_date.year,
        'month': cur_date.month,
        'day': cur_date.day,
        "value_max": info['pm25']['max'],
        "value_avg": info['pm25']['avg'],
        "value_min": info['pm25']['min'],
    }
    es.index(index=index_name, doc_type=index_type, id=new_doc['@timestamp'], body=new_doc)
    print(new_doc)

可以看到,上边的search_query实质上是对value大于1的按天进行分桶,并使用status返回当天PM25的系列统计值,完全等价于如下DSL:

GET air_quality/_search
{
  "size":0,
  "query": {
    "range": {
      "value": {
        "gt": 1
      }
    }
  },
  "aggs": {
    "days": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "day",
        "time_zone": "+08:00"
      },
      "aggs": {
        "PM25": {
          "stats": {
            "field": "value"
          }
        }
      }
    }
  }
}

然后对上述聚合分析循环遍历,创建新的以天为维度的索引air_quality_days
ELK实践(三)北京历年空气质量数据分析

三、数据实战分析

通过数据,我们首先可以从整体上看下十年来空气质量是否有好转:
1.空气质量分析 – 每年蓝天占比饼图:
ELK实践(三)北京历年空气质量数据分析
这里使用了脚本动态生成到了rate_level字段,
ELK实践(三)北京历年空气质量数据分析
在ManageMent–>Index Patterns中配置:
ELK实践(三)北京历年空气质量数据分析
脚本内容:

double val=doc['value_max'].value;
String rtn="";
if(val<50){
rtn="1-Good"
}else if(val<100){
rtn="2-Moderate"
}else if(val<150){
rtn="3-Unhealthy for Sensitive Groups"
}else if(val<200){
rtn="4-Unhealthy"
}else if(val<300){
rtn="5-Very Unhealthy"
}else{
rtn="6-Hazardous"
}
return rtn;

2.空气质量分析 – AQI质量随时间分布占比:
ELK实践(三)北京历年空气质量数据分析
配置比较简单:Options中设置:Chart Type:Bar Stacked:Percent
ELK实践(三)北京历年空气质量数据分析
3.空气质量分析 – 每年蓝天占比(VB):
以value_max的值等于100为判断依据,小于100定为Good,Panel Options 中将书剑间隔Interval设置为1y即可
ELK实践(三)北京历年空气质量数据分析
4.空气质量分析 – 每月蓝天占比(VB):
同理,将时间间隔修改为1M
ELK实践(三)北京历年空气质量数据分析
然后在具体到某一时间段内,看一下空气质量是否有改善。比如通过数据对比2016年冬季较2015年同一时刻的空气质量情况。
1.空气质量分析 – 2016 vs 2015 冬季雾霾天数占比(TL)

.es(index=air_quality_days,q='value_max:>150',offset=-1y).divide(.es(index=air_quality_days,offset=-1y)).multiply(100).label(2015).lines(fill=1,width=1),.es(index=air_quality_days,q='value_max:>150').divide(.es(index=air_quality_days)).multiply(100).label(2016).lines(fill=3,width=1).title("2016 vs 2015雾霾天数占比")

ELK实践(三)北京历年空气质量数据分析
2.空气质量分析 – 2016 VS 2015 PM25最大值比较 (VB)
ELK实践(三)北京历年空气质量数据分析
将 Offset series time设置为1y即可得到2015年的数据,通过Fill(0-1)和Line Width控制线的样式和透明度
ELK实践(三)北京历年空气质量数据分析
3.空气质量分析 – 2016、2015年冬季雾霾天数(VB Metric)
ELK实践(三)北京历年空气质量数据分析
4.空气质量分析 – 2016、2015年冬季每天空气质量情况
ELK实践(三)北京历年空气质量数据分析

最后附上两个dashboard:
ELK实践(三)北京历年空气质量数据分析

ELK实践(三)北京历年空气质量数据分析

相关标签: ELK