Storm+HBase实时实践

1.HBase Increment计数器html

 hbase counter的原理: read+count+write,正好完成,就是讲key的value读出,若存在,则完成累加,再写入,若不存在,则按“0”处理,再加上你须要累加的值。java

  传统上,若是没有 counter,当咱们要给一个 column 的值 +1 或者其余数值时,就须要先从该 column 读取值,而后在客户端修改值,最后写回给 Region Server,即一个 Read-Modify-Write (RMW) 操做。在这样的过程当中,按照 Lars 的描述1,还须要对操做所在的 row 事先加锁,过后解锁。会引发许多 contention,以及随之而来不少问题。而 HBase 的 increment 接口就保证在 Region Server 端原子性的完成一个客户端请求。数据库

   RMW 操做的代码:apache

db.read (table,keyname,fields, new HashMap < String,String > ( ) ) ;
db.update (table,keyname,values ) ;

  它并无对所操做的 row 进行加锁、解锁操做,而是简单的读取改写。这在 counter 的应用场景中是不可接受的。不加锁在大并发状况下,很容易致使 counter 的值与预期不符。架构

  HBase 引入 Increment/Counter 是很是重要的,对某些须要原子性更改操做的应用来讲则是“致命”的。除了单个 increment 的接口 incrementColumnValue() 外,还有批量 increment 的接口increment(Increment),方便客户端调用。并发

  除此以外,HBase 还在进行 Coprocessor 的开发,使计算直接在 Region Server 上进行,省去了繁琐耗时的数据移动。负载均衡

使用方法:分布式

long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,long amount) throws IOException

 

2.Hbase读写函数

 

3.Storm架构oop

  http://shiyanjun.cn/archives/977.html

每个工做进程执行一个topology的一个子集;一个运行的topology由运行在不少机器上的不少工做进程组成。

Topology: 是个应用的spout,bolt,grouping的组合

Storm Grouping:

  1. Shuffle Grouping :随机分组,尽可能均匀分布到下游Bolt中

    将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。

  2. Fields Grouping :按字段分组,按数据中field值进行分组;相同field值的Tuple被发送到相同的Task

    这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来讲很是关键,若是同一个单词不去同一个task,那么统计出来的单词次数就不对了。“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”. —— 小示例

  3. All grouping :广播

    广播发送, 对于每个tuple将会复制到每个bolt中处理。

  4. Global grouping :全局分组,Tuple被分配到一个Bolt中的一个Task,实现事务性的Topology。

    Stream中的全部的tuple都会发送给同一个bolt任务处理,全部的tuple将会发送给拥有最小task_id的bolt任务处理。

  5. None grouping :不分组

    不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下

  6. Direct grouping :直接分组 指定分组

    由tuple的发射单元直接决定tuple将发射给那个bolt,通常状况下是由接收tuple的bolt决定接收哪一个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪一个task处理这个消息。 只有被声明为Direct Stream的消息流能够声明这种分组方法。并且这种消息tuple必须使用emitDirect方法来发射。消息处理者能够经过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

Storm Tuple

  storm中的数据首先是有spout收集,相似于一个消息源,spout的open()函数通常就是接收数据的地方,而后spout的 nextTuple()是发送(emit)tuple的地方。tuple究竟是什么?感受仍是用英语来讲比较容易理解吧,"A tuple is a named of values where each value can be any type."  tuple是一个相似于列表的东西,存储的每一个元素叫作field(字段)。咱们用getString(i)能够得到tuple的第i个字段。而其中的每一个字段均可以任意类型的,也能够一个很长的字符串。咱们能够用:

  1. String A = tuple.getString(0);  
  2. long a= tuple.getLong(1);  

 来获得我想要的数据,不过前提你是要知道你的tuple的组成。具体tuple是什么类型,彻底取决于本身的程序,取决于spout中nextTuple()方法中emit发送的类型。

 

4.kafka+storm+hbase架构

kafka做为分布式消息系统,实时消息系统,有生产者和消费者;storm做为大数据的实时处理系统;hbase是apache hadoop 的数据库,其具备高效的读写性能,把kafka生产的数据做为storm的源头spout来消费,通过bolt处理把结果保存到hbase,进行持久化保存.

相关文章
相关标签/搜索