Storm 入门的Demo教程

Storm介绍

Storm是Twitter开源的分布式实时大数据处理框架,最先开源于github,从0.9.1版本以后,归于Apache社区,被业界称为实时版Hadoop。随着愈来愈多的场景对Hadoop的MapReduce高延迟没法容忍,好比网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,大数据实时处理解决方案(流计算)的应用日趋普遍,目前已经是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流。java

Storm的核心组件

  • Nimbus:即Storm的Master,负责资源分配和任务调度。一个Storm集群只有一个Nimbus。
  • Supervisor:即Storm的Slave,负责接收Nimbus分配的任务,管理全部Worker,一个Supervisor节点中包含多个Worker进程。
  • Worker:工做进程,每一个工做进程中都有多个Task。
  • Task:任务,在 Storm 集群中每一个 Spout 和 Bolt 都由若干个任务(tasks)来执行。每一个任务都与一个执行线程相对应。
  • Topology:计算拓扑,Storm 的拓扑是对实时计算应用逻辑的封装,它的做用与 MapReduce 的任务(Job)很类似,区别在于 MapReduce 的一个 Job 在获得结果以后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还能够理解成由一系列经过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构。
  • Stream:数据流(Streams)是 Storm 中最核心的抽象概念。一个数据流指的是在分布式环境中并行建立、处理的一组元组(tuple)的无界序列。数据流能够由一种可以表述数据流中元组的域(fields)的模式来定义。
  • Spout:数据源(Spout)是拓扑中数据流的来源。通常 Spout 会从一个外部的数据源读取元组而后将他们发送到拓扑中。根据需求的不一样,Spout 既能够定义为可靠的数据源,也能够定义为不可靠的数据源。一个可靠的 Spout可以在它发送的元组处理失败时从新发送该元组,以确保全部的元组都能获得正确的处理;相对应的,不可靠的 Spout 就不会在元组发送以后对元组进行任何其余的处理。一个 Spout能够发送多个数据流。
  • Bolt:拓扑中全部的数据处理均是由 Bolt 完成的。经过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数据库交互等功能,Bolt 几乎可以完成任何一种数据处理需求。一个 Bolt 能够实现简单的数据流转换,而更复杂的数据流变换一般须要使用多个 Bolt 并经过多个步骤完成。
  • Stream grouping:为拓扑中的每一个 Bolt 的肯定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不一样任务(tasks)中划分数据流的方式。在 Storm 中有八种内置的数据流分组方式。
  • Reliability:可靠性。Storm 能够经过拓扑来确保每一个发送的元组都能获得正确处理。经过跟踪由 Spout 发出的每一个元组构成的元组树能够肯定元组是否已经完成处理。每一个拓扑都有一个“消息延时”参数,若是 Storm 在延时时间内没有检测到元组是否处理完成,就会将该元组标记为处理失败,并会在稍后从新发送该元组。

Storm程序再Storm集群中运行的示例图以下:git

Topology

为何把Topology单独提出来呢,由于Topology是咱们开发程序主要的用的组件。
Topology和MapReduce很相像。
MapReduce是Map进行获取数据,Reduce进行处理数据。
而Topology则是使用Spout获取数据,Bolt来进行计算。
总的来讲就是一个Topology由一个或者多个的Spout和Bolt组成。github

具体流程是怎么走,能够经过查看下面这张图来进行了解。
示例图:数据库

注:图片来源http://www.tianshouzhi.com/api/tutorials/storm/52。apache

图片有三种模式,解释以下:
第一种比较简单,就是由一个Spout获取数据,而后交给一个Bolt进行处理;
第二种稍微复杂点,由一个Spout获取数据,而后交给一个Bolt进行处理一部分,而后在交给下一个Bolt进行处理其余部分。
第三种则比较复杂,一个Spout能够同时发送数据到多个Bolt,而一个Bolt也能够接受多个Spout或多个Bolt,最终造成多个数据流。可是这种数据流必须是有方向的,有起点和终点,否则会形成死循环,数据永远也处理不完。就是Spout发给Bolt1,Bolt1发给Bolt2,Bolt2又发给了Bolt1,最终造成了一个环状。api

