欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  互联网

Admaster数据挖掘总监谢超:大数据下BI新特性

程序员文章站 2022-06-19 12:18:44
谢超认为必须分布式存储(TB/天)、多个海量数据集(千亿行join)、差的数据质量以及不统一的数据格式(结构化、半结构化等、非结构化合并分析数据集的特点)是数据存储方案面临的挑战。...

谢超认为必须分布式存储(TB/天)、多个海量数据集(千亿行join)、差的数据质量以及不统一的数据格式(结构化、半结构化等、非结构化合并分析数据集的特点)是数据存储方案面临的挑战。

谢超表示大数据BI的新需求包括大量化(多个大数据集并行分析)、多样化(结构化、半结构化、非结构化)、快速化(Velocity)和价值(易用性)。而计算分层(流计算、块计算、全局计算)、快速分析(冗余维度、数据常驻在内存中分析)和接近价值(业务人员易用的命令、灵活的编程框架)是解决新需求的BI方案。

以下为文字实录

今天,主要介绍的是Twitter刚刚开源的一个计算框架,结合在一起实现一个快速灵活的架构。先介绍一下我们Admaster的BI业务,我们对BI的定义是什么呢?统计,发现,预测。Admaster数据主要来自于门户、客户端、硬广。门户比如搜狐,新浪,客户端,比如QQ迅雷(微博)。我们数据量非常大,基本上几个TB,还有采集,我们有自己的搜集在线问卷,同时还有微博论坛,所以我们数据其实是有结构化,有非结构化的。

Admaster系统不适用于我们,左边这两个图可以看到,这些机柜非常昂贵,我们处于成本考虑不会使用这些东西。右上角HPCC Systems,是一个更好的系统,为什么不使用?因为生态系统非常封闭,没有办法把接口和我们自己的程序结合起来。其次,他开源出来的社区版比企业版性能差很多。所以,我们不会使用这样的机器。

我们看一下传统分子方法,很多互联网公司都是这样分子数据。原始日志采集Scribe到Hadoop,然后到Hive结构理念,在做OLAP分析,比如多维数据查询。Hadoop是最近几年才出来的,但是整套BI分析流程,其实还是《数据仓库》这本书里面的关系模型,没有超出这个框架的。

互联网数据最常见的一个分析例子是用户行为分析,我们采集用户访问信息,用户看到什么广告,点击了什么广告,购买了什么,去了哪里。还有一些非结构化的信息,半结构化的信息,比如他发表的什么评论,微博上写的什么东西,还有图象,音频,视频,一些非结构化的数据。可见我们数据特点是有以下几个大到必须分布式存储,因为我们基本上都超过TB每天,而且有一个趋势不断在增长。今年可能是TB每天,明年可能就是2TB,4TB,事实上我们今年数据比去年翻了5倍,我们需要一个非常平滑的分布式架构,只需要加机器就可以,不需要改我的分析算法。

还有海量数据集非常多,有很多海量级数据集进行分析。因为我们采集全部都是日式数据,所以数据量非常少。还有我们要把结构化,半结构化的数据合并起来进行分析。因为我们数据集拥有这些特点,所以特点决定了存储和分析的方式。首先只有append,没有更新和删除,这种方式和刚才OceanBase是相似的,从字面上理解,首先要结果一个读的操作,然后再去进行一个写的措施,首先更新和删除开销肯定比读和写要大。

其次,如果我们不做更新和删除,实质上是规避了一致性。我们只有更新数据相当于带着时间和全新的数据,其实就不可能发生一致性的问题,什么最终一致性,因果一致性等等完全就规避掉了。其实今天上午大家讲的这些,大家也可以看出来一致性是非常头疼的问题。但是我们BI分析应用,完全规避了一致性问题。

