Spark和Storm
Spark基于MapReduce算法实现的分布式计算,不一样于MapReduce的是,做业中间结果能够保存在内存中,而不要再读写HDFS,
Spark适用于数据挖掘和机器学习等须要迭代的MapReduce算法
Spark Streaming是创建在Spark上的实时计算框架,能够结合流式、批处理和交互式进行查询和实时计算,
基本原理是将Stream数据分红小的时间片断,以相似batch批量处理的方式来处理这些小部分数据
Spark Streaming相比于基于Record的其余处理框架(Storm),弹性分布式数据集更容易实现高效的容错处理;
此外小批量处理的方式使得它能够同时兼容批量和实时数据处理的逻辑和算法,方便了一些须要历史数据和实时数据联合分析的特定应用场合
Spark Streaming和Storm两个框架都提供了可扩展性和容错性,根本区别在于处理模型,Storm处理的是每次传入的一个事件,
而Spark Streaming是处理某个时间段窗口内的事件流。所以,Storm处理一个时间能够达到极低的延迟。java
Hadoop和Storm
Topology=spout+Bolt
Hadoop上运行的是Job(Mapper/Reducer),Storm上运行的是Topology(Spout/Bolt),Job会运行结束,Topology会一直运行下去
Hadoop集群包含(Master Node/Worker Node),对应到Storm集群上的(主节点Nimbus/工做节点Supervisor)
Hadoop集群上的(JobTracker/TaskTracker)对应到Storm集群上的(Nimbus/Supervisor)算法
Storm架构
Supervisor-->Worker(n个Executor(n个Task))-->Topology
Nimbus和Supervisor经过Zookeeper通讯,而且这两个进程都是无状态和快速失败的,全部状态只存在于Zookeeper和本地磁盘上
Spout获取数据源的数据,调用nextTuple函数,发射数据供Bolt消费,发射的数据单元叫Tuple(消息传递的基本单元),源源不断的Tuple组成了Stream
客户端提交Topology代码到Nimbus,Nimbus针对该Topology创建本地的目录,Nimbus中的调度器根据Topology的配置计算Task,并把Task分配到不一样的Worker上,调度的结果
写入ZooKeeper中,ZooKeeper上创建assignments节点,存储Task和Supervisor中Worker的对应关系。在ZooKeeper上建立workerbeats节点来监控Worker的心跳。
Supervisor去ZooKeeper上获取分配的Tasks信息,启动一个或者多个Worker来执行,每一个Worker上运行多个Task,Task由Executor来具体执行。
Worker根据Topology信息初始化创建Task之间的连接,相同Worker类的Task经过DisrupterQueue来通讯,不一样Worker间默认采用Netty来通讯,而后整个Topology就运行起来了
topologies包含全部Topology的静态信息,而cluster中包含了Topology的运行态信息,根据topologies和cluster中的信息,就能够进行真正的调度分配
服务器
在worker中,线程间通讯使用的是Disruptor,而进程间的通讯也就是Worker跟Worker之间的通讯使用的是IContext接口实现,也多是Netty和ZMQ,默认使用Netty架构
在storm中的backtype.storm.task包中含有若干上下文(GeneralTopologyContext\WorkerTopologyContext\TopologyContext),用于记录Topology或者Storm中信息
StormTopology类中定义了不少能够操做读取内部信息的方法app
Task是在Executor中,经过调用mk-task方法来建立一个新的task,并经过调用mk-task-data函数为该Task建立对应的数据框架
Topology、work、Executor、task以及组件关系运维
一个组件(spout/bolt)包含的Executor数量是由在提交Topology时设置的并行度决定的
Topology最终会调度成一个或多个worker,每一个worker即为一个真正的操做系统执行进程,
每一个worker又能够有多个task,每一个task是storm中进行计算的最小的运行单位,也就是spout或者bolt实例,dom
spout 的nextTuple()会在同一个循环内被ack()和fail()周期性的调用。没有任务时它必须释放对线程的控制,其它方法才有机会得以执行。机器学习
executor是worker生成的线程,该线程运行着相同的组件(spout或bolt)的一个或多个task,一个task执行着实际的数据处理
1个组件的task数量老是同样的,可是1个组件的executor的数量能够改变,thread的数量<=task的数量
parallelism_hint参数指定的是bolt的初始的executor的数量分布式
eg
1)
builder.setSpout("id",new Spout(),2);//两个线程执行spout
builder.setBolt("id",new Bolt(),2);//两个线程执行bolt
stormConf.setNumworkers(3);//work数
由于每个worker默认都会占用一个executor(每一个executor会启动一个acker任务(task)),7个executor,7个task
能够在topology中取消acker任务,这样的话就不会多出来一个executor和任务了
加上stormConf.setNumAckers(0);2个executor,2个task
2) 一个线程executor 执行多个 任务task(默认一个executor对应一个task)
int worknum = 3;
builder.setSpout("spout", new RandomSpout(),worknum).setNumTasks(worknum*2); 3个executor,6个task
builder.setBolt("bolt", new SenqueceBolt(),2*worknum).shuffleGrouping("spout").setNumTasks(worknum*2); 6个executor,6个task
conf.setNumWorkers(worknum);
conf.setNumAckers(0);
9个executor,12个task
Stream分组,即消息的分区方法,共7种内置分组方式,也能够经过CustomStreamGrouping接口来定义本身的分组
1 shuffle分组 保证同一级Bolt上的每一个Task处理的Tuple数量一致
2 Fields分组 根据Tuple中的某一个Field或者多个Field的值来划分,具备相同Field的会被分发到同一个Task上
3 All分组 全部的Tuple都会分发到全部的Task上,为每一个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。
4 Global分组 整个Stream会选择一个Task做为分发的目的地,一般是具备最新ID的Task
5 None分组 等同于shuffle分组
6 Direct分组 产生数据的Spout/Botl本身明确决定这个Tuple被Bolt的哪些Task所消费,须要使用OutputCollector的emitDirect方法来实现
7 Local or shuffle分组 若是目标Bolt中的一个或多个Task和当前产生数据的Task在同一个Worker进程中,那么就走内部的线程间通讯,
将Tuple直接发给在当前Worker进程中的目的Task。不然,同Shuffle分组
可靠性
Spout中发射一个新的Tuple时为其制定一个MessageId,这个MessageId能够是任意的Object对象,多个Stream Tuple能够共用一个MessageId,
Storm会告知用户每个消息单元是否在一个制定的时间内被彻底处理,在Storm中,使用了Acker来解决消息处理的可靠性
消息树
不论是在spout里emit仍是bolt里emit的消息,框架都会给这个消息加一个64位的随机数当作id,
实际上是<root_id,randomID>这样的一个结构,即每一个tuple除了上游给他建立了一个随机64位id外
还带有一个不变的root_id,来自spout task同一个消息树的ack信息都发给同一个acker bolt
jstorm使用的是取模hash算法,只须要对spout的tuple 64位id取模就好了。这样基本上能够知足上面2点要求,
由于spout tuple的id会透传给下游的所有消息树节点,所以,bolt也会正确路由到那个acker bolt
acker bolt弄了一个map来作这件事情,key就是spout tuple id即消息树的root id(64)位的,
一个root id表明一个消息树;value则是一个value对,第一个value是task id(spout),
这些都属于元数据,第二个value是一个64位的数字,这个64位的数字表明了一棵消息树的状态,
当整个value变成0了,说明,消息树被“彻底处理”了,就找这个pair的第一个value,发消息就好了。
slot概念
一个节点的slot的数量用来表示某个节点的资源的容量或者说是能力的大小,于是slot是 Hadoop的资源单位
supervisor.slots.ports Storm的slot最好设置成OS核数的整数倍,同时因为storm是基于内存的实时计算,
slot数不要大于每台物理机可运行slot个数:(物理内存-虚拟内存)/单个java进程最大可占用内存数
worker.childopts storm的worker进程的java限制,有效地设置该参数可以在Topology异常时进行缘由分析
-XX:+HeapDumpOnOutOfMemoryError 当内存使用量超过Xmx时,java进程将被JVM杀掉同时会生产java_pidxxx.hprof文件,便于使用MemoryAnalyzer分析内存使用状况
schedule-topology方法和DefaultScheduler的default-schedule的有一些类似的逻辑,主要根据当前的可用资源完成对Topology的任务分配,包括得到当前的可用Slot资源,计算当前
Topology所能使用的所有Slot数目、对Slot从新分配和进行排序以及获得最后的分配信息等
Storm运维监控
Ganglia是一个跨平台可扩展的、高性能计算系统下的分布式监控系统,监控Storm主机的信息
zabbix来监控 Nimbus和Supervisor进程,当发现进程挂掉后能够重启并报警
经过Storm UI来监控和调试相关应用,以Storm Metric、ZooKeeper目录以及Hook等方式帮助完成一些深刻的调试和监控
鉴于Storm做为一个平台提供给不一样的业务共用,进行资源隔离室必须的,开源资源隔离方案有CGoup(其余封装方案)、
YARN及StormOnYarn(可让Storm、Hadoop、Spark等共同运行在同一套集群上)
storm文件结构
service cgconfig start
在/cgroup目录下会生成 blkio cpu cpuacct cpuset devices freezer memory net_cls子目录,每一个子目录对应一个控制项,每一个子目录下都会存在如下配置文件
cgroup.procs 文件内容为受控制的进程ID
notify_no_release 文件内容设置为1时,当没有可控制进程是,出发release_agent指定的内容
release_agent 文件内容为可执行文件、命令
tasks 文件内容为受控制的线程ID
内存设置memory.oom_disable为0或1,能够控制使用的内存超过限制的内存时时杀死仍是进入休眠
cpu是基于CPU时间片进行的资源带哦度,cpuset是基于CPU核心进行的资源调度
service cgconfig status
Storm开发
Trident是基于Storm的高级抽象,除了提供实时流聚合、分组、过滤等功能,还提供了对数据持久化和事务性操做,保证了Tuple只能被处理一次而且不丢失
每次发送数据,Tuple被分红一组组的batch,每个batch分配一个惟一的事务ID,batch之间的更新严格有序
分布式RPC(DRPC)用于对Storm上大量的函数调用进行并行计算过程,分布式RPC经过DRPC服务器协调接收一个RPC请求,发送请求到Storm Topology,并从Storm Topology接收结果;
一般应用分布式RPC对Trident存储的各类数据源进行并行查询
用户画像建模 比较流行成熟的SQL-ON-Hadoop是Spark SQL,Mesos Spark或者Yarn Spark