基于云计算和大数据的模拟车辆行车监控系统
系统架构">一、 系统架构
这是基于云计算和大数据的模拟车辆行车监控系统,可模拟实现在线远程对车辆行车的信息记录以及数据处理。其中,记录信息其中包括车辆的id、经过的地点(经纬度)、时间,数据处理包括对数据的排序、错误数据的排查、通过时间以及地点在地图上获得车辆行驶的轨迹、车辆相遇次数。
系统包括数据产生模块、数据接受与处理模块、数据库模块、客户端模块。其中kafka进行数据的接收,并进行数据过滤,将过滤后的数据传递给Redis,Redis再将数据存入Hbase数据库,Spark从Hbase中获得数据,将处理后的数据再传递回Hbase,客户端从Hbase中获得数据并将其展示在前端。
逻辑架构如下:
物理架构图如下:
二、数据流程分析
1. 数据采集过程分析
数据采集过程包括Kafka数据采集、Redis数据过滤、Hbase数据入库三部分,其中包括三个实体:Kafka生产者、Kafka消费者兼Redis发布者、Redis订阅者。
各自的作用如下:
Kafka生产者:负责从json文件中以行为单位读取数据源,通过Kafka生产者代码编写生产消息,将json读取的消息发布在topic上。
Kafka消费者兼Redis发布者:负责从topic上消费Kafka生产者生产的消息,将消息通过Redis发布订阅功能发布到一个信道,等待订阅者接受消息。
Redis订阅者:负责订阅发布者相应的信道,接受发布者的消息,将消息存入Hbase数据库。
数据过滤过程使用了Kafka streams对原始数据进行过滤,本小组采用HIGH-LEVEL STREAMS DSL进行处理。Kafka创建一个Filter流,流的源绑定filter-before topic,同时Kafka生成者将消息生产在这个topic上;流的出口绑定filter-after topic,Kafka消费者绑定这个topic消费消息。过滤器消息选择条件过滤掉不正确的经纬度数据,并将这部分数据存放在Redis filter 键里,合格的数据传送到filter-after topic上。
Redis的缓冲作用
在Redis订阅者上,由于生产者生产消息过快,如果选择一条一条的存入数据库,会出现存取数据过慢,导致生产者的消息经过规定的时间(本小组设置的时间是90秒)没有被消费,报出Timeout错误。为避免这样的问题,选择每1000条数据存入数据库一次,这样的方式优点在于每1000条数据才请求连接数据库一次。请求连接数据库是较耗时的一个步骤,频繁的请求连接数据库会拖慢程序的运行时长。在基础项时,选择将所有数据存入list,然后一次请求数据库连接,将所有数据存入数据库,请求数据库连接的时间占比很小。
出现的问题以及解决方案
1000条数据一次存入无法达到实时的记录,这是本小组项目的一个缺点,但同时,这个问题可以通过选择storm 流式框架数据处理来解决,直接在Kafka消费阶段对数据进行流式处理能达到实时效果。
2.数据查询和离线处理分析
数据查询:数据采集完成,所有数据存入Hbase数据库的‘Record’表中,行键设计为eid、placeid、time组合键,在数据查询时,需要将行键截取,获取对应的数据,与查询条件比较,返回满足条件的数据。
spark处理:spark分析过程包含三个阶段——程序源码发布到master节点、master将map程序分配给map节点进行map操作、master将reduce程序分配给reduce节点进行reduce操作。数据流向是map节点从master节点获取Hbase数据索引,进而获取数据,接着运行map程序将数据分散处理。Map程序处理完的数据流入reduce进行聚合处理,最后将reduce结果存入Hbase数据库中。
问题:在进行spark分析时,限于物理机,整个集群仅有一个master节点、一个map worker节点、一个reduce worker节点,在数据分析时出现的情况是map worker节点的工作任务量远远大于reduce worker的工作任务量。在任务启动时,集群中各个节点使用top命令查看当前节点的CPU占比,发现在整个任务中map worker 节点长时间高CPU占比工作,而reduce worker节点在map worker节点处理完成后有10秒左右的高CPU占比工作期,然后整个数据分析完成。鉴于上述的问题,考虑在主机充足的情况下,选择为map任务分配多台主机。使得任务执行量较均匀分布。
三、软件功能分析
1、完成基本搭建系统,完成过车统计功能
系统可根据输入的地点ID进行检索,显示通过该地点的车辆ID、时间、地点以及经纬度;或者根据输入的车辆ID,显示出该车辆经过的地点、经过时间以及对应地点的经纬度。
结果展示:
2.系统附加功能分析
(1)原始信息过滤
原始数据包含若干条错误记录,如经纬度不合法等,需要实时对kafka中接收到的数据进行过滤处理,将处理后的数据传递给Redis。
(2)车辆行驶轨迹重现
实现方式:我们想出了两种方法实现其轨迹重新。
A、hbase方式
建立一张新表,重新编排行键。
在hbaseTest类中完成具体操作。首先使用HBaseConf类中的getConnection()方法与HBase数据库进行连接。然后利用HBaseConf类中getTableByName()方法得到对表“Record”表的操作句柄。同时使用相同的方法得到对Trace表的操作句柄。之后,使用Table类中的getScanner()方法得到Record表中的所有数据,并记录中“result”中。因为重现轨迹的时候只需要车辆的标识信息(Eid)和车辆经过的时间(time)和经过地方的经纬度(latitude,longitude)所以我们只需要在“Trace”表中存入这些数据即可。
现在我们已经将得到的所有的“Record”表中的数据都存在了“result”中。然后将result中的所有数据扫描一遍,同时将每条记录中的“Eid,time,latitude,longitude”信息记录下来,同时将每一条记录的这些信息作为新的一条记录,以“Eid”为rowKey且以“time”为列族的第一列放在Put类的对象中,最后通过Table类的put()方法将新的记录存在“Trace”表中。这样得到的“Trace”表中的数据即会以“time”自动排序。
当所有数据被读取并被重新放入“Trace”表中后,关闭与数据库的连接,所有的信息即被重新规划好。
B、spark的MapReduce方式
展示结果:
输入要查询的车辆的ID,显示其行驶轨迹。鼠标点击地点,可显示其经纬度。
轨迹展示结果:
(3)车辆相遇次数统计
我们定义相遇为“两车之间出现在同一地点的时间间隔小于一分钟”。
首先,通过Spark从Hbase表中读取数据,自身以地点为键进行join操作,计算除自身外的车辆是否相遇;再以地点为键进行分组,同一组内的数据按照时间进行排序,遍历整个列表,找出满足小于一分钟的数据。
结果展示:
输入要查询的车辆Id,查询结果显示与之相遇过的车辆的ID以及次数。
四、实验感受及收获
在选修这门课之前,就已经对云计算与大数据产生了浓厚的兴趣,通过这学期对这个项目的完成,更加深了我对云计算与大数据的理解以及实际的应用。此次我负责的部分是hbase对数据的存储,通过这个项目我了解了nosql的特点以及运用。虽然在这个项目过程中,遇到了很多困难,但与队友们一起不厌其烦地解决了,我在这个过程中学到了很多。希望在今后的云计算与大数据的学习道路上,可以克服重重困难,加深对其的学习。
上一篇: django之路:第一天