大数平台总体架构选型

数据处理分为三大类: 

 

  • 第一类是从业务的角度,细分为查询检索、数据挖掘、统计分析、深度分析,其中深度分析分为机器学习和神经网络。
  • 第二类是从技术的角度,细分为Batch、SQL、流式处理、machine learning、Deep learning。
  • 第三类是编程模型,细分为离线编程模型、内存编程模型、实时编程模型。

结合前文讲述的数据源特色、分类、采集方式、存储选型、数据分析、数据处理,我在这里给出一个整体的大数据平台的架构。值得注意的是,架构图中去掉了监控、资源协调、安全日志等。 node

大数据平台总体架构思考 


左侧是数据源,有实时流的数据(多是结构化、非结构化,但其特色是实时的),有离线数据,离线数据通常采用的多为ETL的工具,常见的作法是在大数据平台里使用Sqoop或Flume去同步数据,或调一些NIO的框架去读取加载,而后写到HDFS里面,固然也有一些特别的技术存储的类型,好比HAWQ就是一个支持分布式、支持事务一致性的开源数据库。 

从业务场景来看,若是咱们作统计分析,就可使用SQL或MapReduce或streaming或Spark。若是作查询检索,同步写到HDFS的同时还要考虑写到ES里。若是作数据分析,能够建一个Cube,而后再进入OLAP的场景。 算法

 

为了支持这么多功能,咱们怎么搭建咱们的数据平台的呢?

Lambda架构

Lambda架构的主要思想是将大数据系统架构为多层个层次,分别为批处理层(batchlayer)、实时处理层(speedlayer)、服务层(servinglayer)如图(C)。sql

理想状态下,任何数据访问均可以从表达式Query= function(alldata)开始,可是,若数据达到至关大的一个级别(例如PB),且还须要支持实时查询时,就须要耗费很是庞大的资源。一个解决方式是预运算查询函数(precomputedquery funciton)。书中将这种预运算查询函数称之为Batch View(A),这样当须要执行查询时,能够从BatchView中读取结果。这样一个预先运算好的View是能够创建索引的,于是能够支持随机读取(B)。因而系统就变成:数据库

(A)batchview = function(all data);编程

(B)query =function(batch view)。后端

Lambda架构组件选型

下图给出了Lambda架构中各组件在大数据生态系统中和阿里集团的经常使用组件。数据流存储选用不可变日志的分布式系统Kafa、TT、Metaq;BatchLayer数据集的存储选用Hadoop的HDFS或者阿里云的ODPS;BatchView的加工采用MapReduce;BatchView数据的存储采用Mysql(查询少许的最近结果数据)、Hbase(查询大量的历史结果数据)。SpeedLayer采用增量数据处理Storm、Flink;RealtimeView增量结果数据集采用内存数据库Redis。数组

图(H)缓存

Lambda是一个通用框架,各模块选型不要局限于上面给出的组件,特别是view的选型。由于View是和各业务关联很是大的概念,View选择组件时要根据业务的需求,选择最合适的组件。安全

Lambda架构的评估

优势:服务器

a、数据的不可变性。里面给出的数据传输模型是在初始化阶段对数据进行实例化,这样的作法是能获益良多的。可以使得大量的MapReduce工做变得有迹可循,从而便于在不一样阶段进行独立调试。

 b、强调了数据的从新计算问题。在流处理中从新计算是个主要挑战,可是常常被忽视。比方说,某工做流的数据输出是由输入决定的,那么一旦代码发生改动,咱们将不得不从新计算来检视变动的效度。什么状况下代码会改动呢?例如需求发生变动,计算字段须要调整或者程序发出错误,须要进行调试。

缺点:

aJay Kreps认为Lambda包含固有的开发和运维的复杂性。Lambda须要将全部的算法实现两次,一次是为批处理系统,另外一次是为实时系统,还要求查询获得的是两个系统结果的合并。

 

因为存在以上缺点,Linkedin的Jaykreps提出了Kappa架构如图(I):

图(I)

1、使用Kafka或其它系统来对须要从新计算的数据进行日志记录,以及提供给多个订阅者使用。例如须要从新计算30天内的数据,咱们能够在Kafka中设置30天的数据保留值。

2、当须要进行从新计算时,启动流处理做业的第二个实例对以前得到的数据进行处理,以后直接把结果数据放入新的数据输出表中。

3、看成业完成时,让应用程序直接读取新的数据记录表。

4、中止历史做业,删除旧的数据输出表。

 

Kappa架构暂时未作深刻了解,在此不作评价。我我的以为,不一样的数据架构有各自的优缺点,咱们使用的时候只能根据应用场景,选择更合适的架构,才能扬长避短。

 

 

  • flume + Hadoop

 

