随着Last.fm服务的发展,用户数目从数千增加到数百万,这时,存储、处理和管理这些用户数据渐渐变成一项挑战。幸运的是,当你们认识到Hadoop技术能解决众多问题以后,Hadoop的性能迅速稳定下来,并被你们积极地运用。2006年初,Last.fm开始使用Hadoop,几个月以后便投入实际应用。Last.fm使用Hadoop的理由概括以下。 html
(1)分布式文件系统为它所存储的数据(例如,网志,用户收听音乐的数据)提供冗余备份服务而不增长额外的费用。 java
(2)能够方便地经过增添便宜、普通的硬件来知足可扩展性需求。 服务器
(3)当时Last.fm财力有限,Hadoop是免费的。 网络
(4)开源代码和活跃的社区团体意味着Last.fm可以自由地修改Hadoop,从而增添一些自定义特性和补丁。 app
(5)Hadoop提供了一个弹性的容易掌握的框架来进行分布式计算。 框架
如今,Hadoop已经成为Last.fm基础平台的关键组件,目前包括2个Hadoop集群,涉及50台计算机、300个内核和100 TB的 硬盘 空间。在这些集群上,运行着数百种执行各类操做的平常做业,例如日志文件分析、A/B测试评测、即时处理和图表生成。本节的例子将侧重于介绍产生图表的处理过程,由于这是Last.fm对Hadoop的第一个应用,它展现出Hadoop在处理大数据集时比其余方法具备更强的功能性和灵活性。
一般状况下,Last.fm有两种收听信息。 分布式
用户播放本身的音乐(例如,在PC机或其余设备上听MP3文件),这种信息经过Last.fm的官方客户端应用或一种第三方应用 (有上百种)发送到Last.fm。 ide
用户收听Last.fm某个网络电台的节目,并在本地计算机上经过流技术缓冲一首歌。Last.fm播放器或站点能被用来访问这些流数据,而后它能给用户提供一些额外的功能,好比容许用户对她收听的音频进行喜好、跳过或禁止等操做。 函数
在处理接收到的数据时,咱们对它们进行分类:一类是用户提交的收听的音乐数据从如今开始,第一类数据称为“scrobble”(收藏数据);另外一类是用户收听的Last.fm的电台数据(从如今开始,第二类数据称为“radio listen”(电台收听数据)。为了不Last.fm的推荐系统出现信息反馈循环的问题,对数据源的区分是很是重要的,而Last.fm的推荐系统只使用scrobble数据。
数据命名及意义: oop
scrobble -- 收藏数据
radio listen--电台收听数据
Track Statistics程序
音乐收听信息被发送到Last.fm时,会经历验证和转换阶段,最终结果是一系列由空格分隔的文本文件,包含的信息有用户ID(userId)、音乐(磁道)ID(trackId)、这首音乐被收藏的次数(Scrobble)、这首音乐在电台中收听的次数(Radio)以及被选择跳过的次数(Skip)。表16-1包含一些采样的收听数据,后面介绍的例子将用到这些数据,它是Track Statistics程序的输入(真实数据达GB数量级,而且具备更多的属性字段,为了方便介绍,这里省略了其余的字段)。
命名 | 含义 | 命名 | 含义 |
userId | 用户Id | trackId | 音乐Id |
scrobbles | 收藏次数 | skip | 跳过次数 |
radio | 收听次数 | |
|
计算不一样的听众数--Unique Listeners做业模块用于计算每一个音频的不一样收听用户数
UniqueListenerMaper:程序处理用空格分隔的原始收听数据,而后对每一个track ID(音频ID)产生相应的user ID(用户ID):
public void map(LongWritable position, Text rawLine, OutputCollector IntWritable> output, Reporter reporter) throws IOException { String[] parts = (rawLine.toString()).split(" "); int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]); int radioListens = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]); // if track somehow is marked with zero plays - ignore if (scrobbles <= 0 && radioListens <= 0) { return; } // if we get to here then user has listened to track, // so output user id against track id IntWritable trackId = new IntWritable(Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID])); IntWritable userId = new IntWritable(Integer.parseInt(parts[TrackStatisticsProgram.COL_USERID])); output.collect(trackId, userId); }代码释义:按空格“ ”读取内容, 排除无效数据(既没有被收藏,又没有被收听的数据),按照 音乐Id 和 用户Id 合并成key-value
UniqueListenersReducer:接收到每一个track ID对应的user ID数据列表以后,把这个列表放入Set类型对象以消除重复的用户ID数据。而后输出每一个track ID对应的这个集合的大小(不一样用户数)。可是若是某个键对应的值太多,在set对象中存储全部的reduce值可能会有内存溢出的危险。实际上尚未出现过这个问题,可是为了不这一问题,咱们能够引入一个额外的MapReduce处理步骤来删除重复数据或使用辅助排序的方法(详细内容请参考第241页的“辅助排序”小节)。
public void reduce(IntWritable trackId, Iterator values,OutputCollector output, Reporter reporter) throws IOException { Set userIds = new HashSet(); // add all userIds to the set, duplicates automatically removed (set contract) while (values.hasNext()) { IntWritable userId = values.next(); userIds.add(Integer.valueOf(userId.get())); } // output trackId -> number of unique listeners per track output.collect(trackId, new IntWritable(userIds.size())); }
代码释义:经过map处理后的key-value,按照相同的key,对value进行合并操做。
表16-2是这一做业模块的样本输入数据。map输出结果如表16-3所示,reduce输出结果如表16-4所示。
表16-2. 做业的输入
表16-3. map输出
表16-4. reduce输出
Sum做业相对简单,它只为每一个音轨累计咱们感兴趣的数据。
SumMapper 输入数据仍然是原始文本文件,可是这一阶段对输入数据的处理彻底不一样。指望的输出结果是针对每一个音轨的一系列累计值(不一样用户、播放次数、收藏次数、电台收听次数和跳过次数)。为了方便处理,咱们使用一个由Hadoop Record I/O类产生的TrackStats中间对象,它实现了WritableComparable方法(所以可被用做输出)来保存这些数据。mapper建立一个TrackStats对象,根据文件中的每一行数据对它进行值的设定,可是“不一样的用户数”(unique listener count)这一项没有填写(这项数据由merge做业模块填写)。
Map过程
public void map(LongWritable position, Text rawLine, OutputCollector output, Reporter reporter) throws IOException { String[] parts = (rawLine.toString()).split(" "); int trackId = Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]); int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]); int radio = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]); int skip = Integer.parseInt(parts[TrackStatisticsProgram.COL_SKIP]); // set number of listeners to 0 (this is calculated later) // and other values as provided in text file TrackStats trackstat = new TrackStats(0, scrobbles + radio, scrobbles, radio, skip); output.collect(new IntWritable(trackId), trackstat); }释义:对于指望输出结果多维度(好比此例子中须要输出: 不一样用户、播放次数、收藏次数、电台收听次数和跳过次数 ),采用虚拟对象来实现(VO作value时,须要实现 WritableComparable或Writable )
new TrackStats(0, scrobbles + radio, scrobbles, radio, skip)根据业务建立实例:0,喜好(经过scrobbles+radio),收藏,试听过,跳过。
建立完实例后输出:(trackId,trackstat)
Reduce过程
SumReducer 在这一过程,reducer执行和mapper类似的函数——对每一个音频使用总数状况进行统计,而后返回一个总的统计数据:
public void reduce(IntWritable trackId, Iterator values,OutputCollector output, Reporter reporter) throws IOException { TrackStats sum = new TrackStats(); // holds the totals for this track while (values.hasNext()) { TrackStats trackStats = (TrackStats) values.next(); sum.setListeners(sum.getListeners() + trackStats.getListeners()); sum.setPlays(sum.getPlays() + trackStats.getPlays()); sum.setSkips(sum.getSkips() + trackStats.getSkips()); sum.setScrobbles(sum.getScrobbles() + trackStats.getScrobbles()); sum.setRadioPlays(sum.getRadioPlays() + trackStats.getRadioPlays()); } output.collect(trackId, sum); }代码释义:建立一个能够接收全部统计数据的VO,合并输出。
表16-5是这个部分做业的输入数据(和Unique Listener做业模块的输入同样)。map的输出结果如表16-6所示,reduce的输出结果如表16-7所示。
表16-5. 做业输入
表16-6. map输出
表16-7. reduce 输出
合并结果
最后一个做业模块须要合并前面两个做业模块产生的输出数据:每一个音频对应的不一样用户数和每一个音频的使用统计信息。为了可以合并这两种不一样的输入数据,咱们采用了两个不一样的mapper(对每一种输入定义一个)。两个中间做业模块被配置以后能够把他们的输出结果写入路径不一样的文件,MultipleInputs类用于指定mapper和文件的对应关系。下面的代码展现了做业的JobConf对象是如何设置来完成这一过程的:
MultipleInputs.addInputPath(conf, sumInputDir, SequenceFileInputFormat.class, IdentityMapper.class); MultipleInputs.addInputPath(conf, listenersInputDir, SequenceFileInputFormat.class, MergeListenersMapper.class);
虽然单用一个mapper也能处理不一样的输入,可是示范解决方案更方便,更巧妙。
MergeListenersMapper 这个mapper用来处理UniqueListenerJob输出的每一个音轨的不一样用户数据。它采用和SumMapper类似的方法建立TrackStats对象,但此次它只填写每一个音轨的不一样用户数信息,无论其余字段:
public void map(IntWritable trackId, IntWritable uniqueListenerCount,OutputCollector output, Reporter reporter) throws IOException { TrackStats trackStats = new TrackStats(); trackStats.setListeners(uniqueListenerCount.get()); output.collect(trackId, trackStats); }
代码释义:刚才知道为何在建立虚拟对象TrackStats第一个参数置零的缘由了吧,这个参数是为了第一步统计用户喜欢总数而预留的。
表16-8是mapper的一些输入数据;表16-9是对应的输出结果。
表16-8. MergeListenersMapper的输入
表16-9. MergeListenersMapper的输出
IdentityMapper IdentityMapper被配置用来处理SumJob输出的TrackStats对象,由于不要求对数据进行其余处理,因此它直接输出输入数据(见表16-10)。
表16-10. IdentityMapper的输入和输出
▲
SumReducer 前面两个mapper产生同一类型的数据:每一个音轨对应一个TrackStats对象,只是数据赋值不一样。最后的reduce阶段可以重用前面描述的SumReducer来为每一个音轨建立一个新的TrackStats对象,它综合前面两个TrackStats对象的值,而后输出结果(见表16-11)。
表16-11. SumReducer的最终输出
▲
最终输出文件被收集后复制到服务器端,在这里一个Web服务程序使Last.fm网站能获得并展现这些数据。如图16-3所示,这个网页展现了一个音频的使用统计信息:接听者总数和播放总次数。
▲图16-3. TrackStats结果
总结
Hadoop已经成为Last.fm基础框架的一个重要部件,它用于产生和处理各类各样的数据集,如网页日志信息和用户收听数据。为了让你们可以掌握主要的概念,这里讲述的例子已经被大大地简化;在实际应用中输入数据具备更复杂的结构而且数据处理的代码也更加繁琐。虽然Hadoop自己已经足够成熟能够支持实际应用,但它仍在被你们积极地开发,而且每周Hadoop社区都会为它增长新的特性并提高它的性能。Last.fm很高兴是这个社区的一分子,咱们是代码和新想法的贡献者,同时也是对大量开源技术进行利用的终端用户。
(做者:Adrian Woodhead和Marc de Palol)