欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Flink+Druid构建实时OLAP的探索

程序员文章站 2022-04-09 21:33:19
场景 k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。 方案对比 对比了很多解决方案,如下几种,列出来供参考。 设计方案 实时处理采用Flink SQL, ......

场景

k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。

方案对比

对比了很多解决方案,如下几种,列出来供参考。

方案 实时入库 sql支持度
spark+carbondata 支持 spark sql语法丰富
kylin 不支持 支持join
flink+druid 支持 0.15以前不支持sql,不支持join
  1. 上一篇文章所示,使用spark+carbondata也是一种解决方案,但是他的缺点也是比较明显,如不能和flink进行结合,因为我们整个的大数据规划的大致方向是,spark用来作为离线计算,flink作为实时计算,并且这两个大方向短时间内不会改变;
  2. kylin一直是老牌olap引擎,但是有个缺点无法满足我们的需求,就是在技术选型的那个时间点kylin还不支持实时入库(后续2.0版本支持实时入库),所以就选择了放弃;
  3. 使用flink+druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,flink现在如日中天,各大厂都在使用,druid是olap的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下,我的博客其它文章也有最新版本的安装教程,实操方案哦。

设计方案

实时处理采用flink sql,实时入库druid方式采用 ,另一种方式入库方式,tranquility,这种方式测试下来问题多多,放弃了。数据流向如下图。

Flink+Druid构建实时OLAP的探索

 

场景举例

实时计算课堂连接掉线率。此事件包含两个埋点上报,进入教室和掉线分别上报数据。druid设计的字段

flink的处理

将上报的数据进行解析,上报使用的是json格式,需要解析出所需要的字段然后发送到kafka。字段包含如下

systime,datetime格式
pt,格式yyyy-mm-dd
eventid,事件类型(enterroom|disconnect)
lessonid,课程id
druid处理

启动druid supervisor,消费kafka里的数据,使用预聚合,配置如下

{
  "type": "kafka",
  "dataschema": {
    "datasource": "sac_core_analyze_v1",
    "parser": {
      "parsespec": {
        "dimensionsspec": {
          "spatialdimensions": [],
          "dimensions": [
            "eventid",
            "pt"
          ]
        },
        "format": "json",
        "timestampspec": {
          "column": "systime",
          "format": "auto"
        }
      },
      "type": "string"
    },
    "metricsspec": [
      {
            "filter": {
                "type": "selector",
                "dimension": "msg_type",
                "value": "disconnect"
            },
            "aggregator": {
                "name": "lesson_offline_molecule_id",
                "type": "cardinality",
                "fields": ["lesson_id"]
            },
            "type": "filtered"
        }, {
            "filter": {
                "type": "selector",
                "dimension": "msg_type",
                "value": "enterroom"
            },
            "aggregator": {
                "name": "lesson_offline_denominator_id",
                "type": "cardinality",
                "fields": ["lesson_id"]
            },
            "type": "filtered"
        }
    ],
    "granularityspec": {
      "type": "uniform",
      "segmentgranularity": "day",
      "querygranularity": {
        "type": "none"
      },
      "rollup": true,
      "intervals": null
    },
    "transformspec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningconfig": {
    "type": "kafka",
    "maxrowsinmemory": 1000000,
    "maxbytesinmemory": 0,
    "maxrowspersegment": 5000000,
    "maxtotalrows": null,
    "intermediatepersistperiod": "pt10m",
    "basepersistdirectory": "/tmp/1564535441619-2",
    "maxpendingpersists": 0,
    "indexspec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensioncompression": "lz4",
      "metriccompression": "lz4",
      "longencoding": "longs"
    },
    "buildv9directly": true,
    "reportparseexceptions": false,
    "handoffconditiontimeout": 0,
    "resetoffsetautomatically": false,
    "segmentwriteoutmediumfactory": null,
    "workerthreads": null,
    "chatthreads": null,
    "chatretries": 8,
    "httptimeout": "pt10s",
    "shutdowntimeout": "pt80s",
    "offsetfetchperiod": "pt30s",
    "intermediatehandoffperiod": "p2147483647d",
    "logparseexceptions": false,
    "maxparseexceptions": 2147483647,
    "maxsavedparseexceptions": 0,
    "skipsequencenumberavailabilitycheck": false
  },
  "ioconfig": {
    "topic": "sac_druid_analyze_v2",
    "replicas": 2,
    "taskcount": 1,
    "taskduration": "pt600s",
    "consumerproperties": {
      "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092"
    },
    "polltimeout": 100,
    "startdelay": "pt5s",
    "period": "pt30s",
    "useearliestoffset": false,
    "completiontimeout": "pt1200s",
    "latemessagerejectionperiod": null,
    "earlymessagerejectionperiod": null,
    "stream": "sac_druid_analyze_v2",
    "useearliestsequencenumber": false
  },
  "context": null,
  "suspended": false
}

 

最重要的配置是metricsspec,他主要定义了预聚合的字段和条件。

数据查询

数据格式如下

pt eventid lesson_offline_molecule_id lesson_offline_denominator_id
2019-08-09 enterroom "aqaaaaaaaa==" "aqaaaaaaaa=="
2019-08-09 disconnect "aqaaaaaaaa==" "aqaaaaaaaa=="

结果可以按照这样的sql出

select pt,cast(approx_count_distinct(lesson_offline_molecule_id) as double)/cast(approx_count_distinct(lesson_offline_denominator_id) as double) from sac_core_analyze_v1 group by pt

可以使用druid的接口查询结果,肥肠的方便~