其次我们要尽可能的反观新模式,反关系模式里面有一个雪花模型和星型模型,特别大的数据量情况下,我们要尽可能把纬度表冗余到事实表里面,尽量不做交易,特别一些小交易要放到事实表里面。还有一个特点,我们对计算框架灵活性的要求越来越好,而且很多算法更好理解,表达力更强。但是,就像今天上午巨先生讲的一样,MapReduce表现力还是受限,特别我们要一些发达算法的情况下有很大的问题。比如我在MapReduce使用一个共享数据群,首先编程非常复杂,其次大小是受限的,我不能灵活使用一个共享数据集。我们现在其实渐渐转到DRPC,本质就是一个分布式的远程调用,凡是在单机上能实现的分机算法一定可以利用DRPC实现,我们可以利用其实现各种各样的灵活分析算法。

这是BI对我们提出的新需求,首先多个非常大的数据集放在一起分析,然后我们要把结构化,半结构化,非结构化的数据放在一起分析,这就决定了不可能用基于关系型,结构化型的数据集,把结构化,半结构化,非结构化混合在一起进行分析,还有要快,Hadoop本质是一个离线分析,首先启动非常慢,从硬盘读取数据,我们需要一个非常快,甚至达到秒级,让用户无缝体现的一个平台,其次我们还需要业务人员尽可能简单的使用它。

为了解决刚才的这些需求,现在有以下一些BI方案。首先是计算分层,我们把数据分析的算法按照计算方式,分成三层,流计算层,块计算,全局计算,一会我会一个层一个层介绍。快速分析有两种方案,一种是冗余纬度,其实应该叫做冗余纬度的索引,一会会详细介绍,还有一个方案把数据进行分析常驻在,使用内存分析系统。还有使用非常灵活的框架,非常灵活的数据挖掘算法。

这是我们使用的一个完整数据架构,从采集数据进入流计算层,从流计算层进入快数据和全计算层,每个齿轮其实代表着数百台服务器,整个架构是一个分布式的。流数据,会把触发的事件写入关系型数据库中,会把持续计算的结果写到里面,快数据层会把大的数据进行分析结果写到里面,同时全局数据,一般是把离线结果也写进里面,对于结果和关系数据库做一个对比,通过缓存层提供给用户。流数据框架,块数据框架,全局数据框架我们使用一种分析语言,本质上相当于编程一次可以在三个地方同时使用,这样的话无论是对开发人员,还是对业务人员来说都非常简单。

首先讲一下计算分层,比如我们可以简单的把数据分成实施层和P处理层,实施层把数据常驻在内存当中可以达到非常快的秒级分析,每次查询,我们同时去实施层和P处理层查询,然后把两个结果合并起来,这样才能得到我们最终结果。因为分层本质其实一般来说,都是按时间来分的,比如数据量非常大,每天有一个TB进来,我们只能把今天TB数据放在里面,把今天之前的数据全部进行离线分析。今天数据我可以做到一个实时分析,我在调用,比如昨天半夜跑出来的结果,把那个结果加起来,就可以得到一个非常快,相当于进一步实施分析的结果。

我们看一下Twitter的例子,是可以做到实时分析的。怎么做?Hadoop批量分析结果可能半夜做,写到他自己开发的一个KV系统里面,批量导入的数据非常快,偏重于快速的写,把这个结果加起来反馈给客户。

我们介绍一下Twitter开源的流计算框架,用途有以下几种。首先他可以做一个最快的ETL工具,ETL搞BI的都比较清楚,一个是抽取,转换,转入,大家可以简单理解数据格式转换,就把数据格式变成我们所需要的。然后他还可以利用冗余纬度数据,把小的纬度数据尽可能冗余到事实表里面,Storm是非常适合做这个事情。还有事件驱动报警,什么是事件驱动报警呢?举个例子,比如我们监测门户上面广告网站,我们监测一条一个广告发生了一个点击,但是他之前10分钟我们并没有发现有暴光,这是不合常理的,一个广告怎么可能被点击了却没有被人看到了,这可能是一个作弊事件,我们就需要记录到数据库里面。