在具体介绍本文内容以前,先给你们看一下Hadoop业务的总体开发流程: 
这里写图片描述
从Hadoop的业务开发流程图中能够看出,在大数据的业务处理过程当中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出咱们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的介绍。 

 

  flume的特色:
  flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各种数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各类数据接受方(好比文本、HDFS、Hbase等)的能力 。
  flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)而且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,而后Source会把事件推入(单个或多个)Channel中。你能够把Channel看做是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另外一个Source。

 

  • Kafka + Hbase + Phoenix

先看一下咱们数据处理的主要步骤,首先是咱们SDK采集数据,采集数据以后,首先把它扔到咱们的消息队列里作一个基础的持久化,以后咱们会有两部分,一部分是实时统计,一部分是离线统计,这两部分统计完以后会把统计结果存下来,而后提供给咱们的查询服务,最后是咱们外部展现界面。咱们的数据平台主要基于中间的四个绿色的部分。

关于要求,对消息队列来讲确定是吞吐量必定要大,要很是好的扩展性,若是有一个消息的波峰的话要随时可以扩展,由于全部的东西都是分布式的,因此要保证节点故障不会影响咱们正常的业务。

咱们的实时计算目前采用的是分钟级别的实时,没有精确到秒级,离线计算须要计算速度很是快,这两部分咱们当初在考虑的时候就选用了Spark,由于Spark自己既支持实时,又支持离线,并且相对于其余的实时的方案来讲,像Flink或者是Storm和Samza来讲,咱们不须要到秒级的这种实时,咱们须要的是吞吐量,因此咱们选择Spark。实时部分用的是Spark streaming,离线部分用的是Spark offline的方案。

查询方案由于咱们要支持多个维度的组合排序,因此咱们但愿支持sql,这样的话各类组合排序就能够转化成sql的group和order操做。

消息队列 -- Kafka

消息队列咱们选择的是Kafka,由于在咱们看来,Kafka目前是最成熟的分布式消息队列方案,并且它的性能、扩展性也很是好,并且支持容错方案,你能够经过设置冗余来保证数据的完整性。 Kafka目前获得了全部主流流式计算框架的支持,像Spark, Flink, Storm, Samza等等;另一个就是咱们公司的几个创始人都来自于LinkedIn,他们以前在LinkedIn的时候就已经用过Kafka,对Kafka很是熟,因此咱们选择了Kafka。

消息时序 -- HBase

但选定Kafka以后咱们发现了一个问题就是消息时序的问题。首先咱们的数据采集 程中,由于不一样的用户网络带宽不同,数据多是有延迟的,晚到的消息反而可能更早发生,并且Kafka不一样的partition之间是不保证时序的。

可是咱们全部的离线统计程序都是须要按时间统计的,因此咱们就须要一个支持时序的数据库帮咱们把数据排好序,这里咱们选了HBase。咱们用消息产生的时间加上咱们生成消息的ID作成它惟一的row key,进行排序和索引。

SQL On HBase -- Apache Phoenix

对于sql的方案来讲,咱们选择的是Phoenix。选Phoenix是由于咱们考虑了目前几个SQL On HBase的方案,咱们发现Phoenix的效率很是好,是由于它充分的利用了HBase coprocessor的特性,在server端进行了大量的计算,因此大量减轻了client的数据压力还有计算压力。

还有就是它支持HBase的Column Family概念,好比说咱们要支持40个纬度的时候咱们会有一张大宽表,若是咱们把全部的列都设置一个列族的话,在查询任意一个列的时候都须要把40列的数据都读出来,这样是得不偿失的,因此Phoenix支持Column Family的话,咱们就能够把不一样的列根据它们的相关性分红几个列族,查询的时候可能只会命中一个到两个列族,这样大大减小了读取量。

Phoenix还支持Spark的DataSource API,支持列剪枝和行过滤的功能,并且支持数据写入。什么是Spark的DataSource API呢, Spark在1.2的时候提供了DataSource API,它主要是给Spark框架提供一种快速读取外界数据的能力,这个API能够方便的把不一样的数据格式经过DataSource API注册成Spark的表,而后经过Spark SQL直接读取。它能够充分利用Spark分布式的优势进行并发读取,并且Spark自己有一个很好的优化引擎,可以极大的加快Spark SQL的执行。

由于Spark最近很是的火,因此它的社区资源很是的多,基本上全部主流的框架,像咱们常见的Phoenix,Cassandra, MongoDB都有Spark DataSource相关的实现。还有一个就是它提供了一个统一的数据类型,把全部的外部表都统一转化成Spark的数据类型,这样的话不一样的外部表可以相互的关联和操做。

在通过上述的思考以后,咱们选择了这样的一个数据框架。

首先咱们最下面是三个SDK,JS、安卓和iOS,采集完数据以后会发到咱们的负载均衡器,咱们的负载均衡器用的是AWS,它会自动把咱们这些数据发到咱们的server端,server在收集完数据以后会进行一个初步的清洗,把那些不规律的数据给清洗掉,而后再把那些数据发到Kafka里,后面就进入到咱们的实时和离线过程。

