TopK算法及实现

1. 问题描述
  在大规模数据处理中,常遇到的一类问题是,在海量数据中找出出现频率最高的前K个数,或者从海量数据中找出最大的前K个数,这类问题一般称为“top K”问题,如:在搜索引擎中,统计搜索最热门的10个查询词;在歌曲库中统计下载率最高的前10首歌等等。

2. 当前解决方案 c++

  针对top k类问题,一般比较好的方案是【分治+trie树/hash+小顶堆】,即先将数据集按照hash方法分解成多个小数据集,而后使用trie树或者hash统计每一个小数据集中的query词频,以后用小顶堆统计出每一个数据集中出频率最高的前K个数,最后在全部top K中求出最终的top K。
  实际上,最优的解决方案应该是最符合实际设计需求的方案,在实际应用中,可能有足够大的内存,那么直接将数据扔到内存中一次性处理便可,也可能机器有多个核,这样能够采用多线程处理整个数据集。

本文针对不一样的应用场景,介绍了适合相应应用场景的解决方案。
3. 解决方案
3.1 单机+单核+足够大内存
  设每一个查询词平均占8Byte,则10亿个查询词所需的内存大约是10^9*8=8G内存。若是你有这么大的内存,直接在内存中对查询词进行排序,顺序遍历找出10个出现频率最大的10个便可。这种方法简单快速,更加实用。固然,也能够先用HashMap求出每一个词出现的频率,而后求出出现频率最大的10个词。

3.2 单机+多核+足够大内存
  这时能够直接在内存中实用hash方法将数据划分红n个partition,每一个partition交给一个线程处理,线程的处理逻辑是同3.1节相似,最后一个线程将结果归并。
该方法存在一个瓶颈会明显影响效率,即数据倾斜,每一个线程的处理速度可能不一样,快的线程须要等待慢的线程,最终的处理速度取决于慢的线程。解决方法是,将数据划分红c*n个partition(c>1),每一个线程处理完当前partition后主动取下一个partition继续处理,直到全部数据处理完毕,最后由一个线程进行归并。

3.3 单机+单核+受限内存
  这种状况下,须要将原数据文件切割成一个一个小文件,如,采用hash(x)%M,将原文件中的数据切割成M小文件,若是小文件仍大于内存大小,继续采用hash的方法对数据文件进行切割,直到每一个小文件小于内存大小,这样,每一个文件可放到内存中处理。采用3.1节的方法依次处理每一个小文件。

3.4 多机+受限内存
  这种状况下,为了合理利用多台机器的资源,可将数据分发到多台机器上,每台机器采用3.3节中的策略解决本地的数据。可采用hash+socket方法进行数据分发。
从实际应用的角度考虑,3.1~3.4节的方案并不可行,由于在大规模数据处理环境下,做业效率并非首要考虑的问题,算法的扩展性和容错性才是首要考虑的。算法应该具备良好的扩展性,以便数据量进一步加大(随着业务的发展,数据量加大是必然的)时,在不修改算法框架的前提下,可达到近似的线性比;算法应该具备容错性,即当前某个文件处理失败后,能自动将其交给另一个线程继续处理,而不是从头开始处理。
  Top k问题很适合采用MapReduce框架解决,用户只需编写一个map函数和两个reduce 函数,而后提交到Hadoop(采用mapchain和reducechain)上便可解决该问题。对于map函数,采用hash算法,将hash值相同的数据交给同一个reduce task;对于第一个reduce函数,采用HashMap统计出每一个词出现的频率,对于第二个reduce 函数,统计全部reduce task输出数据中的top k便可。

  附上小根堆实现的c/c++代码: 算法

/*
 *交换函数
*/
template<typename T>
inline void Swap( T &a, T &b)
{
	T c;
	c = a;
	a = b;
	b = c;
}
/*

 */
template<typename T>
void HeapAdjustSmall(T *array, int i, int nLength)
{ 
	int nChild; 
	T   tTemp; 
	for (tTemp = array[i]; 2 * i + 1 < nLength; i = nChild) { 
		nChild = 2 * i + 1;  
		if (nChild < nLength - 1 && array[nChild + 1] < array[nChild]) 
			++nChild; 
		if (tTemp > array[nChild]) { 
				array[i]= array[nChild]; 
		}
		else { 
			break; 
		} 
			array[nChild]= tTemp; 
	} 
}
/*
 *@ µ÷ÕûÐòÁеÄÇ°°ë²¿·ÖÔªËØ,µ÷ÕûÍêÖ®ºóµÚÒ»¸öÔªËØÊÇÐòÁеÄ×îСµÄÔªËØ
 */
template<typename T>
void HeapCreate(T *array, int length) {
	 
	for (int i = length / 2 - 1; i >= 0; --i) 
	{ 
		HeapAdjustSmall(array, i, length); 
	} 
} 
/*
*/
template<typename T>
void HeapSort(T *array, int length) { 
	for (int i = length - 1; i > 0; --i) { 
		Swap(array[0], array[i]); 
		HeapAdjustSmall(array, 0, i); 
	} 
}

template<typename T>
void HeapUpdate(T *array, int nLength, T *value) { 

	if ( *value > array[0] ) {
		array[0] = *value;
		HeapAdjustSmall(array, 0, nLength);
	}else
		return;
}

template <typename T>
inline bool GetTopK(T *pData, const se_uint32_t num, se_uint32_t k){
	
	num_t size = 0;
	HeapCreate(pData, k);
	for ( int i = k; i < num; ++i ) {
		HeapUpdate( pData, k, (pData + i) );
	}
	HeapSort(pData, k);
	return true;
}
#endif