还有一个例子,我们在5分钟之内发现同一个IP下面发现1千次点击,很明显这是一个机器控制的作弊程序,也记录到数据库里面,说明只是一个作弊事件。还有Storm可以实现一种计算,叫做持续计算,请注意持续计算是一种受限算法,能够持续计算的计算是“易并行”的,什么意思呢?比如典型的“易并行”问题,就是求coint,我可以分到10台服务器上,每一台服务器专门求一个行数,反馈给我加起来,这个结果肯定正确。同时,我上5分钟得出求和结果,和下5分钟求和结果加起来依然是正确的。

不适用的问题是什么呢?比如数据去重统计,count,distinct,只能对全局数据去重。流计算,能够计算的问题都是一些非常简单的“易并行问题,其实在很多场合也比较有用,比如记数。Storm,吞吐性能非常好,因为底层使用了ZeroMQ,ZeroMQ性能非常好,比传统要好的很多,达到了每秒将近2多万消息吞吐量,还是在一台PC上面,要超过百万很清楚,本质上就是内存拷贝,没有任何的持久化。显而易见他是有安全性的问题,如果服务器掉链,正在处理的消息就完全消失了,那怎么办呢?

我们用Storm可以确保每行消息都被正确处理,失败消息打回去重新处理。我们可以看右边这个图,比如一条消息是一句话,做什么事情,对每个单词进行记录,比如他把这句话六个单词分还进行处理,最后一个失败怎么了办?每个任务都会发送一个ACK给Storm,Storm收到5个ACK,他就认为这个消息处理全部失败,所以全部打回重新处理。通过这样的方式来确保整个Stosm不会失效,出错。

同时还有最后一道防线,用P处理层来校正,整个Storm完蛋了,Twitter也发生类似事情,Storm失效之后,当然他是有人工错误引起的,并不是Storm本身的问题,就可以从,比如过了12小时之后,可以把从这当中把数据导出到Storm里面。Storm处理数据格式是基于元组tuple,类似MapReduce数据处理序列,本质是DRPC编程框架,最灵活的进行编程。但是,我们需要自己控制内存,Storm不是所有事情都做到了。

一句话性肉元组在很多机器内存中进行M和R不断变化着自己的结构,最后变成我们想要的元组形式。可以看一个例子,最左边正方题里面是所有交易数据,A代表A商品,客户可能卖了三个型号,又一个客户买了B35商品,我们按照客户购买列出来,把哪种商品最热卖的型号列出来,其实可以看最右边的结果,我们希望生成这样的结果,客户购买一次商品是B和C,是B的35和A的23,客户购买商品是A,最终卖的是5,我们中间怎么做呢?我们只需要执行三个任务,比如第一个任务调整数据格式,其实相当于把原数据发送到三台服务器上,把后面省略号去掉。然后我们在把数据相当于按照商品A,B,C做一个覆盖,发送两台服务器上,大家可以看到最上面相当于按照A分类汇总,得出A结果,同时我们统计一下A的次序,生成A括号5,逗号2。同时再按频次来做,最后得到我们想要的结果,整个流程非常简单。

大家还可以注意到一点,任务1,任务2,任务3,并不是分的很清楚,他只是把自己叫做精简的任务处理,他只是在这些任务之间到底要不做grrouping,任务1和任务2,任务2和任务3之间是按照某个字段进行一个对比。我们可以看出Storm流计算就是一个Bolt接龙,数据相当于有一个喷射器,从最左边一行行发射出来,之后通过整个相当于任务A进行分布式处理,每个圆圈代表一个服务器,最后得出我们想要的结果。同时,他在任务处理的过程中还可以轻易进行交互,我们BoltA,B,C,D,E,组成一个相当于类似Hadoop概念,会一直常驻在内存当中,不管什么时候发生数据,我一直常驻在内存中,来这样数据就进行这样的处理逻辑。所以,他是一个持续性的流计算框架。

