海量数据处理(2):最高频K项问题
海量数据处理(2):最高频K项问题
一、简介
这个问题即:找到一个大文件或者数据流中出现频率最高的 K 项。问题的难点在于,如果条件不一样,解决的办法是完全不一样的,比如:
- 是否需要精确的 Top K 结果?即,是否允许小概率出错。
- 数据是离线的还是在线的?即是一个大文件的形式计算一次得到一个结果,还是数据流的形式实时返回结果。
如果对于一个大文件(也就是说离线的),只是简单的找到前k大的,使用quick select
算法即可。在线数据流如何处理?很简单,只要拿新加进来的数和前k大中的最后一个个数比较即可,新来的比它大,那就把最后一个踢出。问题就转变为集合中维护最小的数,马上就想到了min heap
。
离线问题---->quick select算法
在线问题---->维护minHeap
LintCode 545. Top K Largest Numbers
public class Solution {
private int k;
private Queue<Integer> minHeap;
public Solution(int k) {
this.k = k;
minHeap = new PriorityQueue<>(k);
}
public void add(int num) {
if (minHeap.size() < k) {
minHeap.offer(num);
} else {
if (num > minHeap.peek()) {
minHeap.poll();
minHeap.offer(num);
}
}
}
public List<Integer> topk() {
Iterator<Integer> it = minHeap.iterator();
List<Integer> res = new ArrayList<>();
while (it.hasNext()) {
res.add(it.next());
}
Collections.sort(res, Collections.reverseOrder());
return res;
}
}
二、离线处理 Top K问题
离线问题的主要处理步骤是
- 首先用hash表统计所有项的出现次数
- 寻遍每一个出现过的项,用最大K项的方法,获得最大的前K项
1、MapReduce处理离线 Top K 步骤
- 通过 Map 步骤,将每一个文件中的单词一个个取出,每个单词构造一个 <Word, 1> 的 Key-value 二元组,作为 Map 的输出。
- 通过 Reduce 的步骤,每台机器(Reducer)会处理若干个不同的 Key,在每个 Reducer 一开始初始化的时候,构建一个最小堆,输入 key(某个 word) 和他对应的values(可以假设 values 就是一堆 1,事实上 Map Reduce 会帮你做一些优化,导致有可能 value 已经被加过,所以实际处理的时候,还是老老实实的把 values 加起来,而不是看一下 values 有多少个)。那么我们把所有的 values 加起来就是当前这个 key(某个 word)的出现次数。那么当我们拿到这个单词的出现次数之后,就可以在当前的 Reducer 里去和最小堆里的第K大比大小,来决定是否淘汰当前的第K大了。Reducer 在处理完他需要处理的数据之后,就输出他得到的 Top K。
- 由于有多个 Reducers,因此我们会得到多个 Top K,最后还需要从这些输出中过一遍,得到最终的 Top K。这个步骤已经在 Map Reduce 之外了,用一个单独的代码扫一遍就可以了。
实际情况下,如果真的是非常庞大的数据,全表扫描的时间消耗很长,而且把所有数据放到内存中,这个做法依旧不可行,即使用了MapReduce,用了很多台机器,具体分配到每台机器,也可能出现无法全部加载到内存的情况,我们不能一碰到情况就加机器。
那就是面试官问:假设现在只有一台机器,内存为 1G,你有一个 1T 大小的文件,需要统计里面最高频的 K 个单词,你该怎么做?
答案是:哈希算法
2、Hash算法
哈希函数对于同一个 Key,会返回一个固定的,无规律的整数值。虽然哈希值是可能重复的,并不是一对一的,但并不影响我们的计算。这样我们处理离线问题就分成了下面这三步:
- 先将文件扫描一次,把每个单词作为 Key,算一下他的哈希值,然后模上大概 2000 - 10000 的这样一个数。之所以取这这么一个数是因为,内存的大小是 1G,那么如果将 1T 的文件分成若干个 1G 大小的小文件的话,那么理想需要 1000 个文件(1024G/1G)。平均下来就是将所有的单词分成 1000 组,每组大概就是 1G 个不同的单词(理想状况),实际上处理的时候,分成 2000 组比较保险。10000 组当然更保险了,但是可能就没有合理利用上内存了。实际做的时候,可以先看一下分成 2000 行不行,不行的话,再放大分组数。
- 对于每个文件,分别导入内存进行处理,使用哈希表+最小堆(MapReduce那块)。每一组文件得到一个 Top K。
- 类似于 Map Reduce 一样,我们得到了若干个 Top K,我们最后把这若干个 Top K 再合并一次就好了。
三、在线处理 Top K 问题
数据流问题的特点是没有第二次从头访问数据的机会。因此在离线算法中,先通过哈希表(HashMap)计数,再通过堆(Heap)来统计Top K的方法就行不通了。那在线算法的思路就是边计数,边比较Top K。这种算法的空间复杂度消耗和数据流中流过的数据量总大小有关,如果数据量一大,根本不好控制内存消耗。想要找一个在线的、精确的、省空间的TOP K算法是很困难的,只能牺牲掉准确性,用精度换空间。这就是我们优化的余地。(就好比热搜TOP 10里面 排名不用非常精确)
常见的精度换空间算法有:Lossy Counting、Efficient Count、Hash Count等
1、HashCount
class TopKAnalyzer:
def __init__(self,k):
self.k = k
self.hash_heap = HashHeap()
self.hash_count = 开一个数组,内存有多大开多大
def add(self,word):
index = hashfunc(word)% self.hash_count.size
self.hash_count[index]+=1
word_count = self.hash_count[index]
...
def topk(self):
...
这里将原本记录所有单词的出现次数的哈希表,换成了一个根据内存大小能开多大开多大的数组。这个数组的每个下标存储了“某些”单词的出现次数。使用了hashfun函数计算每个单词的hashcode,将hashcode模整个hashcount数组的大小得到一个下标(index),用这个下标对应的计数来代表这个单词的出现次数。有了这个单词的出现次数之后,再拿去 hash heap 里进行比较就可以了。
问题是如果有两个单词,计算下来的index一样,一个单词把另一个单词挤掉了,实际上根据长尾效应(统计学名词。正态曲线中间的突起部分叫头,两边相对平缓的部分叫尾,人们需求的角度来看,大多数的需求会集中在头部,分布在尾部的只是少量个性化的需求),由于 Top K 的 K 肯定是远小于 N(整个数据集的),而 Top K 的这些数据项的计数又远远大于其他的数据项。因此,Top K 的 index 扎堆的可能性是非常非常小的。
四、面试问题
1、设计一个听歌统计系统,返回用户 7 天内听的最多 10 首的歌。
(1)问题分析
几个问题条件:
- 7天和10首歌这个数字是固定的么?有可能一会儿7天一会儿10天,一会儿10首歌一会儿8首歌么?
- 对实时性要求严格么?是否允许一定时间的延迟?比如一首个一分钟内被点爆,是否需要在这1分钟之内在榜单中体现出来?
澄清问题是面试中重要的一个步骤,因为上述问题的答案,稍有不同,则算法的设计,系统的设计就截然不同。我们先做如下的合理假设:
- 7天10首歌这两个数字是固定的。
- 对实时性要求不严格,可以有1小时的误差。
(2)离线算法
通常来说,系统都会进行一些 log。比如用户在什么时候听了什么歌曲,都会被作为一条条的log 记录下来,这个时候,我们可以每小时运行一次分析程序,计算最近7天被听的最多的10首歌。这个分析程序则读取最近 7 天的听歌记录,用前面的 Hash + Heap 的方法进行统计即可。如果这个记录过大,需要加速的话,还可以使用 Map Reduce 来提速。
-
缺点:
- 每小时都进行一次对前7天的数据统计,若数据量很大,使用 Map Reduce 则会耗费很多计算资源。
- 如果系统的实时性要求变高,则该方法很有可能不奏效
-
解决方案:提出基于桶(Bucket)的统计方法
- 聚合(Aggregate):将用户的同个记录,按照1小时为单位进行一次聚合(Aggregate),即整合成一个 Hash 表,Key是歌曲的id,Value是歌曲在这1小时之内被播放的次数。这种方法的好处在于,因为很多歌曲,特别是热门歌曲,是被高频率点播的,这个时候没有必要去一条一条的记录点播记录,只需要记录一个1小时的统计即可。这里每个小时就是一个桶(Bucket),比如当前时刻是 1月1日的18点,那么18点之后,19点之前的点播记录,都放在18点的这个桶里,进行聚合统计。
- 滑动窗口(Sliding Window):7天的话,只需要在内存中保存 7 * 24 = 144 个桶,随着时间轴的推移,旧的桶则可以被删除。每次需要获得 Top 10 的时候,则将这 144 个桶的结果进行合并即可。
-
关于桶的问题
- 如果桶统计的hash表很大,无法放进能存,那么在每个桶的局部统计中,可以删除value很小的key(长尾理论),他们都没用,但却占据了很大的空间
- 桶是同事存放在内存和硬盘的,存在内存中是为了更快计算top10,存在硬盘中,防止断电,并且即便桶里的数据没被存下来,可以利用数据库中的log,重新还原每个桶里的hash表
(3)在线算法
由于有7天这个窗口,我们要做到:
- 新数据来的时候,需要丢弃对应的7天前的旧数据。
- 7天之内的数据,都应该按照某种带着时间标记的方式被保存下来,而不是只有一个计数。
- 在线 Hash + Heap 的方法“可能”不再奏效,因为跌出前10名的歌曲,还可能在过短时间后回到前10名。而之前介绍中我们在 Heap 中保存的是前10名,跌出前10名的元素不再有机会回到前10名,则无需保存。
2、在 10 亿个数中找最小的 100 万个数(假设内存只能放下 100 万个数)
使用一个最大堆保存最小的前 100 万个数。循环每个数的过程中,和 Max Heap 的堆顶比较,看看是否能被加入最小前 100 万个数里。可以结合MapReduce以及HashCount算法来讲。
上一篇: 晶圆为什么是圆形的?
下一篇: Python——while循环语句
推荐阅读