Storm是Twitter开源的分布式实时大数据处理框架,最先开源于github,从0.9.1版本以后,归于Apache社区,被业界称为实时版Hadoop。随着愈来愈多的场景对Hadoop的MapReduce高延迟没法容忍,好比网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,大数据实时处理解决方案(流计算)的应用日趋普遍,目前已经是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流。java
Storm程序再Storm集群中运行的示例图以下:git
为何把Topology单独提出来呢,由于Topology是咱们开发程序主要的用的组件。
Topology和MapReduce很相像。
MapReduce是Map进行获取数据,Reduce进行处理数据。
而Topology则是使用Spout获取数据,Bolt来进行计算。
总的来讲就是一个Topology由一个或者多个的Spout和Bolt组成。github
具体流程是怎么走,能够经过查看下面这张图来进行了解。
示例图:数据库
注:图片来源http://www.tianshouzhi.com/api/tutorials/storm/52。apache
图片有三种模式,解释以下:
第一种比较简单,就是由一个Spout获取数据,而后交给一个Bolt进行处理;
第二种稍微复杂点,由一个Spout获取数据,而后交给一个Bolt进行处理一部分,而后在交给下一个Bolt进行处理其余部分。
第三种则比较复杂,一个Spout能够同时发送数据到多个Bolt,而一个Bolt也能够接受多个Spout或多个Bolt,最终造成多个数据流。可是这种数据流必须是有方向的,有起点和终点,否则会形成死循环,数据永远也处理不完。就是Spout发给Bolt1,Bolt1发给Bolt2,Bolt2又发给了Bolt1,最终造成了一个环状。api
以前已经写过了,这里就不在说明了。
博客地址:http://www.panchengming.com/2018/01/26/pancm70/数组
前面讲了一些Storm概念,可能在理解上不太清楚,那么这里咱们就用一个Hello World代码示例来体验下Storm运做的流程吧。框架
在进行代码开发以前,首先得作好相关的准备。
本项目是使用Maven构建的,使用Storm的版本为1.1.1。
Maven的相关依赖以下:maven
<!--storm相关jar --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.1</version> <scope>provided</scope> </dependency>
在写代码的时候,咱们先来明确要用Storm作什么。
那么第一个程序,就简单的输出下信息。
具体步骤以下:分布式
那么首先开始编写Spout类。通常是实现 IRichSpout 或继承BaseRichSpout该类,而后实现该方法。
这里咱们继承BaseRichSpout这个类,该类须要实现这几个主要的方法:
open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。
有三个参数,它们的做用分别是:
代码示例:
@Override public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) { System.out.println("open:"+map.get("test")); this.collector = collector; }
nextTuple()方法是Spout实现的核心。
也就是主要执行方法,用于输出信息,经过collector.emit
方法发射。
这里咱们的数据信息已经写死了,因此这里咱们就直接将数据进行发送。
这里设置只发送两次。
代码示例:
@Override public void nextTuple() { if(count<=2){ System.out.println("第"+count+"次开始发送数据..."); this.collector.emit(new Values(message)); } count++; }
declareOutputFields是在IComponent接口中定义,用于声明数据格式。
即输出的一个Tuple中,包含几个字段。
由于这里咱们只发射一个,因此就指定一个。若是是多个,则用逗号隔开。
代码示例:
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("定义格式..."); declarer.declare(new Fields(field)); }
ack是在ISpout接口中定义,用于表示Tuple处理成功。
代码示例:
@Override public void ack(Object obj) { System.out.println("ack:"+obj); }
fail是在ISpout接口中定义,用于表示Tuple处理失败。
代码示例:
@Override public void fail(Object obj) { System.out.println("失败:"+obj); }
close是在ISpout接口中定义,用于表示Topology中止。
代码示例:
@Override public void close() { System.out.println("关闭..."); }
至于还有其余的,这里就不在一一列举了。
Bolt是用于处理数据的组件,主要是由execute方法来进行实现。通常来讲须要实现 IRichBolt 或继承BaseRichBolt该类,而后实现其方法。
须要实现方法以下:
在Bolt启动前执行,提供Bolt启动环境配置的入口。
参数基本和Sqout同样。
通常对于不可序列化的对象进行实例化。
这里的咱们就简单的打印下
@Override public void prepare(Map map, TopologyContext arg1, OutputCollector collector) { System.out.println("prepare:"+map.get("test")); this.collector=collector; }
注:若是是能够序列化的对象,那么最好是使用构造函数。
execute()方法是Bolt实现的核心。
也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
从tuple中获取消息可使用 tuple.getString()
和tuple.getStringByField();
这两个方法。我的推荐第二种,能够经过field来指定接收的消息。
注:若是继承的是IRichBolt,则须要手动ack。这里就不用了,BaseRichBolt会自动帮咱们应答。
代码示例:
@Override public void execute(Tuple tuple) { // String msg=tuple.getString(0); String msg=tuple.getStringByField("test"); //这里咱们就不作消息的处理,只打印 System.out.println("Bolt第"+count+"接受的消息:"+msg); count++; /** * * 没次调用处理一个输入的tuple,全部的tuple都必须在必定时间内应答。 * 能够是ack或者fail。不然,spout就会重发tuple。 */ // collector.ack(tuple); }
和Spout的同样。
由于到了这里就再也不输出了,因此就什么都没写。
@Override public void declareOutputFields(OutputFieldsDeclarer arg0) { }
cleanup是IBolt接口中定义,用于释放bolt占用的资源。
Storm在终止一个bolt以前会调用这个方法。
由于这里没有什么资源须要释放,因此就简单的打印一句就好了。
@Override public void cleanup() { System.out.println("资源释放"); }
这里咱们就是用main方法进行提交topology。
不过在提交topology以前,须要进行相应的设置。
这里我就不一一细说了,代码的注释已经很详细了。
代码示例:
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; /** * * Title: App * Description: * storm测试 * Version:1.0.0 * @author pancm * @date 2018年3月6日 */ public class App { private static final String str1="test1"; private static final String str2="test2"; public static void main(String[] args) { // TODO Auto-generated method stub //定义一个拓扑 TopologyBuilder builder=new TopologyBuilder(); //设置一个Executeor(线程),默认一个 builder.setSpout(str1, new TestSpout()); //设置一个Executeor(线程),和一个task builder.setBolt(str2, new TestBolt(),1).setNumTasks(1).shuffleGrouping(str1); Config conf = new Config(); conf.put("test", "test"); try{ //运行拓扑 if(args !=null&&args.length>0){ //有参数时,表示向集群提交做业,并把第一个参数当作topology名称 System.out.println("远程模式"); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else{//没有参数时,本地提交 //启动本地模式 System.out.println("本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("111" ,conf, builder.createTopology() ); Thread.sleep(10000); // 关闭本地集群 cluster.shutdown(); } }catch (Exception e){ e.printStackTrace(); } } }
运行该方法,输出结果以下:
本地模式 定义格式... open:test 第1次开始发送数据... 第2次开始发送数据... prepare:test Bolt第1接受的消息:这是个测试消息! Bolt第2接受的消息:这是个测试消息! 资源释放 关闭...
到这里,是否是基本上对Storm的运做有些了解了呢。
这个demo达到了上述的三种模式图中的第一种,一个Spout传输数据, 一个Bolt处理数据。
那么若是咱们想达到第二种模式呢,那又该如何作呢?
假如咱们想统计下在一段文本中的单词出现频率的话,咱们只需执行一下步骤就能够了。
1.首先将Spout中的message消息进行更改成数组,并依次将消息发送到TestBolt。
2.而后TestBolt将获取的数据进行分割,将分割的数据发送到TestBolt2。
3.TestBolt2对数据进行统计,在程序关闭的时候进行打印。
4.Topology成功配置而且启动以后,等待20秒左右,关闭程序,而后获得输出的结果。
代码示例以下:
Spout
用于发送消息。
import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; /** * * Title: TestSpout * Description: * 发送信息 * Version:1.0.0 * @author pancm * @date 2018年3月6日 */ public class TestSpout extends BaseRichSpout{ private static final long serialVersionUID = 225243592780939490L; private SpoutOutputCollector collector; private static final String field="word"; private int count=1; private String[] message = { "My nickname is xuwujing", "My blog address is http://www.panchengming.com/", "My interest is playing games" }; /** * open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。 * 有三个参数: * 1.Storm配置的Map; * 2.topology中组件的信息; * 3.发射tuple的方法; */ @Override public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) { System.out.println("open:"+map.get("test")); this.collector = collector; } /** * nextTuple()方法是Spout实现的核心。 * 也就是主要执行方法,用于输出信息,经过collector.emit方法发射。 */ @Override public void nextTuple() { if(count<=message.length){ System.out.println("第"+count +"次开始发送数据..."); this.collector.emit(new Values(message[count-1])); } count++; } /** * declareOutputFields是在IComponent接口中定义,用于声明数据格式。 * 即输出的一个Tuple中,包含几个字段。 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("定义格式..."); declarer.declare(new Fields(field)); } /** * 当一个Tuple处理成功时,会调用这个方法 */ @Override public void ack(Object obj) { System.out.println("ack:"+obj); } /** * 当Topology中止时,会调用这个方法 */ @Override public void close() { System.out.println("关闭..."); } /** * 当一个Tuple处理失败时,会调用这个方法 */ @Override public void fail(Object obj) { System.out.println("失败:"+obj); } }
TestBolt
用于分割单词。
import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * * Title: TestBolt * Description: * 对单词进行分割 * Version:1.0.0 * @author pancm * @date 2018年3月16日 */ public class TestBolt extends BaseRichBolt{ /** * */ private static final long serialVersionUID = 4743224635827696343L; private OutputCollector collector; /** * 在Bolt启动前执行,提供Bolt启动环境配置的入口 * 通常对于不可序列化的对象进行实例化。 * 注:若是是能够序列化的对象,那么最好是使用构造函数。 */ @Override public void prepare(Map map, TopologyContext arg1, OutputCollector collector) { System.out.println("prepare:"+map.get("test")); this.collector=collector; } /** * execute()方法是Bolt实现的核心。 * 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。 */ @Override public void execute(Tuple tuple) { String msg=tuple.getStringByField("word"); System.out.println("开始分割单词:"+msg); String[] words = msg.toLowerCase().split(" "); for (String word : words) { this.collector.emit(new Values(word));//向下一个bolt发射数据 } } /** * 声明数据格式 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("count")); } /** * cleanup是IBolt接口中定义,用于释放bolt占用的资源。 * Storm在终止一个bolt以前会调用这个方法。 */ @Override public void cleanup() { System.out.println("TestBolt的资源释放"); } }
Test2Bolt
用于统计单词出现次数。
import java.util.HashMap; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; /** * * Title: Test2Bolt * Description: * 统计单词出现的次数 * Version:1.0.0 * @author pancm * @date 2018年3月16日 */ public class Test2Bolt extends BaseRichBolt{ /** * */ private static final long serialVersionUID = 4743224635827696343L; /** * 保存单词和对应的计数 */ private HashMap<String, Integer> counts = null; private long count=1; /** * 在Bolt启动前执行,提供Bolt启动环境配置的入口 * 通常对于不可序列化的对象进行实例化。 * 注:若是是能够序列化的对象,那么最好是使用构造函数。 */ @Override public void prepare(Map map, TopologyContext arg1, OutputCollector collector) { System.out.println("prepare:"+map.get("test")); this.counts=new HashMap<String, Integer>(); } /** * execute()方法是Bolt实现的核心。 * 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。 * */ @Override public void execute(Tuple tuple) { String msg=tuple.getStringByField("count"); System.out.println("第"+count+"次统计单词出现的次数"); /** * 若是不包含该单词,说明在该map是第一次出现 * 不然进行加1 */ if (!counts.containsKey(msg)) { counts.put(msg, 1); } else { counts.put(msg, counts.get(msg)+1); } count++; } /** * cleanup是IBolt接口中定义,用于释放bolt占用的资源。 * Storm在终止一个bolt以前会调用这个方法。 */ @Override public void cleanup() { System.out.println("===========开始显示单词数量============"); for (Map.Entry<String, Integer> entry : counts.entrySet()) { System.out.println(entry.getKey() + ": " + entry.getValue()); } System.out.println("===========结束============"); System.out.println("Test2Bolt的资源释放"); } /** * 声明数据格式 */ @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { } }
Topology
主程序入口。
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; /** * * Title: App * Description: * storm测试 * Version:1.0.0 * @author pancm * @date 2018年3月6日 */ public class App { private static final String test_spout="test_spout"; private static final String test_bolt="test_bolt"; private static final String test2_bolt="test2_bolt"; public static void main(String[] args) { //定义一个拓扑 TopologyBuilder builder=new TopologyBuilder(); //设置一个Executeor(线程),默认一个 builder.setSpout(test_spout, new TestSpout(),1); //shuffleGrouping:表示是随机分组 //设置一个Executeor(线程),和一个task builder.setBolt(test_bolt, new TestBolt(),1).setNumTasks(1).shuffleGrouping(test_spout); //fieldsGrouping:表示是按字段分组 //设置一个Executeor(线程),和一个task builder.setBolt(test2_bolt, new Test2Bolt(),1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields("count")); Config conf = new Config(); conf.put("test", "test"); try{ //运行拓扑 if(args !=null&&args.length>0){ //有参数时,表示向集群提交做业,并把第一个参数当作topology名称 System.out.println("运行远程模式"); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else{//没有参数时,本地提交 //启动本地模式 System.out.println("运行本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Word-counts" ,conf, builder.createTopology() ); Thread.sleep(20000); // //关闭本地集群 cluster.shutdown(); } }catch (Exception e){ e.printStackTrace(); } } }
输出结果:
运行本地模式 定义格式... open:test 第1次开始发送数据... 第2次开始发送数据... 第3次开始发送数据... prepare:test prepare:test 开始分割单词:My nickname is xuwujing 开始分割单词:My blog address is http://www.panchengming.com/ 开始分割单词:My interest is playing games 第1次统计单词出现的次数 第2次统计单词出现的次数 第3次统计单词出现的次数 第4次统计单词出现的次数 第5次统计单词出现的次数 第6次统计单词出现的次数 第7次统计单词出现的次数 第8次统计单词出现的次数 第9次统计单词出现的次数 第10次统计单词出现的次数 第11次统计单词出现的次数 第12次统计单词出现的次数 第13次统计单词出现的次数 第14次统计单词出现的次数 ===========开始显示单词数量============ address: 1 interest: 1 nickname: 1 games: 1 is: 3 xuwujing: 1 playing: 1 my: 3 blog: 1 http://www.panchengming.com/: 1 ===========结束============ Test2Bolt的资源释放 TestBolt的资源释放 关闭...
上述的是本地模式运行,若是想在Storm集群中进行使用,只须要将程序打包为jar,而后将程序上传到storm集群中,
输入:
storm jar xxx.jar xxx xxx
说明:第一个xxx是storm程序打包的包名,第二个xxx是运行主程序的路径,第三个xxx则表示主程序输入的参数,这个能够随意。
若是是使用maven打包的话,则须要在pom.xml加上
<plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.pancm.storm.App</mainClass> </manifest> </archive> </configuration> </plugin>
成功运行程序以后,能够在Storm集群的UI界面查看该程序的状态。
到此,本文结束,谢谢阅读! 本篇文章源码地址: https://github.com/xuwujing/java-study