Storm 集群安装

以前已经写过了,这里就不在说明了。
博客地址:http://www.panchengming.com/2018/01/26/pancm70/数组

Storm Hello World

前面讲了一些Storm概念,可能在理解上不太清楚,那么这里咱们就用一个Hello World代码示例来体验下Storm运做的流程吧。框架

环境准备

在进行代码开发以前,首先得作好相关的准备。
本项目是使用Maven构建的,使用Storm的版本为1.1.1。
Maven的相关依赖以下:maven

<!--storm相关jar  -->
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
    <scope>provided</scope>
 </dependency>

具体流程

在写代码的时候,咱们先来明确要用Storm作什么。
那么第一个程序,就简单的输出下信息。
具体步骤以下:分布式

  1. 启动topology,设置好Spout和Bolt。
  2. 将Spout获取的数据传递给Bolt。
  3. Bolt接受Spout的数据进行打印。

Spout

那么首先开始编写Spout类。通常是实现 IRichSpout 或继承BaseRichSpout该类,而后实现该方法。
这里咱们继承BaseRichSpout这个类,该类须要实现这几个主要的方法:

1、open

open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。
有三个参数,它们的做用分别是:

  1. Storm配置的Map;
  2. topology中组件的信息;
  3. 发射tuple的方法;

代码示例:

@Override
    public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
        System.out.println("open:"+map.get("test"));
        this.collector = collector;
    }
2、nextTuple

nextTuple()方法是Spout实现的核心。
也就是主要执行方法,用于输出信息,经过collector.emit方法发射。

这里咱们的数据信息已经写死了,因此这里咱们就直接将数据进行发送。
这里设置只发送两次。
代码示例:

@Override
    public void nextTuple() {
        if(count<=2){
            System.out.println("第"+count+"次开始发送数据...");
            this.collector.emit(new Values(message));
        }
        count++;
    }
3、declareOutputFields

declareOutputFields是在IComponent接口中定义,用于声明数据格式。
即输出的一个Tuple中,包含几个字段。

由于这里咱们只发射一个,因此就指定一个。若是是多个,则用逗号隔开。
代码示例:

@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("定义格式...");
        declarer.declare(new Fields(field));
    }
4、ack

ack是在ISpout接口中定义,用于表示Tuple处理成功。

代码示例:

@Override
    public void ack(Object obj) {
        System.out.println("ack:"+obj);
    }
5、fail

fail是在ISpout接口中定义,用于表示Tuple处理失败。

代码示例:

@Override
    public void fail(Object obj) {
        System.out.println("失败:"+obj);
    }
6、close

close是在ISpout接口中定义,用于表示Topology中止。

代码示例:

@Override
    public void close() {
        System.out.println("关闭...");
    }

至于还有其余的,这里就不在一一列举了。

Bolt

Bolt是用于处理数据的组件,主要是由execute方法来进行实现。通常来讲须要实现 IRichBolt 或继承BaseRichBolt该类,而后实现其方法。
须要实现方法以下:

1、prepare

在Bolt启动前执行,提供Bolt启动环境配置的入口。
参数基本和Sqout同样。
通常对于不可序列化的对象进行实例化。
这里的咱们就简单的打印下

@Override
    public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
        System.out.println("prepare:"+map.get("test"));
        this.collector=collector;
    }

注:若是是能够序列化的对象,那么最好是使用构造函数。

2、execute

execute()方法是Bolt实现的核心。
也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
从tuple中获取消息可使用 tuple.getString()tuple.getStringByField();这两个方法。我的推荐第二种,能够经过field来指定接收的消息。
注:若是继承的是IRichBolt,则须要手动ack。这里就不用了,BaseRichBolt会自动帮咱们应答。
代码示例:

@Override
    public void execute(Tuple tuple) {
//      String msg=tuple.getString(0);
        String msg=tuple.getStringByField("test");
        //这里咱们就不作消息的处理,只打印
        System.out.println("Bolt第"+count+"接受的消息:"+msg); 
        count++;
        /**
         * 
         * 没次调用处理一个输入的tuple,全部的tuple都必须在必定时间内应答。
         * 能够是ack或者fail。不然,spout就会重发tuple。
         */
//      collector.ack(tuple);
    }
3、declareOutputFields

和Spout的同样。
由于到了这里就再也不输出了,因此就什么都没写。

@Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {        
    }
cleanup

cleanup是IBolt接口中定义,用于释放bolt占用的资源。
Storm在终止一个bolt以前会调用这个方法。
由于这里没有什么资源须要释放,因此就简单的打印一句就好了。

@Override
    public void cleanup() {
        System.out.println("资源释放");
    }

Topology

这里咱们就是用main方法进行提交topology。
不过在提交topology以前,须要进行相应的设置。
这里我就不一一细说了,代码的注释已经很详细了。
代码示例:

import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * 
    * Title: App
    * Description:
    * storm测试 
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月6日
     */
    public class App {
        
        private static final String str1="test1"; 
        private static final String str2="test2"; 
    
        public static void main(String[] args)  {
            // TODO Auto-generated method stub
            //定义一个拓扑
            TopologyBuilder builder=new TopologyBuilder();
            //设置一个Executeor(线程),默认一个
            builder.setSpout(str1, new TestSpout());
            //设置一个Executeor(线程),和一个task
            builder.setBolt(str2, new TestBolt(),1).setNumTasks(1).shuffleGrouping(str1);
            Config conf = new Config();
            conf.put("test", "test");
            try{
              //运行拓扑
           if(args !=null&&args.length>0){ //有参数时,表示向集群提交做业,并把第一个参数当作topology名称
             System.out.println("远程模式");
                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
          } else{//没有参数时,本地提交
        //启动本地模式
         System.out.println("本地模式");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("111" ,conf,  builder.createTopology() );
        Thread.sleep(10000);
    //  关闭本地集群
        cluster.shutdown();
          }
            }catch (Exception e){
                e.printStackTrace();
            }   
        }
    }

运行该方法,输出结果以下:

本地模式
定义格式...
open:test
第1次开始发送数据...
第2次开始发送数据...
prepare:test
Bolt第1接受的消息:这是个测试消息!
Bolt第2接受的消息:这是个测试消息!
资源释放
关闭...

到这里,是否是基本上对Storm的运做有些了解了呢。
这个demo达到了上述的三种模式图中的第一种,一个Spout传输数据, 一个Bolt处理数据。

那么若是咱们想达到第二种模式呢,那又该如何作呢?
假如咱们想统计下在一段文本中的单词出现频率的话,咱们只需执行一下步骤就能够了。
1.首先将Spout中的message消息进行更改成数组,并依次将消息发送到TestBolt。
2.而后TestBolt将获取的数据进行分割,将分割的数据发送到TestBolt2。
3.TestBolt2对数据进行统计,在程序关闭的时候进行打印。
4.Topology成功配置而且启动以后,等待20秒左右,关闭程序,而后获得输出的结果。

代码示例以下:

Spout
用于发送消息。