最终咱们的数据会统计到HBase里面,对外暴露的是一个sql的接口,能够经过各类sql的组合去查询所须要的统计数据。目前咱们用的主要版本,Spark用的仍是1.5.1,咱们本身根据咱们本身的业务需求打了一些定制的patch,Hadoop用的仍是2.5.2,HBase是0.98,Phoenix是4.7.0,咱们修复了一些小的bug,以及加了一些本身的特性,打了本身的patch。 

 

Hadoop危机?替代HDFS的8个绝佳方案

Ceph 是一个开源、多管齐下的操做系统,由于其高性能并行文件系统的特性,有人甚至认为它是基于Hadoop环境下的HDFS的接班人,由于自2010年就有研究者在寻找这个特性。

 

Apache Flink

Apache Flink是一种能够处理批处理任务的流处理框架。该技术可将批处理数据视做具有有限边界的数据流,借此将批处理任务做为流处理的子集加以处理。为全部处理任务采起流处理为先的方法会产生一系列有趣的反作用。

这种流处理为先的方法也叫作Kappa架构,与之相对的是更加被广为人知的Lambda架构(该架构中使用批处理做为主要处理方法,使用流做为补充并提供早期未经提炼的结果)。Kappa架构中会对一切进行流处理,借此对模型进行简化,而这一切是在最近流处理引擎逐渐成熟后才可行的。

流处理模型

Flink的流处理模型在处理传入数据时会将每一项视做真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:

  • Stream(流)是指在系统中流转的,永恒不变的无边界数据集
  • Operator(操做方)是指针对数据流执行操做以产生其余数据流的功能
  • Source(源)是指数据流进入系统的入口点
  • Sink(槽)是指数据流离开Flink系统后进入到的位置,槽能够是数据库或到其余系统的链接器

为了在计算过程当中遇到问题后可以恢复,流处理任务会在预约时间点建立快照。为了实现状态存储,Flink可配合多种状态后端系统使用,具体取决于所需实现的复杂度和持久性级别。

此外Flink的流处理能力还能够理解“事件时间”这一律念,这是指事件实际发生的时间,此外该功能还能够处理会话。这意味着能够经过某种有趣的方式确保执行顺序和分组。

批处理模型

Flink的批处理模型在很大程度上仅仅是对流处理模型的扩展。此时模型再也不从持续流中读取数据,而是从持久存储中以流的形式读取有边界的数据集。Flink会对这些处理模型使用彻底相同的运行时。

Flink能够对批处理工做负载实现必定的优化。例如因为批处理操做可经过持久存储加以支持,Flink能够不对批处理工做负载建立快照。数据依然能够恢复,但常规处理操做能够执行得更快。

另外一个优化是对批处理任务进行分解,这样便可在须要的时候调用不一样阶段和组件。借此Flink能够与集群的其余用户更好地共存。对任务提早进行分析使得Flink能够查看须要执行的全部操做、数据集的大小,以及下游须要执行的操做步骤,借此实现进一步的优化。

优点和局限

Flink目前是处理框架领域一个独特的技术。虽然Spark也能够执行批处理和流处理,但Spark的流处理采起的微批架构使其没法适用于不少用例。Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力。

Flink的不少组件是自行管理的。虽然这种作法较为罕见,但出于性能方面的缘由,该技术可自行管理内存,无需依赖原生的Java垃圾回收机制。与Spark不一样,待处理数据的特征发生变化后Flink无需手工优化和调整,而且该技术也能够自行处理数据分区和自动缓存等操做。

Flink会经过多种方式对工做进行分许进而优化任务。这种分析在部分程度上相似于SQL查询规划器对关系型数据库所作的优化,可针对特定任务肯定最高效的实现方法。该技术还支持多阶段并行执行,同时可将受阻任务的数据集合在一块儿。对于迭代式任务,出于性能方面的考虑,Flink会尝试在存储数据的节点上执行相应的计算任务。此外还可进行“增量迭代”,或仅对数据中有改动的部分进行迭代。

在用户工具方面,Flink提供了基于Web的调度视图,借此可轻松管理任务并查看系统状态。用户也能够查看已提交任务的优化方案,借此了解任务最终是如何在集群中实现的。对于分析类任务,Flink提供了相似SQL的查询,图形化处理,以及机器学习库,此外还支持内存计算。

Flink能很好地与其余组件配合使用。若是配合Hadoop 堆栈使用,该技术能够很好地融入整个环境,在任什么时候候都只占用必要的资源。该技术可轻松地与YARN、HDFS和Kafka 集成。在兼容包的帮助下,Flink还能够运行为其余处理框架,例如Hadoop和Storm编写的任务。

目前Flink最大的局限之一在于这依然是一个很是“年幼”的项目。现实环境中该项目的大规模部署尚不如其余处理框架那么常见,对于Flink在缩放能力方面的局限目前也没有较为深刻的研究。随着快速开发周期的推动和兼容包等功能的完善,当愈来愈多的组织开始尝试时,可能会出现愈来愈多的Flink部署。