来多少数据我处理多少,我可以变成来一条处理一条,我也可以来5千条处理5千条得出一个结果。所以,他是一个不停在运行的流式计算框架。我们可以看一下代码的例子,非常简单。我们可以先建一Storm,然后我们在Storm加一个数据喷射口,不断喷出单词,我们做第一个任务,随机接受数据喷射口传出来数据,我们可以做一个格式转化,我还可以指定数据用几台服务器来处理,比如逗号3是第一个,我们用3台服务器处理。最后一行,可能是用fieldsGruping,相当于做一个操作,非常简单。

但是,这个例子其实会错误运行的,数据源源不断发射,最后会源源不断会导致超出服务器内存,服务器内存会爆掉。实际上我们需要每割一段时间就去清理Bolt里面的内存,把里面内存释放掉,需要我自己手工控制内存,这是一个难点。我们讲一下快计算,为什么需要快计算,显然是实施之后更有价值。比如下面这个图可以发现,在早上6点钟的时候,新西兰某个汽车品牌发现销量上涨,我们就立刻捕捉这个趋势,就做出一些决策。实际上洞察的决策,动态决策非常有价值,特别是现在,举个例子我们互联网广告行业,在美国不能说所有广告,相当一部分广告份额已经将近三分之一了,是从实时广告计价平台上购买的,包括雅虎,微软(微博)也有,相当于一条在线广告实时发布出去,我们各个采购商去竞价,如果我这个采购商,或者说广告代理商能够在秒级发现一个数据,能够在秒级发现价格有这样的上涨,或者某个地区,某个广告可以实现非常好的CTS,我可以立刻抢拍下这个广告收入增加,这时这在国外是非常活火的动态决策,尽可能快的进行挖掘发现趋势。

我们如果实现快速的块计算,在块上做呢?淘宝有一个很好的prom方案,甚至可以做到毫秒级响应,当然有一些缺点,只能固定分析几个纬度,同时导入数据花时间,相当于把所有冗余索引牺牲内存孔,现在淘宝在很多产品里面使用prom方案,他可以把前一天所有支付宝(微博)交易数据,按照不同的纬度导入不同Server里面。大家可以看里面这个图,Data Server相当于上午型和个人用途,笔记本电脑索引,有商务型索引写在Server里面,个人用电脑写在另一个Server里面。

然后Data Server2都写在里面,Data Server品牌写在里面,基于D品牌笔记本成交总量,我们把13寸所有都拿过来,得到5和6两条索引,我们再去某台实际Data Server上得到的交易金额,加起来返回给客户,通过这样一种索引达到非常快速的查询。所以,哪怕是几千万数据,上亿数据,我们依然可以做到在秒级之内返回。大家可以看到这一页,每个索引都要增加一台服务器,如果纬度固定还好,如果有上千台纬度是不是需要准备上千个服务器,肯定就没有办法用这个方案了。我们现在在用一个Storm+内存文件系统,在内存文件系统上调用Storm有一个模块,相当于LinearDRPCT opologyBuilder,包括消息处理,失败处理都分装好了。

优点是什么算法非常灵活,我可以实现各种各样的数据分析算法,而且足够的快。一般来说,只要数据,比如只有几个G基本上都是秒级可以算出来,比如我几百台服务器,每台64G内存,我可以实现几个TB数据存储,全局全部内存之中。当然缺点也非常明显,大家也都可以看的出来,数据非常容易失掉,如果掉链了全部数据都没有,我需要从硬盘上再倒出来,非常花时间。同时,我能够分析热数据大小,其实完全受限,我不能只分配给内存系统,其实不停在服务器终端大幅度移动,和Hadoop有本质不同,Hadoop设计思想是转移计算到数据这边,绝对不是让数据在不同服务器之间移动,那是绝对不行的,因为Hadoop这么快,设计思想就是把计算移到数据这边。

