简单快速的传输层框架,安装以下:java
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig git
应该是zmq的java包吧,安装步骤以下:github
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install算法
针对大型分布式系统提供配置维护、名字服务、分布式同步、组服务等,能够保证:spring
在zookeeper中有几种角色:sql
在启动以前须要在conf下编写zoo.cfg配置文件,里面的内容包括:数据库
在单机的时候能够直接将zoo_sample.cfg修改成zoo.cfg,而后使用启动服务便可(若是报错没有目录,手动建立便可):apache
sudo ./zkServer.sh start编程
如今用netstat -na(或者是./zkCli.sh 127.0.0.1:2181)就能看到在监听指定的端口,那么zookeeper如今起来了。 服务器
参考:
该系统是阿里巴巴在对storm作了重写和优化,在storm里面能运行的在jstorm里面也能运行,该系统擅长执行实时计算,并且基本上都在内存中搞定。进入正题,jstorm中有以下几种角色:
jstorm是用zookeeper来管理的,下面来看conf/storm.yaml中的经常使用配置:
在把配置搞正确以后,就能够用bin中的脚原本启动节点服务了:
sudo ./jstorm nimbus
sudo ./jstorm supervisor
参考:
结构和hadoop的很像,总体看来以下(Nimbus负责控制、提交任务,Supervisor负责执行任务):
为了作实时计算你须要创建topology,由计算节点组成的图:
在JStorm上的topology的生命周期以下:
在集群运行的时候要明白Worker、Executor、Task的概念,固然消息被传递的时候其实发起者、接收者都是Task,而真正执行的是Executor(能够理解为一个线程),由它来轮询其中的Spout/Bolt:
在jstorm中经过ack机制来保证数据至少被处理一次,简单来讲下ack:
在消息发、收的过程当中会造成一棵树状的结构,在一个消息收的时候发一个验证消息,发的时候也发一个验证消息,那么整体上每一个消息出现两次。那么ack机制就是将每一个消息的随机生成的ID进行异或,若是在某一时刻结果为0,那就说明处理成功。
以下图所示:
须要补充一下:虽然ack算是随机算法,可是出错的几率极低,可是系统应该具有在出错以后矫正的能力(甚至检查是否出错)。ack机制保证了消息会被处理,可是不能保证只处理一次&顺序处理,在须要的情形就有了事务的概念:
所谓普通模式是指不去使用JStorm为开发人员提供的高级抽象,用其提供的原生的接口进行开发,主要涉及到的接口有:
由于还存在多种其余类型的拓扑结构,那么在builder这个环节固然不能乱传,在基本用法要去实现IRichSpout、IRichBolt接口,他们并无新增任何的方法,仅仅是用来区分类型。既然是拓扑结构那么应该是一个比较复杂的网络,其实这个是在builder中完成的,其中setSpout/setBolt返回的结果实际上是InputDeclarer对象,在其中定义了N个流分组的策略:
public T fieldsGrouping(String componentId, String streamId, Fields fields); // 字段分组 public T globalGrouping(String componentId, String streamId); // 全局分组 public T shuffleGrouping(String componentId, String streamId); // 随机分组 public T localOrShuffleGrouping(String componentId, String streamId); // 本地或随机分组 public T noneGrouping(String componentId, String streamId); // 无分组 public T allGrouping(String componentId, String streamId); // 广播分组 public T directGrouping(String componentId, String streamId); // 直接分组 // 自定义分组 public T customGrouping(String componentId, CustomStreamGrouping grouping); public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping); public T grouping(GlobalStreamId id, Grouping grouping);
经过这些接口,咱们能够一边增长处理节点、一边指定其消费哪些消息。
基本的用法是每次处理一个tuple,可是这种效率比较低,不少状况下是能够批量获取消息而后一块儿处理,批量用法对这种方式提供了支持。打开代码能够很明显地发现jstorm和storm的有着不小的区别:
// storm 中的定义 public interface IBatchSpout extends Serializable { void open(Map conf, TopologyContext context); void emitBatch(long batchId, TridentCollector collector);// 批次发射tuple void ack(long batchId); // 成功处理批次 void close(); Map getComponentConfiguration(); Fields getOutputFields(); } // jstorm中的定义 public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable { }
另外若是用批次的话就须要改用BatchTopologyBuilder来构建拓扑结构,在IBatchSpout中主要实现的接口以下:
事务拓扑并非新的东西,只是在原始的ISpout、IBolt上作了一层封装。在事务拓扑中以并行(processing)和顺序(commiting)混合的方式来完成任务,使用Transactional Topology能够保证每一个消息只会成功处理一次。不过须要注意的是,在Spout须要保证可以根据BatchId进行屡次重试,在这里有一个基本的例子,这里有一个不错的讲解。
此次一种更高级的抽象(甚至不须要知道底层是怎么map-reduce的),所面向的再也不是spout和bolt,而是stream。主要涉及到下面几种接口:
在这里有一个jstorm中使用Trident的简单例子。
在不少的实际问题中,咱们面对的模型都是大同小异,下面先来看问题是什么:
一、在流式计算中常常须要对一批的数据进行汇总计算,若是用SQL来描述就是:
SELECT MIN(status) FROM my_table GROUP BY order_id
在用JStorm来实现这一条简单的SQL时,面对的是一条一条的数据库变化的消息(这里须要保证有序消费),其实至关于在一堆的消息上面作了一个嵌套的SQL查询,用一张图表示以下:
二、
----- updating -----