总结

Flink提供了低延迟流处理,同时可支持传统的批处理任务。Flink也许最适合有极高流处理需求,并有少许批处理任务的组织。该技术可兼容原生Storm和Hadoop程序,可在YARN管理的集群上运行,所以能够很方便地进行评估。快速进展的开发工做使其值得被你们关注。

Spark or Flink or hadoop or Storm 总结

大数据系统可以使用多种处理技术。

对于仅须要批处理的工做负载,若是对时间不敏感,比其余解决方案实现成本更低的Hadoop将会是一个好选择。

对于仅须要流处理的工做负载,Storm可支持更普遍的语言并实现极低延迟的处理,但默认配置可能产生重复结果而且没法保证顺序。Samza与YARN和Kafka紧密集成可提供更大灵活性,更易用的多团队使用,以及更简单的复制和状态管理。

对于混合型工做负载,Spark可提供高速批处理和微批处理模式的流处理。该技术的支持更完善,具有各类集成库和工具,可实现灵活的集成。Flink提供了真正的流处理并具有批处理能力,经过深度优化可运行针对其余平台编写的任务,提供低延迟的处理,但实际应用方面还为时过早。

最适合的解决方案主要取决于待处理数据的状态,对处理所需时间的需求,以及但愿获得的结果。具体是使用全功能解决方案或主要侧重于某种项目的解决方案,这个问题须要慎重权衡。随着逐渐成熟并被普遍接受,在评估任何新出现的创新型解决方案时都须要考虑相似的问题。

数据存储: HBase vs Cassandra

  HBase Cassandra
语言 Java Java
出发点 BigTable BigTable and Dynamo
License Apache Apache
Protocol HTTP/REST (also Thrift) Custom, binary (Thrift)
数据分布 表划分为多个region存在不一样region server上 改进的一致性哈希(虚拟节点)
存储目标 大文件 小文件
一致性 强一致性 最终一致性,Quorum NRW策略
架构 master/slave p2p
高可用性 NameNode是HDFS的单点故障点 P2P和去中心化设计,不会出现单点故障
伸缩性 Region Server扩容,经过将自身发布到Master,Master均匀分布Region 扩容需在Hash Ring上多个节点间调整数据分布
读写性能 数据读写定位可能要经过最多6次的网络RPC,性能较低。 数据读写定位很是快
数据冲突处理 乐观并发控制(optimistic concurrency control) 向量时钟
临时故障处理 Region Server宕机,重作HLog 数据回传机制:某节点宕机,hash到该节点的新数据自动路由到下一节点作 hinted handoff,源节点恢复后,推送回源节点。
永久故障恢复 Region Server恢复,master从新给其分配region Merkle 哈希树,经过Gossip协议同步Merkle Tree,维护集群节点间的数据一致性
成员通讯及错误检测 Zookeeper 基于Gossip
CAP 1,强一致性,0数据丢失。2,可用性低。3,扩容方便。 1,弱一致性,数据可能丢失。2,可用性高。3,扩容方便。

二:部署运维

单纯的就部署和运维hbase以及Cassandra来讲,部署hbase前,须要部署的组件有zookeeper,hdfs,而后才是hbase。对应的Cassandra就比较简单不少,编译完成一个jar包,单台服务器启动一个Cassandra进程便可。

在部署hbase的时候,可能须要规划好,哪些机器跑hmaser,rs,zk,hdfs的相关进程等, 还有可能为了集群的性能,还要预先规划好多少个rs。本身人工去部署这么一个hbase集群仍是比较麻烦的,更别提本身维护(阿里云ApsaraDB-HBase你值得拥有)。

Cassandra部署的时候比较简单,一个tar包搞定,因为cassandra数据落本地盘,须要人为的配置一些参数好比是否须要虚拟节点(vnode)以及多少vnode;须要基于业务的场景选择特定的key的放置策略(partitioner),这个放置策略的选择以及一些参数的配置须要必定的门槛。

简单总结下:部署运维的话,hbase依赖组件多,部署麻烦一点,可是相关资料不少,下降了难度;cassandra部署依赖少,可是配置参数多,相关资料较少。

特别是使用云HBase彻底避免了部署形成的各类麻烦,比手工部署运维任何大数据数据库都方便太多。

 

 

  • 预测系统架构

总体架构从上至下依次是:数据源输入层、基础数据加工层、核心业务层、数据输出层和下游系统。首先从外部数据源获取咱们所需的业务数据,而后对基础数据进行加工清洗,再经过时间序列、机器学习等人工智能技术对数据进行处理分析,最后计算出预测结果并经过多种途径推送给下游系统使用。

数据源输入层:京东数据仓库中存储着咱们须要的大部分业务数据,例如订单信息、商品信息、库存信息等等。而对于促销计划数据则大部分来自于采销人员经过Web系统录入的信息。除此以外还有一小部分数据经过文本形式直接上传到HDFS中。

