前面说过了Storm的测试项目,那么此时咱们更想本身写一个小项目来练练手,首先咱们本身的Windows系统上首先应该安装好maven,而后启动Eclipse for JavaEE版本,接下来开始创建项目并开发java
注意,在开发过程当中,不管是Windows仍是Linux都要彻底关闭防火墙,避免网络的问题apache
单击"File"->"New"->"Maven Project"数组
接下来的界面默认便可,单击Next服务器
下一步,继续单击Next便可网络
而后,在Group Id输入:org.apache.storm 在Artifact Id输入:firststorm 这里能够本身定义,在Version中输入版本号:0.9.6,这里其实默认0.1.0没有问题,这个和storm的版本号没有任何关系,这里是咱们项目的版本号,由于只是测试,输入0.9.6是为了更简单;Package包名会自动根据输入生成,咱们默认便可,而后单击Finish,稍等右下角滚动条滚动完毕,一个基本的Maven项目就创建成功了,具体结构和上一个测试案例相同,这时在包org.apache.storm.firststorm下有一个默认的类App.java,由Maven自动生成,这个能够忽略,也能够删除dom
而后打开项目根目录下的pom.xml文件,这个就是构建项目的配置文件,咱们在dependencies标签之间,添加一个节点,代码以下:maven
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.6</version> <scope>provided</scope> </dependency>
加入位置以下图所示,其余的不用动便可ide
最终pom.xml的代码以下:oop
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 3 <modelVersion>4.0.0</modelVersion> 4 5 <groupId>org.apache.storm</groupId> 6 <artifactId>firststorm</artifactId> 7 <version>0.9.6</version> 8 <packaging>jar</packaging> 9 10 11 <name>firststorm</name> 12 <url>http://maven.apache.org</url> 13 14 <properties> 15 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 16 </properties> 17 18 <dependencies> 19 <dependency> 20 <groupId>junit</groupId> 21 <artifactId>junit</artifactId> 22 <version>3.8.1</version> 23 <scope>test</scope> 24 </dependency> 25 26 <dependency> 27 <groupId>org.apache.storm</groupId> 28 <artifactId>storm-core</artifactId> 29 <version>0.9.6</version> 30 <scope>provided</scope> 31 </dependency> 32 </dependencies> 33 </project>
更简单的方法咱们能够直接复制上一个案例中的pom.xml文件直接使用,如今咱们保存pom.xml文件,保存的时候maven会自动下载相关依赖并放到Maven Dependencies下,这些jar包能够点击下拉查看,而且会自动添加到项目classpath中,做为编译使用,等jar包所有下载完毕,如今开始编写具体的计算逻辑了,在这个项目中咱们把全部的类都创建在包org.apache.storm.firststorm下测试
首先创建RandomSpout类做为数据源,而且继承于父类BaseRichSpout,肯定后能够看到系统自动补全3个方法:nextTuple,open和declareOutputFields
咱们如今就须要重写这3个方法,open方法是数据源的初始化,nextTuple的做用是把Tuple发送至下游,declareOutputFields用来定义输出字段,下面咱们手动分配一个数组,而且随机取里面的元素,代码以下:
1 package org.apache.storm.firststorm; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout; 10 import backtype.storm.tuple.Fields; 11 import backtype.storm.tuple.Values; 12 13 public class RandomSpout extends BaseRichSpout { 14 15 private SpoutOutputCollector collector; 16 private static String[] words = {"Hadoop","Storm","Apache","Linux","Nginx","Tomcat","Spark"}; 17 18 19 public void nextTuple() { 20 String word = words[new Random().nextInt(words.length)]; 21 collector.emit(new Values(word)); 22 23 } 24 25 public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) { 26 this.collector = arg2; 27 } 28 29 public void declareOutputFields(OutputFieldsDeclarer arg0) { 30 arg0.declare(new Fields("randomstring")); 31 } 32 33 }
代码很简单,确定能够看懂,而后新建一个类SenqueceBolt,继承于BaseBasicBolt类,而且重写方法execute和declareOutputFields,这个类就是用于执行具体的做业,准确的说是execute方法用来执行相关的计算,这里只是简单的输出,代码以下:
1 package org.apache.storm.firststorm; 2 3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Tuple; 7 8 public class SenqueceBolt extends BaseBasicBolt { 9 10 public void execute(Tuple arg0, BasicOutputCollector arg1) { 11 String word = (String) arg0.getValue(0); 12 String out = "Hello " + word + "!"; 13 System.out.println(out); 14 } 15 16 public void declareOutputFields(OutputFieldsDeclarer arg0) { 17 18 } 19 20 }
最后创建一个类FirstStorm,这个类是主类,在main方法中定义Topology,而且综合设置Spout和Bolt,从而调用其中的方法,这里流式计算时间设置为30s,代码以下:
1 package org.apache.storm.firststorm; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.generated.AlreadyAliveException; 7 import backtype.storm.generated.InvalidTopologyException; 8 import backtype.storm.topology.TopologyBuilder; 9 import backtype.storm.utils.Utils; 10 11 public class FirstStorm { 12 13 public static void main(String[] args) { 14 TopologyBuilder builder = new TopologyBuilder(); 15 builder.setSpout("spout", new RandomSpout()); 16 builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); 17 Config conf = new Config(); 18 conf.setDebug(false); 19 if(args != null && args.length > 0) { 20 conf.setNumWorkers(3); 21 try { 22 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 23 } catch (AlreadyAliveException e) { 24 // TODO Auto-generated catch block 25 e.printStackTrace(); 26 } catch (InvalidTopologyException e) { 27 // TODO Auto-generated catch block 28 e.printStackTrace(); 29 } 30 } else { 31 LocalCluster cluster = new LocalCluster(); 32 cluster.submitTopology("firststorm", conf, builder.createTopology()); 33 Utils.sleep(30000); 34 cluster.killTopology("firststorm"); 35 cluster.shutdown(); 36 } 37 } 38 39 }
到这里一个简单的storm项目就开发完毕了,而后能够用本地模式运行,跑起来以后某一时刻输出结果以下:
接下来咱们将这个项目放到Storm服务器集群中运行,这里不要把Storm的jar包加进来,由于运行的时候,Storm环境会自动加载并协调集群运行,方法有不少,可使用插件上传,也可使用本地Storm客户端配置一下numbus.host进行提交,也能够在服务器节点上执行,执行后nimbus会获得任务并分发给各个supervisor去执行,首先咱们应该将项目打包,右击项目,选择Export
而后导出类型选择Java下的JAR file,点击Next
而后单击Brower肯定输出位置和文件名或者直接在输入框输入jar包的名称,而后单击Finish完成打包
打包以后咱们能够在输出位置看见一个jar文件
而后咱们将这个文件上传到服务器,这里上传到了storm安装目录下,而后这个时候在主节点storm安装目录下执行: bin/storm nimbus & 在从节点目录下分别执行 bin/storm supervisor & 启动整个集群的storm服务,也能够执行 bin/storm ui & 启动UI管理界面更直观的看到执行结果,固然对于单机环境启动或者不启动storm服务均可以,这个时候,执行下面命令运行本次项目的程序:
bin/storm jar firststorm.jar org.apache.storm.firststorm.FirstStorm
这里就是调用了FirstStorm类中的main方法,若是程序中对参数进行了处理,后面还能够跟上参数,回车确认执行以后,系统会进行初始化集群的工做,几秒后任务开始执行,执行过程当中某一时刻的滚动输出以下:
到这里,第一个Storm入门项目的开发和测试运行都完毕了,更复杂的计算逻辑模式也基本相同,主要就是Maven项目中出现了更复杂的模块和调用,整个运行的流程其实都是差很少的,如今就算步入Storm流式计算的殿堂的大门了,接下来的精彩还须要慢慢体会