import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * 
    * Title: TestSpout
    * Description:
    * 发送信息
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月6日
     */
    public class TestSpout extends BaseRichSpout{
    
        private static final long serialVersionUID = 225243592780939490L;
    
        private SpoutOutputCollector collector;
        private static final String field="word";
        private int count=1;
        private String[] message =  {
    "My nickname is xuwujing",
    "My blog address is http://www.panchengming.com/",
    "My interest is playing games"
    };
        
        /**
     * open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。
     * 有三个参数:
     * 1.Storm配置的Map;
     * 2.topology中组件的信息;
     * 3.发射tuple的方法;
     */
        @Override
        public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
            System.out.println("open:"+map.get("test"));
            this.collector = collector;
        }
    
    /**
     * nextTuple()方法是Spout实现的核心。
     * 也就是主要执行方法,用于输出信息,经过collector.emit方法发射。
     */
        @Override
        public void nextTuple() {
                
            if(count<=message.length){
                System.out.println("第"+count +"次开始发送数据...");
                this.collector.emit(new Values(message[count-1]));
            }
            count++;
        }
    
    
        /**
     * declareOutputFields是在IComponent接口中定义,用于声明数据格式。
     * 即输出的一个Tuple中,包含几个字段。
     */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            System.out.println("定义格式...");
            declarer.declare(new Fields(field));
        }
    
        /**
         * 当一个Tuple处理成功时,会调用这个方法
         */
        @Override
        public void ack(Object obj) {
            System.out.println("ack:"+obj);
        }
        
        /**
         * 当Topology中止时,会调用这个方法
         */
        @Override
        public void close() {
            System.out.println("关闭...");
        }
        
        /**
         * 当一个Tuple处理失败时,会调用这个方法
         */
        @Override
        public void fail(Object obj) {
            System.out.println("失败:"+obj);
        }
        
    }

TestBolt

用于分割单词。

import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    
    /**
     * 
    * Title: TestBolt
    * Description: 
    * 对单词进行分割
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月16日
     */
    public class TestBolt extends BaseRichBolt{
    
        /**
         * 
         */
        private static final long serialVersionUID = 4743224635827696343L;
        
        private OutputCollector collector;
       
        /**
    * 在Bolt启动前执行,提供Bolt启动环境配置的入口
    * 通常对于不可序列化的对象进行实例化。
    * 注:若是是能够序列化的对象,那么最好是使用构造函数。
    */
        @Override
        public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
            System.out.println("prepare:"+map.get("test"));
            this.collector=collector;
        }
      
        /**
         * execute()方法是Bolt实现的核心。
         * 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
         */
        @Override
        public void execute(Tuple tuple) {
            String msg=tuple.getStringByField("word");
        System.out.println("开始分割单词:"+msg);
    String[] words = msg.toLowerCase().split(" ");
    for (String word : words) {
    this.collector.emit(new Values(word));//向下一个bolt发射数据
    } 
        
        }
    
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("count"));
        }
        
        /**
     * cleanup是IBolt接口中定义,用于释放bolt占用的资源。
     * Storm在终止一个bolt以前会调用这个方法。
         */
        @Override
        public void cleanup() {
            System.out.println("TestBolt的资源释放");
        }
    }

Test2Bolt
用于统计单词出现次数。

import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * 
    * Title: Test2Bolt
    * Description:
    * 统计单词出现的次数 
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月16日
     */
    public class Test2Bolt extends BaseRichBolt{
    
        /**
         * 
         */
        private static final long serialVersionUID = 4743224635827696343L;
        
        
        /**
         * 保存单词和对应的计数
         */
        private HashMap<String, Integer> counts = null;
         
        private long count=1;
        /**
    * 在Bolt启动前执行,提供Bolt启动环境配置的入口
    * 通常对于不可序列化的对象进行实例化。
    * 注:若是是能够序列化的对象,那么最好是使用构造函数。
    */
        @Override
        public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
            System.out.println("prepare:"+map.get("test"));
            this.counts=new HashMap<String, Integer>();
        }
      
        /**
         * execute()方法是Bolt实现的核心。
         * 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
         * 
         */
        @Override
        public void execute(Tuple tuple) {
            String msg=tuple.getStringByField("count");
            System.out.println("第"+count+"次统计单词出现的次数");
            /**
             * 若是不包含该单词,说明在该map是第一次出现
             * 不然进行加1
             */
            if (!counts.containsKey(msg)) {
                counts.put(msg, 1);
            } else {
                counts.put(msg, counts.get(msg)+1);
            }
            count++;
        }
    
        
        /**
     * cleanup是IBolt接口中定义,用于释放bolt占用的资源。
     * Storm在终止一个bolt以前会调用这个方法。
         */
        @Override
        public void cleanup() {
            System.out.println("===========开始显示单词数量============");
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            System.out.println("===========结束============");
           System.out.println("Test2Bolt的资源释放");
        }
        
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
            
        }
    }