基础数据加工层:在这一层主要经过Hive对基础数据进行一些加工清洗,去掉不须要的字段,过滤不须要的维度并清洗有问题的数据。

核心业务层:这层是系统的的核心部分,横向看又可分为三层:特征构建、预测算法和预测结果加工。纵向看是由多条业务线组成,彼此之间不发生任何交集。

  • 特征构建:将以前清洗过的基础数据经过近一步的处理转化成标准格式的特征数据,提供给后续算法模型使用。

  • 核心算法:利用时间序列分析、机器学习等人工智能技术进行销量、单量的预测,是预测系统中最为核心的部分。

  • 预测结果加工:预测结果可能在格式和一些特殊性要求上不能知足下游系统,因此还须要根据实际状况对其进行加工处理,好比增长标准差、促销标识等额外信息。

预测结果输出层:将最终预测结果同步回京东数据仓库、MySql、HBase或制做成JSF接口供其余系统远程调用。

下游系统:包括下游任务流程、下游Web系统和其余系统。

预测系统核心介绍预测系统核心层技术选型

 

预测系统核心层技术主要分为四层:基础层、框架层、工具层和算法层。

 基础层:

HDFS用来作数据存储,Yarn用来作资源调度,BDP(Big Data Platform)是京东本身研发的大数据平台,咱们主要用它来作任务调度。

 框架层:

以Spark RDD、Spark SQL、Hive为主, MapReduce程序占一小部分,是原先遗留下来的,目前正逐步替换成Spark RDD。 选择Spark除了对性能的考虑外,还考虑了Spark程序开发的高效率、多语言特性以及对机器学习算法的支持。在Spark开发语言上咱们选择了Python,缘由有如下三点:

  • Python有不少不错的机器学习算法包可使用,比起Spark的MLlib,算法的准确度更高。咱们用GBDT作过对比,发现xgboost比MLlib里面提供的提高树模型预测准确度高出大概5%~10%。虽然直接使用Spark自带的机器学习框架会节省咱们的开发成本,但预测准确度对于咱们来讲相当重要,每提高1%的准确度,就可能会带来成本的成倍下降。

  • 咱们的团队中包括开发工程师和算法工程师,对于算法工程师而言他们更擅长使用Python进行数据分析,使用Java或Scala会有不小的学习成本。

  • 对比其余语言,咱们发现使用Python的开发效率是最高的,而且对于一个新人,学习Python比学习其余语言更加容易。

 工具层:

一方面咱们会结合自身业务有针对性的开发一些算法,另外一方面咱们会直接使用业界比较成熟的算法和模型,这些算法都封装在第三方Python包中。咱们比较经常使用的包有xgboost、numpy、pandas、sklearn、scipy和hyperopt等。

  • Xgboost:它是Gradient Boosting Machine的一个C++实现,xgboost最大的特色在于,它可以自动利用CPU的多线程进行并行,同时在算法上加以改进提升了精度。

  • numpy:是Python的一种开源的数值计算扩展。这种工具可用来存储和处理大型矩阵,比Python自身的嵌套列表结构要高效的多(该结构也能够用来表示矩阵)。

  • pandas:是基于NumPy 的一种工具,该工具是为了解决数据分析任务而建立的。Pandas 归入了大量库和一些标准的数据模型,提供了高效地操做大型数据集所需的工具。

  • sklearn:是Python重要的机器学习库,支持包括分类、回归、降维和聚类四大机器学习算法。还包含了特征提取、数据处理和模型评估三大模块。

  • scipy:是在NumPy库的基础上增长了众多的数学、科学以及工程计算中经常使用的库函数。例如线性代数、常微分方程数值求解、信号处理、图像处理和稀疏矩阵等等。

 算法层:

咱们用到的算法模型很是多,缘由是京东的商品品类齐全、业务复杂,须要根据不一样的状况采用不一样的算法模型。咱们有一个独立的系统来为算法模型与商品之间创建匹配关系,有些比较复杂的预测业务还须要使用多个模型。咱们使用的算法整体上能够分为三类:时间序列、机器学习和结合业务开发的一些独有的算法。

1. 机器学习算法主要包括GBDT、LASSO和RNN :

GBDT:是一种迭代的决策树算法,该算法由多棵决策树组成,全部树的结论累加起来作最终答案。咱们用它来预测高销量,但历史规律不明显的商品。

RNN:这种网络的内部状态能够展现动态时序行为。不一样于前馈神经网络的是,RNN能够利用它内部的记忆来处理任意时序的输入序列,这让它能够更容易处理如时序预测、语音识别等。

LASSO:该方法是一种压缩估计。它经过构造一个罚函数获得一个较为精炼的模型,使得它压缩一些系数,同时设定一些系数为零。所以保留了子集收缩的优势,是一种处理具备复共线性数据的有偏估计。用来预测低销量,历史数据平稳的商品效果较好。

