Flink+Druid构建实时OLAP的探索
程序员文章站
2022-07-05 14:20:37
场景 k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。 方案对比 对比了很多解决方案,如下几种,列出来供参考。 设计方案 实时处理采用Flink SQL, ......
场景
k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。
方案对比
对比了很多解决方案,如下几种,列出来供参考。
方案 | 实时入库 | sql支持度 |
---|---|---|
spark+carbondata | 支持 | spark sql语法丰富 |
kylin | 不支持 | 支持join |
flink+druid | 支持 | 0.15以前不支持sql,不支持join |
- 上一篇文章所示,使用spark+carbondata也是一种解决方案,但是他的缺点也是比较明显,如不能和flink进行结合,因为我们整个的大数据规划的大致方向是,spark用来作为离线计算,flink作为实时计算,并且这两个大方向短时间内不会改变;
- kylin一直是老牌olap引擎,但是有个缺点无法满足我们的需求,就是在技术选型的那个时间点kylin还不支持实时入库(后续2.0版本支持实时入库),所以就选择了放弃;
- 使用flink+druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,flink现在如日中天,各大厂都在使用,druid是olap的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下,我的博客其它文章也有最新版本的安装教程,实操方案哦。
设计方案
实时处理采用flink sql,实时入库druid方式采用 ,另一种方式入库方式,tranquility,这种方式测试下来问题多多,放弃了。数据流向如下图。
场景举例
实时计算课堂连接掉线率。此事件包含两个埋点上报,进入教室和掉线分别上报数据。druid设计的字段
flink的处理
将上报的数据进行解析,上报使用的是json格式,需要解析出所需要的字段然后发送到kafka。字段包含如下
systime,datetime格式 pt,格式yyyy-mm-dd eventid,事件类型(enterroom|disconnect) lessonid,课程id
druid处理
启动druid supervisor,消费kafka里的数据,使用预聚合,配置如下
{ "type": "kafka", "dataschema": { "datasource": "sac_core_analyze_v1", "parser": { "parsespec": { "dimensionsspec": { "spatialdimensions": [], "dimensions": [ "eventid", "pt" ] }, "format": "json", "timestampspec": { "column": "systime", "format": "auto" } }, "type": "string" }, "metricsspec": [ { "filter": { "type": "selector", "dimension": "msg_type", "value": "disconnect" }, "aggregator": { "name": "lesson_offline_molecule_id", "type": "cardinality", "fields": ["lesson_id"] }, "type": "filtered" }, { "filter": { "type": "selector", "dimension": "msg_type", "value": "enterroom" }, "aggregator": { "name": "lesson_offline_denominator_id", "type": "cardinality", "fields": ["lesson_id"] }, "type": "filtered" } ], "granularityspec": { "type": "uniform", "segmentgranularity": "day", "querygranularity": { "type": "none" }, "rollup": true, "intervals": null }, "transformspec": { "filter": null, "transforms": [] } }, "tuningconfig": { "type": "kafka", "maxrowsinmemory": 1000000, "maxbytesinmemory": 0, "maxrowspersegment": 5000000, "maxtotalrows": null, "intermediatepersistperiod": "pt10m", "basepersistdirectory": "/tmp/1564535441619-2", "maxpendingpersists": 0, "indexspec": { "bitmap": { "type": "concise" }, "dimensioncompression": "lz4", "metriccompression": "lz4", "longencoding": "longs" }, "buildv9directly": true, "reportparseexceptions": false, "handoffconditiontimeout": 0, "resetoffsetautomatically": false, "segmentwriteoutmediumfactory": null, "workerthreads": null, "chatthreads": null, "chatretries": 8, "httptimeout": "pt10s", "shutdowntimeout": "pt80s", "offsetfetchperiod": "pt30s", "intermediatehandoffperiod": "p2147483647d", "logparseexceptions": false, "maxparseexceptions": 2147483647, "maxsavedparseexceptions": 0, "skipsequencenumberavailabilitycheck": false }, "ioconfig": { "topic": "sac_druid_analyze_v2", "replicas": 2, "taskcount": 1, "taskduration": "pt600s", "consumerproperties": { "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092" }, "polltimeout": 100, "startdelay": "pt5s", "period": "pt30s", "useearliestoffset": false, "completiontimeout": "pt1200s", "latemessagerejectionperiod": null, "earlymessagerejectionperiod": null, "stream": "sac_druid_analyze_v2", "useearliestsequencenumber": false }, "context": null, "suspended": false }
最重要的配置是metricsspec,他主要定义了预聚合的字段和条件。
数据查询
数据格式如下
pt | eventid | lesson_offline_molecule_id | lesson_offline_denominator_id |
---|---|---|---|
2019-08-09 | enterroom | "aqaaaaaaaa==" | "aqaaaaaaaa==" |
2019-08-09 | disconnect | "aqaaaaaaaa==" | "aqaaaaaaaa==" |
结果可以按照这样的sql出
select pt,cast(approx_count_distinct(lesson_offline_molecule_id) as double)/cast(approx_count_distinct(lesson_offline_denominator_id) as double) from sac_core_analyze_v1 group by pt
可以使用druid的接口查询结果,肥肠的方便~
推荐阅读
-
使用D3.js构建实时图形的示例代码
-
EasyNVR RTSP转HLS(m3u8+ts)流媒体服务器前端构建之:bootstrap-datepicker日历插件的实时动态展现
-
Kafka能做什么?十分钟构建你的实时数据流管道
-
日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 数据结构
-
Flink+Druid构建实时OLAP的探索
-
基于构建实时WEb应用的HTML5 WebSocket协议
-
python和websocket构建实时日志跟踪器的步骤
-
云栖干货回顾 | 更强大的实时数仓构建能力!分析型数据库PostgreSQL 6.0新特性解读
-
云栖干货回顾 | 更强大的实时数仓构建能力!分析型数据库PostgreSQL 6.0新特性解读
-
使用D3.js构建实时图形的示例代码