Storm入门,包学会

前言

只有光头才能变强。

文本已收录至个人GitHub精选文章,欢迎Starhttps://github.com/ZhongFuCheng3y/3yhtml

据说过大数据的同窗应该都据说过Storm吧?其实我如今负责的系统用的就是Storm,在最开始接手系统的时候,我是彻底不了解Storm的(如今其实也是只知其一;不知其二而已)git

因为最近在整理系统,因此顺便花了点时间入门了一下Storm(前几天花了点时间改了一下,上线之后一堆Bug,因而就果断回滚了。)github

这篇文章来说讲简单Storm的简单使用,没有复杂的东西。看完这篇文章,等到接手Storm的代码的时候大家**『大概』『应该』**能看懂Storm的代码。算法

什么是Storm

咱们首先进官方看一下Storm的介绍:apache

Apache Storm is a free and open source distributed realtime computation system

Storm是一个分布式的实时计算系统后端

分布式:我在以前已经写过挺多的分布式的系统了,好比Kafka/HDFS/Elasticsearch等等。如今看到分布式这个词,三歪第一反应就是「它的存储或者计算交由多台服务器上完成,最后汇总起来达到最终的效果」。安全

实时:处理速度是毫秒级或者秒级的服务器

计算:能够简单理解为对数据进行处理,好比清洗数据(对数据进行规整,取出有用的数据)。微信

咱们使用Storm作了什么?

我如今作的消息管理平台是能够推送各种的消息的(IM/PUSH/短信/微信消息等等),消息下发后,咱们是确定要知道这条消息的下发状况的(是否发送成功,若是用户没收到是因为什么缘由致使用户没收到,消息是否被点击了等等)。数据结构

消息是否成功下发到用户上,这是运营和客服常常关心的问题。

消息下发的效果,这是运营很是关心的问题

基于上面问题,咱们用了Storm作了一套本身的埋点方案,帮助咱们快速确认消息是否成功下发到用户上以及统计消息下发的效果

听起来好像很牛逼,下面我来说讲背景,看完你就会发现一点儿都不难。

需求背景

消息管理平台虽然看起来只是发消息的,可是系统设计仍是有点东西的。咱们以「微服务」的思想去看这个系统,会将不一样的功能模块抽取到不一样的系统的。

其中PUSH(推送)的链路是最长的,一条消息下发通过的后端系统就有7个,如图下:

这7个系统都有可能「干掉」了这条消息,致使用户没收到。若是咱们每去查一个问题,都要逐一排查每一个系统,那实在是太慢了。

不少时候客服反馈过来的问题都是当天的,甚至是前几分钟的,咱们须要有一个及时的反馈给客服来帮助用户找到为何收不到消息的缘由。

因而咱们要作两个功能:

  1. 可以查询用户当天全部的消息下发状况。(可以快速定位是哪一个系统什么缘由致使用户收不到消息)
  2. 查询某条消息的实时总体下发状况。(可以快速查看该消息的总体下发状况,包括下发量,中途过滤的量以及点击量)
若是是单纯查问题,咱们将各个系统的日志收集到Kafka,而后写到Elasticsearch这个是彻底没问题的(如今咱们也是这么干的)

涉及到统计相关的,咱们就有本身的一套埋点方案,这个是便于对数据的统计,也能完成部分排查的功能

需求实现

前面提到了「埋点」,实际上就是打日志。其实就是在关键的地方上打上日志作记录,方便排查问题。

好比,如今咱们有7个系统,每一个系统在执行消息的时候都会可能致使这条消息发不出去(多是消息去重了,多是用户的手机号不正确,多是用户过久没有登陆了等等都有可能)。咱们在这些『关键位置』都打上日志,方便咱们去排查。

这些「关键位置」咱们都给它用简单的数字来命个名。好比说:咱们用「11」来表明这个用户没有绑定手机号,用「12」来表明这个用户10分钟前收到了一条如出一辙的消息,用「13」来表明这个用户屏蔽了消息.....

「11」「12」「13」「14」「15」「16」这些就叫作「点位」,把这些点位在关键的位置中打上日志,这个就叫作「埋点

有了埋点,咱们要作的就是将这些点位收集起来,而后统一处理成咱们的格式,输出到数据源中。

OK,就是分三步:

  1. 收集日志
  2. 清洗日志
  3. 输出到数据源

收集日志咱们有logAgent帮咱们收集到Kafka,实时清洗日志咱们用的就是Storm,清洗完咱们输出到Redis(实时)/Hive(离线)。

Storm通常是在处理(清洗)那层,Storm的上下游也很明确了(上游是消息队列,下游写到各类数据源,这种是最多见的):

Storm统一清洗出来放到Redis,咱们就能够经过接口来很方便去查一条消息的总体下发状况,好比:

到这里,主要想说明咱们经过Storm来实时清洗数据,下来来说讲Storm的基本使用~

Storm入门

咱们从一段最简单的Storm代码入门,先看看下面的代码:

若是彻底没看过Storm代码的同窗,看到上面的代码会怎么分析?我是这样的:

  • 首先有一个TopologyBuilder的东西,这个东西多是Storm的构造器之类的
  • 而后设置了Spout和Bolt(可是我不知道这两个东西是用来干吗的,可是我能够点进去对象里边看看作了什么)
  • 而后设置了一下Config配置(应该是设置Storm分配多少内存,多少线程之类的,反正跟配置相关)
  • 最后用StormSubmitter提交任务,把配置和TopologyBuilder的内容给提交上去。

咱们简单搜一下,就能够发现它的流程大体是这样的:

Spout是数据的源头,通常咱们用它去接收数据,Spout接收到数据后往Bolt上发送,Bolt处理数据(清洗)。Bolt清洗完数据能够写到一个数据源或者传递给下一个Bolt继续清洗。

Topology关联了咱们在程序中定义好的Spout和Bolt。各类 Spout 和 Bolt 链接在一块儿以后,就成了一个 Topology,一个 Topology 就是一个 Storm 应用。

Spout往Bolt传递数据,Bolt往Bolt传递数据,这个传递的过程叫作Stream,Stream传递的是一个一个Tuple

如今问题来了,咱们的Spout和Bolt之间是怎么关联起来的呢?Bolt和Bolt之间是怎么关联起来的呢?

在上面的图咱们知道一个Topology会有多个Spout和多个Bolt,那我怎么知道这个Spout传递的数据是给这个Bolt,这个Bolt传递的数据是给另一个Bolt?(说白了,就是上面图上的箭头是怎么关联的呢?)

在Storm中,有Grouping的机制,就是决定Spout的数据流向哪一个Bolt,Bolt的数据流向下一个Bolt。

为了提升并发度,咱们在setBolt的时候,能够指定Bolt的线程数,也就是所谓的Executor(Spout也一样能够指定线程数的,只是此次我拿Bolt来举例)。咱们的结构可能会是这样的:

分组的策略有如下:

  • 1)shuffleGrouping(随机分组)
  • 2)fieldsGrouping(按照字段分组,在这里便是同一个单词只能发送给一个Bolt)
  • 3)allGrouping(广播发送,即每个Tuple,每个Bolt都会收到)
  • 4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)
  • 5)noneGrouping(随机分派)
  • 6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)
  • 7)Local or shuffle Grouping
  • 8)partialKeyGrouping(关键字分组,与按字段分组很类似,但他分配更加均衡)
  • 9)customGrouping (自定义的Grouping)

shuffleGrouping策略咱们是用得最多的,好比上面的图上有两个Spout,咱们会将这两个Spout的Tuple均匀分发到各个Bolt中执行。

说到这里,咱们再回头看看最开始的代码,我给补充一下注释,大家应该就能看得懂了:

我仍是再画一个图吧:

入门的过程复杂吗?不复杂。说白了就是Spout接收到数据,经过grouping机制将Spout的数据传到给Bolt处理,Bolt处理完看还需不须要继续往下处理,若是须要就传递给下一个Bolt,不须要就写到数据源、调接口等等。

Storm架构

当咱们提交任务以后,会发生什么呢?咱们来看看。

  1. 任务提交后,会被上传到Nimbus节点上,它是主控节点,负责分配代码、布置任务及检测故障
  2. Nimbus会去Zookeeper上读取整个集群的信息,将任务交给Supervisor,它是工做节点,负责建立、执行任务
  3. Supervisor建立Worker进程,每一个Worker对应一个Topology的子集。Worker是Task的容器,Task是真正的任务执行者。

流程大体以下:

Nimbus和Supervisor都是节点(服务器),Storm用Zookeeper去管理Supervisor节点的信息。

Supervisor节点下会建立Worker进程,建立多少个Worker进程由Conf配置文件决定。线程Executor,由进程产生,用于执行任务,Executor线程数有多少个是在setBolt、setSpout的时候决定。Task是真正的任务执行者,Task其实就是包装了Bolt/Spout实例。

关于Worker、Executor、Task之间的关系,在官网有一个例子专门说明了,咱们能够看看。先放出代码:

内部的图:

解释一下:

  • 默认状况下:若是不指定Tasks数,那么一个线程会有一个Task
  • conf.setNumWorkers(2)表明会建立两个Worker进程
  • setSpout("blue-spout", new BlueSpout(), 2)蓝色Spout会有两个线程处理,由于有两个进程,因此一个进程会有一个蓝色Spout线程
  • topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4) 绿色Bolt会有两个线程处理,由于有两个进程Worker因此一个进程会有一个绿色Bolt线程。又由于设置了4个Task数,因此一个线程会分配两个绿色的Task
  • topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6).shuffleGrouping("green-bolt")。黄色Bolt会有6个线程处理,由于建立了两个进程,因此一个进程会有3个黄色Bolt线程。没有单独设置Task书,因此一个线程默认有一个Task

从上面咱们能够知道threads ≤ tasks线程数是确定小于等于Task数的。有没有好奇宝宝会问:「Storm用了线程,那么会有线程不安全的状况吗?」(其实这是三歪刚学的疑问)

通常来讲不会,由于不少状况下,一个线程是对应一个Task的(Task你能够理解为Bolt/Spout的实例),既然每一个线程是处理本身的实例了,那固然不会有线程安全的问题啦。(固然了,你若是在Bolt/Spout中设置了静态成员变量,那仍是会有线程安全问题)

最后

这篇文章简单地介绍了一下Storm,Storm的东西其实还有不少,包括ack机制什么的。如今进官方找文档,都在主推Trident了,有兴趣的同窗能够继续往下看。

话又说回来,我司也在主推Flink了,这块后续若是有迁移计划,我也准备学学搞搞,到时候再来分享分享入门文章。

参考资料:

各种知识点总结

下面的文章都有对应的 原创精美PDF,在持续更新中,能够来找我催更~

涵盖Java后端全部知识点的开源项目(已有7 K star):https://github.com/ZhongFuCheng3y/3y

若是你们想要实时关注我更新的文章以及分享的干货的话,微信搜索Java3y

PDF文档的内容均为手打,有任何的不懂均可以直接来问我(公众号有个人联系方式)。

相关文章
相关标签/搜索