2. 时间序列主要包括ARIMA和Holt winters :

ARIMA:全称为自回归积分滑动平均模型,于70年代初提出的一个著名时间序列预测方法,咱们用它来主要预测相似库房单量这种平稳的序列。

Holt winters:又称三次指数平滑算法,也是一个经典的时间序列算法,咱们用它来预测季节性和趋势都很明显的商品。

3. 结合业务开发的独有算法包括WMAStockDT、SimilarityModel和NewProduct等:

WMAStockDT:库存决策树模型,用来预测受库存状态影响较大的商品。

SimilarityModel:类似品模型,使用指定的同类品数据来预测某商品将来销量。

NewProduct:新品模型,顾名思义就是用来预测新品的销量。

预测系统核心流程

预测核心流程主要包括两类:以机器学习算法为主的流程和以时间序列分析为主的流程。

 1. 以机器学习算法为主的流程以下:

 

  • 特征构建:经过数据分析、模型试验肯定主要特征,经过一系列任务生成标准格式的特征数据。

  • 模型选择:不一样的商品有不一样的特性,因此首先会根据商品的销量高低、新品旧品、假节日敏感性等因素分配不一样的算法模型。

  • 特征选择:对一批特征进行筛选过滤不须要的特征,不一样类型的商品特征不一样。

  • 样本分区:对训练数据进行分组,分红多组样本,真正训练时针对每组样本生成一个模型文件。通常是同类型商品被分红一组,好比按品类维度分组,这样作是考虑并行化以及模型的准确性。

  • 模型参数:选择最优的模型参数,合适的参数将提升模型的准确度,由于须要对不一样的参数组合分别进行模型训练和预测,因此这一步是很是耗费资源。

  • 模型训练:待特征、模型、样本都肯定好后就能够进行模型训练,训练每每会耗费很长时间,训练后会生成模型文件,存储在HDFS中。

  • 模型预测:读取模型文件进行预测执行。

  • 多模型择优:为了提升预测准确度,咱们可能会使用多个算法模型,当每一个模型的预测结果输出后系统会经过一些规则来选择一个最优的预测结果。

  • 预测值异常拦截:咱们发现越是复杂且不易解释的算法越容易出现极个别预测值异常偏高的状况,这种预测偏高没法结合历史数据进行解释,所以咱们会经过一些规则将这些异常值拦截下来,而且用一个更加保守的数值代替。

  • 模型评价:计算预测准确度,咱们一般用使用mapd来做为评价指标。

  • 偏差分析:经过分析预测准确度得出一个偏差在不一样维度上的分布,以便给算法优化提供参考依据。

  •  

 2. 以时间序列分析为主的预测流程以下:

  • 生成历史时序:将历史销量、价格、库存等数据按照规定格式生成时序数据。

  • 节假日因子:计算节假日与销量之间的关系,用来平滑节假日对销量影响。

  • 周日因子:计算周一到周日这7天与销量的关系,用来平滑周日对销量的影响。

  • 促销因子:计算促销与销量之间的关系,用来平滑促销对销量的影响。

  • 因子平滑:历史销量是不稳定的,会受到节假日、促销等影响,在这种状况下进行预测有很大难度,因此须要利用以前计算的各种因子对历史数据进行平滑处理。

  • 时序预测:在一个相对平稳的销量数据上经过算法进行预测。

  • 因子叠加:结合将来节假日、促销计划等因素对预测结果进行调整。

Spark在预测核心层的应用

咱们使用Spark SQL和Spark RDD相结合的方式来编写程序,对于通常的数据处理,咱们使用Spark的方式与其余无异,可是对于模型训练、预测这些须要调用算法接口的逻辑就须要考虑一下并行化的问题了。咱们平均一个训练任务在一天处理的数据量大约在500G左右,虽然数据规模不是特别的庞大,可是Python算法包提供的算法都是单进程执行。咱们计算过,若是使用一台机器训练所有品类数据须要一个星期的时间,这是没法接收的,因此咱们须要借助Spark这种分布式并行计算框架来将计算分摊到多个节点上实现并行化处理。

咱们实现的方法很简单,首先须要在集群的每一个节点上安装所需的所有Python包,而后在编写Spark程序时考虑经过某种规则将数据分区,好比按品类维度,经过groupByKey操做将数据从新分区,每个分区是一个样本集合并进行独立的训练,以此达到并行化。流程以下图所示:

 

 

伪码以下:

 

repartitionBy方法即设置一个重分区的逻辑返回(K,V)结构RDD,train方法是训练数据,在train方法里面会调用Python算法包接口。saveAsPickleFile是Spark Python独有的一个Action操做,支持将RDD保存成序列化后的sequnceFile格式的文件,在序列化过程当中会以10个一批的方式进行处理,保存模型文件很是适合。

虽然原理简单,但存在着一个难点,即以什么样的规则进行分区,key应该如何设置。为了解决这个问题咱们须要考虑几个方面,第一就是哪些数据应该被聚合到一块儿进行训练,第二就是如何避免数据倾斜。

