本章咱们将会建立一个Storm工程和咱们的第一个Storm topology。html
提示:下述假设你已经安装JRE1.6或者更高级版本。推荐使用Oracle提供的JRE:http://www.java.com/downloads/.java
在开始建立项目以前,了解Storm的操做模式(operation modes)是很重要的。Storm有两种运行方式:git
本地模式github
在本地模式下,Storm topologies 运行在本地机器的一个JVM中。由于本地模式下查看全部topology组件共同工做最为简单,因此这种模式被用于开发、测试和调试。例如,咱们能够调整参数,这使得咱们能够看到咱们的topology在不一样的Storm配置环境中是如何运行的。为了以本地模式运行topologies,咱们须要下载Storm的开发依赖包,这是咱们开发、测试topologies所需的全部东西。当咱们创建本身的第一个Storm工程的时候咱们很快就能够看到是怎么回事了。web
提示:本地模式下运行一个topology同Storm集群中运行相似。然而,确保全部组件线程安全很是重要,由于当它们被部署到远程模式时,它们可能运行在不一样的JVM或者不一样的物理机器上,此时,它们之间不能直接交流或者共享内存。apache
在本章的全部示例中,咱们都以本地模式运行。安全
远程模式架构
在远程模式下,咱们将topology提交到Storm集群中,Storm集群由许多进程组成,这些进程一般运行在不一样的机器上。远程模式下不显示调试信息,这也是它被认为是产品模式的缘由。然而,在一台机器上建立一个Storm集群也是可能的,而且在部署至产品前这样作仍是一个好方法,它能够确保未来在一个成熟的产品环境中运行topology不会出现任何问题。并发
译者的话:所谓产品环境/模式,指的是代码比较成熟,能够当成产品发布了,与开发环境相对。app
在第六章中能够了解到更多关于远程模式的内容,我会在附录B中展现如何安装一个集群。
在这个项目中,咱们会创建一个简单的topology来统计单词个数,咱们能够将它当作是Storm topologies中的“Hello World”。然而,它又是一个很是强大的topology,由于它几乎能够扩展到无限大小,而且通过小小的修改,咱们甚至可使用它建立一个统计系统。例如,咱们能够修改本项目来找到Twitter上的热门话题。
为了创建这个topology,咱们将使用一个spout来负责从文件中读取单词,第一个bolt来标准化单词,第二个bolt去统计单词个数,如图2-1所示:
你能够在https://github.com/storm-book/examples-ch02-getting_started/zipball/master下载本例源码的ZIP文件。
译者的话:本站有备份:http://www.flyne.org/example/storm/storm-book-examples-ch02-getting_started-8e42636.zip
提示:若是你使用git(一个分布式的版本控制和源码管理工具),则能够运行命令:git clone git@github.com:storm-book/examplesch02-getting_started.git进入你想要下载的源码所在的目录。
检查Java安装
搭建环境的第一步就是检查正在运行的Java版本。运行java -version命令,咱们能够看到相似以下信息:
java -version
java version “1.6.0_26″
Java(TM) SE Runtime Environment(build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02,mixed mode)
首先,建立一个文件夹,用于存放这个应用(就像对于任何Java应用同样),该文件夹包含了整个项目的源代码。
接着咱们须要下载Storm的依赖包——添加到本应用classpath的jar包集合。能够经过下面两种方式完成:
下载依赖包,解压,并将它们加入classpath路径
使用Apache Maven
提示:Maven是一个软件项目管理工具,能够用于管理一个项目开发周期中的多个方面(从从依赖包到发布构建过程),在本书中咱们会普遍使用Maven。可使用mvn命令检查maven是否安装,若是未安装,能够从http://maven.apache.org/download.html下载。
下一步咱们须要新建一个pom.xml文件(pom:project object model,项目的对象模型)去定义项目的结构,该文件描述了依赖包、封装、源码等等。这里咱们将使用由nathanmarz构建的依赖包和Maven库,这些依赖包能够在https://github.com/nathanmarz/storm/wiki/Maven找到。
提示:Storm的Maven依赖包引用了在本地模式下运行Storm所需的全部函数库。
使用这些依赖包,咱们能够写一个包含运行topology基本的必要组件的pom.xml文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>storm.book</groupId> <artifactId>Getting-Started</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.6</source> <target>1.6</target> <compilerVersion>1.6</compilerVersion> </configuration> </plugin> </plugins> </build> <repositories> <!-- Repository where we can found the storm dependencies --> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <!-- Storm Dependency --> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.6.0</version> </dependency> </dependencies> </project>
前几行指定了项目名称、版本;而后咱们添加了一个编译器插件,该插件告诉Maven咱们的代码应该用Java1.6编译;接着咱们定义库(repository)(Maven支持同一个项目的多个库),clojars是Storm依赖包所在的库,Maven会自动下载本地模式运行Storm须要的全部子依赖包。
本项目的目录结构以下,它是一个典型的Maven Java项目。
java目录下的文件夹包含了咱们的源代码,而且咱们会将咱们的单词文件放到resources文件夹中来处理。
为创建咱们第一个topology,咱们要建立运行本例(统计单词个数)的全部的类。本阶段例子中的有些部分不清楚很正常,咱们将在接下来的几个章节中进一步解释它们。
Spout(WordReader类)
WordReader类实现了IRichSpout接口,该类负责读取文件并将每一行发送到一个bolt中去。
提示:spout发送一个定义字段(field)的列表,这种架构容许你有多种bolt读取相同的spout流,而后这些bolt能够定义字段(field)供其余bolt消费。
例2-1包含WordReader类的完整代码(后面会对代码中的每一个部分进行分析)
例2-1.src/main/java/spouts/WordReader.java
package spouts; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class WordReader implements IRichSpout{ private SpoutOutputCollector collector; private FileReader fileReader; private boolean completed=false; private TopologyContext context; public boolean isDistributed(){return false;} public void ack(Object msgId) { System.out.println("OK:"+msgId); } public void close(){} public void fail(Object msgId) { System.out.println("FAIL:"+msgId); } /** * 该方法用于读取文件并发送文件中的每一行 */ public void nextTuple() { /** * The nextuple it is called forever, so if we have beenreaded the file * we will wait and then return */ if(completed){ try { Thread.sleep(1000); } catch(InterruptedException e) { //Do nothing } return; } String str; //Open the reader BufferedReader reader =new BufferedReader(fileReader); try{ //Read all lines while((str=reader.readLine())!=null){ /** * By each line emmit a new value with the line as a their */ this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Errorreading tuple",e); }finally{ completed = true; } } /** * We will create the file and get the collector object */ public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) { try { this.context=context; this.fileReader=new FileReader(conf.get("wordsFile").toString()); } catch(FileNotFoundException e) { throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]"); } this.collector=collector; } /** * 声明输出字段“line” */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } }
在任何spout中调用的第一个方法都是open()方法,该方法接收3个参数:
TopologyContext:它包含了全部的topology数据
conf对象:在topology定义的时候被建立
SpoutOutputCollector:该类的实例可让咱们发送将被bolts处理的数据。
下面的代码块是open()方法的实现:
public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) { try { this.context=context; this.fileReader=new FileReader(conf.get("wordsFile").toString()); } catch(FileNotFoundException e) { throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]"); } this.collector=collector; }
在open()方法中,咱们也建立了reader,它负责读文件。接着,咱们须要实现nextTuple()方法,在该方法中发送要被bolt处理的值(values)。在咱们的例子中,这个方法读文件而且每行发送一个值。
public void nextTuple() { if(completed){ try { Thread.sleep(1000); } catch(InterruptedException e) { //Do nothing } return; } String str; //Open the reader BufferedReader reader =new BufferedReader(fileReader); try{ //Read all lines while((str=reader.readLine())!=null){ /** * By each line emmit a new value with the line as a their */ this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Errorreading tuple",e); }finally{ completed = true; } }
提示:Values类是ArrayList的一个实现,将列表中的元素传递到构造方法中。
nextTuple()方法被周期性地调用(和ack()、fail()方法相同的循环),当没有工做要作时,nextTuple()方法必须释放对线程的控制,以便其余的方法有机会被调用。所以必须在nextTuple()第一行检查处理是否完成,若是已经完成,在返回前至少应该休眠1秒来下降处理器的负载,若是还有工做要作,则将文件中的每一行读取为一个值并发送出去。
提示:元组(tuple)是一个值的命名列表,它能够是任何类型的Java对象(只要这个对象是能够序列化的)。默认状况下,Storm能够序列化的经常使用类型有strings、byte arrays、ArrayList、HashMap和HashSet。
Bolt(WordNormalizer&WordCounter类)
上面咱们设计了一个spout来读取文件,而且每读取一行发送一个元组(tuple)。如今,咱们须要建立两个bolt处理这些元组(见图2-1)。这些bolt实现了IRichBolt接口。
在bolt中,最重要的方法是execute()方法,每当bolt收到一个元组,该方法就会被调用一次,对于每一个收到的元组,该bolt处理完以后又会发送几个bolt。
提示:一个spout或bolt能够发送多个tuple,当nextTuple()或execute()方法被调用时,它们能够发送0、1或者多个元组。在第五章中你将会了解到更多。
第一个bolt,WordNormalizer,负责接收每一行,而且将行标准化——它将行分解为一个个的单词后转化成小写,而且消除单词先后的空格。
首先,咱们须要声明bolt的输出参数:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
这儿,咱们声明bolt发送一个命名为“word”的字段。
接着,咱们实现execute方法,输入的tuple将会在这个方法中被处理:
public void execute(Tuple input) { String sentence = input.getString(0); String[]words= sentence.split(" "); for(String word:words){ word =word.trim(); if(!word.isEmpty()){ word =word.toLowerCase(); //Emit the word List a =new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } // Acknowledge the tuple collector.ack(input); }
第一行读取元组中的值,能够按照位置或者字段命名读取。值被处理后使用collector对象发送出去。当每一个元组被处理完以后,就会调用collector的ack()方法,代表该tuple成功地被处理。若是tuple不能被处理,则应该调用collector的fail()方法。
例2-2包含这个类的完整代码。
例2-2.src/main/java/bolts/WordNormalizer.java