基于Morphia实现MongoDB按小时、按天聚合操作
mongodb按照天数或小时聚合
需求
最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库*用户后续查询.
涉及到的技术栈分别为:spring boot
,mongodb
,morphia
.
数据模型
@data @builder @entity(value = "rawdevstatus", noclassnamestored = true) // 设备状态索引 @indexes({ // 设置数据超时时间(ttl,mongodb根据ttl在后台进行数据删除操作) @index(fields = @field("time"), options = @indexoptions(expireafterseconds = 3600 * 24 * 72)), @index(fields = {@field("userid"), @field(value = "time", type = indextype.desc)}) }) public class rawdevstatus { @id @jsonproperty(access = jsonproperty.access.write_only) private objectid objectid; private string userid; private instant time; @embedded("points") list<point> protocolpoints; @data @allargsconstructor public static class point { /** * 协议类型 */ private protocol protocol; /** * 设备总数 */ private integer total; /** * 设备在线数目 */ private integer onlinenum; /** * 处于启用状态设备数目 */ private integer enablenum; } }
上述代码是设备状态
实体类,其中设备状态数据是按照设备所属协议
进行区分的.
@data @builder @entity(value = "aggregationdevstatus", noclassnamestored = true) @indexes({ @index(fields = @field("expireat"), options = @indexoptions(expireafterseconds = 0)), @index(fields = {@field("userid"), @field(value = "time", type = indextype.desc)}) }) public class aggregationdevstatus { @id @jsonproperty(access = jsonproperty.access.write_only) private objectid objectid; /** * 用户id */ private string userid; /** * 设备总数 */ private double total; /** * 设备在线数目 */ private double onlinenum; /** * 处于启用状态设备数目 */ private double enablenum; /** * 聚合类型(按照小时还是按照天聚合) */ @property("aggduration") private aggregationduration aggregationduration; private instant time; /** * 动态设置文档过期时间 */ private instant expireat; }
上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.
聚合操作符介绍
聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.
此次聚合主要涉及以下操作:
-
$project
:指定输出文档中的字段. -
$unwind
:拆分数据中的数组; -
match
:选择要处理的文档数据; -
group
:根据key
分组聚合结果.
原始聚合语句
db.getcollection('raw_dev_status').aggregate([ {$match: { time:{$gte: isodate("2019-06-27t00:00:00z")}, } }, {$unwind: "$points"}, {$project: { userid:1,points:1, tmp: {$datetostring: { format: "%y:%m:%dt%h:00:00z", date: "$time" } } } }, {$project: { userid:1,points:1, grouptime: {$datefromstring: { datestring: "$tmp", format: "%y:%m:%dt%h:%m:%sz", } } } }, {$group: { _id:{user_id:'$userid', cal_time:'$grouptime'}, devtotal:{'$avg':'$points.total'}, onlinetotal:{'$avg':'$points.onlinenum'}, enabletotal:{'$avg':'$points.enablenum'} } }, ])
上述代码是按小时聚合数据,以下来逐步介绍处理思路:
(1) $match
根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.
(2) $unwind
raw_dev_status
中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;
(3) $project
{$project: { userid:1,points:1, tmp: {$datetostring: { format: "%y:%m:%dt%h:00:00z", date: "$time" } } } }
选择需要输出的数据,分别为:userid
,points
以及tmp
.
需要注意,为了按照时间聚合,对$time
属性进行操作,提取%y:%m:%dt%h
时信息至$tmp
作为下一步的聚合依据.
如果需要按天聚合,则
format
数据可修改为:%y:%m:%dt00:00:00z
即可满足要求.
(4) $project
{$project: { userid:1,points:1, grouptime: {$datefromstring: { datestring: "$tmp", format: "%y:%m:%dt%h:%m:%sz", } } } }
因为上一步project
操作中,tmp
为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp
进行操作,转换为时间类型数据,即grouptime
.
(5) $group
对聚合结果进行分类操作,并生成最终输出结果.
{$group: { # 根据_id进行分组操作,依据是`user_id`以及`$grouptime` _id:{user_id:'$userid', cal_time:'$grouptime'}, # 求设备总数平均值 devtotal:{'$avg':'$points.total'}, # 求设备在线数平均值 onlinetotal:{'$avg':'$points.onlinenum'}, # ... enabletotal:{'$avg':'$points.enablenum'} } }
代码编写
此处odm
选择morphia
,亦可以使用mongotemplate
,原理类似.
/** * 创建聚合条件 * * @param pasttime 过去时间段 * @param datetostring 格式化字符串(%y:%m:%dt%h:00:00z或%y:%m:%dt00:00:00z) * @return 聚合条件 */ private aggregationpipeline createaggregationpipeline(instant pasttime, string datetostring, string stringtodate) { query<rawdevstatus> query = datastore.createquery(rawdevstatus.class); return datastore.createaggregation(rawdevstatus.class) .match(query.field("time").greaterthanoreq(pasttime)) .unwind("points", new unwindoptions().preservenullandemptyarrays(false)) .match(query.field("points.protocol").equal("all")) .project(projection.projection("userid"), projection.projection("points"), projection.projection("converttime", projection.expression("$datetostring", new basicdbobject("format", datetostring) .append("date", "$time")) ) ) .project(projection.projection("userid"), projection.projection("points"), projection.projection("converttime", projection.expression("$datefromstring", new basicdbobject("format", stringtodate) .append("datestring", "$converttime")) ) ) .group( group.id(group.grouping("userid"), group.grouping("converttime")), group.grouping("total", group.average("points.total")), group.grouping("onlinenum", group.average("points.onlinenum")), group.grouping("enablenum", group.average("points.enablenum")) ); } /** * 获取聚合结果 * * @param pipeline 聚合条件 * @return 聚合结果 */ private list<aggregationmiddevstatus> getaggregationresult(aggregationpipeline pipeline) { list<aggregationmiddevstatus> statuses = new arraylist<>(); iterator<aggregationmiddevstatus> resultiterator = pipeline.aggregate( aggregationmiddevstatus.class, aggregationoptions.builder().allowdiskuse(true).build()); while (resultiterator.hasnext()) { statuses.add(resultiterator.next()); } return statuses; } //...................................................................................... // 获取聚合结果(省略若干代码) aggregationpipeline pipeline = createaggregationpipeline(pasttime, datetostring, stringtodate); list<aggregationmiddevstatus> midstatuses = getaggregationresult(pipeline); if (collectionutils.isempty(midstatuses)) { log.warn("can not get dev status aggregation result."); return; }
ps:
如果您觉得我的文章对您有帮助,可以扫码领取下红包或扫码支持(随意多少,一分钱都是爱),谢谢!
支付宝红包 | 支付宝 | 微信 |
---|---|---|