针对第一个问题咱们作了以下几点考虑:

  • 被分在一个分区的数据要有必定的类似性,这样训练的效果才会更好,好比按品类分区就是个典型例子。

  • 分析商品的特性,根据特性的不一样选择不一样的模型,例如高销商品和低销商品的预测模型是不同的,即便是同一模型使用的特征也可能不一样,好比对促销敏感的商品就须要更多与促销相关特征,相同模型相同特征的商品应倾向于分在一个分区中。

针对第二个问题咱们采用了以下的方式解决:

  • 对于数据量过大的分区进行随机抽样选取。

  • 对于数据量过大的分区还能够作二次拆分,好比图书小说这个品类数据量明显大于其余品类,因而就能够分析小说品类下的子品类数据量分布状况,并将子品类合并成新的几个分区。

  • 对于数据量太小这种状况则须要考虑进行几个分区数据的合并处理。

总之对于后两种处理方式能够单独经过一个Spark任务按期运行,并将这种分区规则保存。

结合图解Spark进行应用、优化

注:《图解Spark:核心技术与案例实战》为本文做者所著。

《图解Spark:核心技术与案例实战》一书以Spark2.0版本为基础进行编写,系统介绍了Spark核心及其生态圈组件技术。其内容包括Spark生态圈、实战环境搭建和编程模型等,重点介绍了做业调度、容错执行、监控管理、存储管理以及运行架构,同时还介绍了Spark生态圈相关组件,包括了Spark SQL的即席查询、Spark Streaming的实时流处理、MLlib的机器学习、GraphX的图处理和Alluxio的分布式内存文件系统等。下面介绍京东预测系统如何进行资源调度,并描述如何使用Spark存储相关知识进行系统优化。

结合系统中的应用

在图解Spark书的第六章描述了Spark运行架构,介绍了Spark集群资源调度通常分为粗粒度调度和细粒度调度两种模式。粗粒度包括了独立运行模式和Mesos粗粒度运行模式,在这种状况下以整个机器做为分配单元执行做业,该模式优势是因为资源长期持有减小了资源调度的时间开销,缺点是该模式中没法感知资源使用的变化,易形成系统资源的闲置,从而形成了资源浪费。

而细粒度包括了Yarn运行模式和Mesos细粒度运行模式,该模式的优势是系统资源可以获得充分利用,缺点是该模式中每一个任务都须要从管理器获取资源,调度延迟较大、开销较大。

因为京东Spark集群属于基础平台,在公司内部共享这些资源,因此集群采用的是Yarn运行模式,在这种模式下能够根据不一样系统所须要的资源进行灵活的管理。在YARN-Cluster模式中,当用户向YARN集群中提交一个应用程序后,YARN集群将分两个阶段运行该应用程序:

第一个阶段是把Spark的SparkContext做为Application Master在YARN集群中先启动;第二个阶段是由Application Master建立应用程序,而后为它向Resource Manager申请资源,并启动Executor来运行任务集,同时监控它的整个运行过程,直到运行完成。下图为Yarn-Cluster运行模式执行过程:

 

结合系统的优化

咱们都知道大数据处理的瓶颈在IO。咱们借助Spark能够把迭代过程当中的数据放在内存中,相比MapReduce写到磁盘速度提升近两个数量级;另外对于数据处理过程尽量避免Shuffle,若是不能避免则Shuffle前尽量过滤数据,减小Shuffle数据量;最后,就是使用高效的序列化和压缩算法。在京东预测系统主要就是围绕这些环节展开优化,相关Spark存储原理知识能够参见图解Spark书第五章的详细描述。

因为资源限制,分配给预测系统的Spark集群规模并非很大,在有限的资源下运行Spark应用程序确实是一个考验,由于在这种状况下常常会出现诸如程序计算时间太长、找不到Executor等错误。咱们经过调整参数、修改设计和修改程序逻辑三个方面进行优化:

 参数调整

  • 减小num-executors,调大executor-memory,这样的目的是但愿Executor有足够的内存可使用。

  • 查看日志发现没有足够的空间存储广播变量,分析是因为Cache到内存里的数据太多耗尽了内存,因而咱们将Cache的级别适当调成MEMORY_ONLY_SER和DISK_ONLY。

  • 针对某些任务关闭了推测机制,由于有些任务会出现暂时没法解决的数据倾斜问题,并不是节点出现问题。

  • 调整内存分配,对于一个Shuffle不少的任务,咱们就把Cache的内存分配比例调低,同时调高Shuffle的内存比例。

 修改设计