我在内存之间移动数据这个开销还是可以接受的,因为硬盘拷贝还是足够快的。其实我们现在最院士K-means聚类的算法是这样的,比如这么一个闪点图,我要找到比较密集的几个图,比如随机指示三个点,对每一个点进行一个聚类计算,我认为我是这个中心点里面的人,这样每个点都可以归纳到一个里面,重新求一个中心点,这样重新计算中心点,实际越来越近,补贴的迭代,这个位置会越来越向中心点移动。

如果我用K-means重新计算,都会把这些数据搭到硬盘上面来移动非常慢,如果我们有Storm的话,很简单,数据永远在这之中,这个算法会比K-means上快几百倍,非常正常。我们回顾一下流计算,块计算,全局计算。Storm同时承担了ETL,流计算,块计算工作,流计算就是一些易并行简单算法,还可以报警,发生事件组合报警,还有反观模式,同时还可以对内存文件系统上的数据进行一个快速计算快速迭代。Hadoop很明显做的和Storm不太一样,是廉价全局大数据计算。我刚才讲的很多算法必须在全局数据上计算,比如一个最简单的驱动数据,如果能够忍受误差可以这么算,有一定错误质量算法,如果你要得到一个非常准确的认同,必须在Hadoop上进行运转。所以,这两者之间完全不可能替代,Hadoop绝对做不到Storm那么快。Storm部分算法也实现不了,只能在Hadoop中做一个全局问题,需要其进行补充,所以两者之间非常好。

现在我们整个比较框架,就是Storm和Hadoop,两者互相配合,互相补充,能够实时计算尽量放到Storm上去做,Hadoop是必须的一个廉价全局大计算增长。因为我们整个BI架构是拼在一起的,相当于现在有两套系统,我们开发了DSL,这是针对业务定制语言。首先Hadoop Cascalog太复杂,首先要有一些可以简单的逻辑代码,看一看右下角这是Hadoop只有五行代码就实现了,非常简单。所以说,作为DSL可以在Admaster上看一下,甚至MapReduce都发现有四种DSL,我们自己还开发针对业务DSL,对业务人员不需要知道业务是在Hadoop上面还是storm上面,也无需知道数据是结构化还是半结构化。

左下角是我们开发的SUM,类似于微软MDS,实现一个分析功能,我列出13寸,14寸,15村,日期在2011-11-20之前,写出A品牌,我把这个交易额数据进行一个分类汇总得出结果。实际实现逻辑是非常简单的,有一个命令解析器得到纬度和度量及,数据解析,信息传递到Hadoop的map的reduce和storm的bolt中,筛选数据拼接维度。工作流拼接,根据功能拼接,Hadoop job和storm topology,合并查询,组合storm和Hadoop的查询结果。最后这些结果写入到里面,类似于刚才Twitter的一种做法,同时去查询storm和Hadoop分析结果,得到最终一个汇总。

我们再回顾一下完整的BI分析架构,这个图和刚才略有差别。这是采集数据进来之后进到storm,数据报警写在Hadoop里面,把采集到数据storm变形,一部分写入Hadoop之中,一部分写入内存分件系统,我们自己开发一套BSL,同时在HadoopDSL上跑得到分析结果。当然一般来说,offline跑昨天的数据,实时分析结果显示在Mongodb里面,有一些持续计算也写进Mongodb而里面,一个查询需要三个地方加起来,得到一个精确实时的结果。我可以得到,比如一秒前的统计结果,比如一个广告上线之后,就可以知道这个广告在1秒之前被暴光多少次,我从Hadoop,我从内存文件系统,我从Storm等结果加起来就可以知道一秒之前发生的任何事情。

最后结果将被写到MongoDB,然后用户可以进行查询得到结果。用MongoDB的原因,我们需要对三个层次最后的分析结果,我们需要在做一些分析,MongoDB的分析还是很强的,有时候我们需要在对统计之后的结果进行再统计,这是MongoDB的想象。同时,我们也有非常好的MongoDB专家,所以我们可以保证能够吃透MongoDB。