Topology

主程序入口。

import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * 
    * Title: App
    * Description:
    * storm测试 
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月6日
     */
    public class App {
        
        private static final String test_spout="test_spout"; 
        private static final String test_bolt="test_bolt"; 
        private static final String test2_bolt="test2_bolt"; 
    
        public static void main(String[] args)  {
            //定义一个拓扑
            TopologyBuilder builder=new TopologyBuilder();
            //设置一个Executeor(线程),默认一个
            builder.setSpout(test_spout, new TestSpout(),1);
            //shuffleGrouping:表示是随机分组
            //设置一个Executeor(线程),和一个task
            builder.setBolt(test_bolt, new TestBolt(),1).setNumTasks(1).shuffleGrouping(test_spout);
            //fieldsGrouping:表示是按字段分组
            //设置一个Executeor(线程),和一个task
            builder.setBolt(test2_bolt, new Test2Bolt(),1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields("count"));
            Config conf = new Config();
            conf.put("test", "test");
            try{
              //运行拓扑
           if(args !=null&&args.length>0){ //有参数时,表示向集群提交做业,并把第一个参数当作topology名称
             System.out.println("运行远程模式");
                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
          } else{//没有参数时,本地提交
        //启动本地模式
            System.out.println("运行本地模式");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Word-counts" ,conf,  builder.createTopology() );
        Thread.sleep(20000);
    //  //关闭本地集群
        cluster.shutdown();
          }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

输出结果:

运行本地模式
定义格式...
open:test
第1次开始发送数据...
第2次开始发送数据...
第3次开始发送数据...
prepare:test
prepare:test
开始分割单词:My nickname is xuwujing
开始分割单词:My blog address is http://www.panchengming.com/
开始分割单词:My interest is playing games
第1次统计单词出现的次数
第2次统计单词出现的次数
第3次统计单词出现的次数
第4次统计单词出现的次数
第5次统计单词出现的次数
第6次统计单词出现的次数
第7次统计单词出现的次数
第8次统计单词出现的次数
第9次统计单词出现的次数
第10次统计单词出现的次数
第11次统计单词出现的次数
第12次统计单词出现的次数
第13次统计单词出现的次数
第14次统计单词出现的次数
===========开始显示单词数量============
address: 1
interest: 1
nickname: 1
games: 1
is: 3
xuwujing: 1
playing: 1
my: 3
blog: 1
http://www.panchengming.com/: 1
===========结束============
Test2Bolt的资源释放
TestBolt的资源释放
关闭...

上述的是本地模式运行,若是想在Storm集群中进行使用,只须要将程序打包为jar,而后将程序上传到storm集群中,
输入:

storm jar xxx.jar xxx xxx
说明:第一个xxx是storm程序打包的包名,第二个xxx是运行主程序的路径,第三个xxx则表示主程序输入的参数,这个能够随意。

若是是使用maven打包的话,则须要在pom.xml加上

<plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>com.pancm.storm.App</mainClass>
              </manifest>
            </archive>
          </configuration>
      </plugin>

成功运行程序以后,能够在Storm集群的UI界面查看该程序的状态。

到此,本文结束,谢谢阅读! 本篇文章源码地址: https://github.com/xuwujing/java-study

相关文章
相关标签/搜索