参数的调整虽然容易作,但每每效果很差,这时候须要考虑从设计的角度去优化:

  • 原先在训练数据以前会先读取历史的几个月甚至几年的数据,对这些数据进行合并、转换等一系列复杂的处理,最终生成特征数据。因为数据量庞大,任务有时会报错。通过调整后当天只处理当天数据,并将结果保存到当日分区下,训练时按天数须要读取多个分区的数据作union操做便可。

  • 将“模型训练”从天天执行调整到每周执行,将“模型参数选取”从每周执行调整到每个月执行。由于这两个任务都十分消耗资源,而且属于不须要频繁运行,这么作虽然准确度会略微下降,但都在可接受范围内。

  • 经过拆分任务也能够很好的解决资源不够用的问题。能够横向拆分,好比原先是将100个品类数据放在一个任务中进行训练,调整后改为每10个品类提交一次Spark做业进行训练。这样虽然总体执行时间变长,可是避免了程序异常退出,保证任务能够执行成功。除了横向还能够纵向拆分,即将一个包含10个Stage的Spark任务拆分红两个任务,每一个任务包含5个Stage,中间数据保存到HDFS中。

 修改程序逻辑

为了进一步提升程序的运行效率,经过修改程序的逻辑来提升性能,主要是在以下方面进行了改进:避免过多的Shuffle、减小Shuffle时须要传输的数据和处理数据倾斜问题等。

1. 避免过多的Shuffle

Spark提供了丰富的转换操做,可使咱们完成各种复杂的数据处理工做,可是也正由于如此咱们在写Spark程序的时候可能会遇到一个陷阱,那就是为了使代码变的简洁过度依赖RDD的转换操做,使原本仅需一次Shuffle的过程变为了执行屡次。咱们就曾经犯过这样一个错误,原本能够经过一次groupByKey完成的操做却使用了两回。

业务逻辑是这样的:咱们有三张表分别是销量(s)、价格(p)、库存(v),每张表有3个字段:商品id(sku_id)、品类id(category)和历史时序数据(data),如今须要按sku_id将s、p、v数据合并,而后再按category再合并一次,最终的数据格式是:[category,[[sku_id, s , p, v], [sku_id, s , p, v], […],[…]]]。一开始咱们先按照sku_id + category做为key进行一次groupByKey,将数据格式转换成[sku_id, category , [s,p, v]],而后按category做为key再groupByKey一次。

后来咱们修改成按照category做为key只进行一次groupByKey,由于一个sku_id只会属于一个category,因此后续的map转换里面只须要写一些代码将相同sku_id的s、p、v数据group到一块儿就能够了。两次groupByKey的状况:

 

 

 

修改后变为一次groupByKey的状况:

 

多表join时,若是key值相同,则可使用union+groupByKey+flatMapValues形式进行。好比:须要将销量、库存、价格、促销计划和商品信息经过商品编码链接到一块儿,一开始使用的是join转换操做,将几个RDD彼此join在一块儿。后来发现这样作运行速度很是慢,因而换成union+groypByKey+flatMapValue形式,这样作只需进行一次Shuffle,这样修改后运行速度比之前快多了。实例代码以下:

 

若是两个RDD须要在groupByKey后进行join操做,可使用cogroup转换操做代替。好比, 将历史销量数据按品类进行合并,而后再与模型文件进行join操做,流程以下:

 

使用cogroup后,通过一次Shuffle就可完成了两步操做,性能大幅提高。

2. 减小Shuffle时传输的数据量

  • 在Shuffle操做前尽可能将不须要的数据过滤掉。

  • 使用comebineyeByKey能够高效率的实现任何复杂的聚合逻辑。

  •  

comebineyeByKey属于聚合类操做,因为它支持map端的聚合因此比groupByKey性能好,又因为它的map端与reduce端能够设置成不同的逻辑,因此它支持的场景比reduceByKey多,它的定义以下:

educeByKey和groupByKey内部实际是调用了comebineyeByKey,

 

 

咱们以前有不少复杂的没法用reduceByKey来实现的聚合逻辑都经过groupByKey来完成的,后来所有替换为comebineyeByKey后性能提高了很多。

3.处理数据倾斜

有些时候通过一系列转换操做以后数据变得十分倾斜,在这样状况下后续的RDD计算效率会很是的糟糕,严重时程序报错。遇到这种状况一般会使用repartition这个转换操做对RDD进行从新分区,从新分区后数据会均匀分布在不一样的分区中,避免了数据倾斜。若是是减小分区使用coalesce也能够达到效果,但比起repartition不足的是分配不是那么均匀。

 

 

其它阅读:

facebook为何放弃Cassandra?

参考:http://www.zhihu.com/question/19593207:

Facebook开发Cassandra初衷是用于Inbox Search,可是后来的Message System则使用了HBase,Facebook对此给出的解释是Cassandra的最终一致性模型不适合Message System,HBase具备更简单的一致性模型,固然还有其余的缘由。HBase更加的成熟,成功的案例也比较多等等。Twitter和Digg都曾经很高调的选用Cassandra,可是最后也都放弃了,固然Twitter还有部分项目也还在使用Cassandra,可是主要的Tweet已经不是了。

相关文章
相关标签/搜索