2、Storm入门之Hello Storm

本章咱们将会建立一个Storm工程和咱们的第一个Storm topology。html

提示:下述假设你已经安装JRE1.6或者更高级版本。推荐使用Oracle提供的JRE:http://www.java.com/downloads/.java

操做模式

在开始建立项目以前,了解Storm的操做模式(operation modes)是很重要的。Storm有两种运行方式:git

本地模式github

在本地模式下,Storm topologies 运行在本地机器的一个JVM中。由于本地模式下查看全部topology组件共同工做最为简单,因此这种模式被用于开发、测试和调试。例如,咱们能够调整参数,这使得咱们能够看到咱们的topology在不一样的Storm配置环境中是如何运行的。为了以本地模式运行topologies,咱们须要下载Storm的开发依赖包,这是咱们开发、测试topologies所需的全部东西。当咱们创建本身的第一个Storm工程的时候咱们很快就能够看到是怎么回事了。web

提示:本地模式下运行一个topology同Storm集群中运行相似。然而,确保全部组件线程安全很是重要,由于当它们被部署到远程模式时,它们可能运行在不一样的JVM或者不一样的物理机器上,此时,它们之间不能直接交流或者共享内存。apache

在本章的全部示例中,咱们都以本地模式运行。安全

远程模式架构

在远程模式下,咱们将topology提交到Storm集群中,Storm集群由许多进程组成,这些进程一般运行在不一样的机器上。远程模式下不显示调试信息,这也是它被认为是产品模式的缘由。然而,在一台机器上建立一个Storm集群也是可能的,而且在部署至产品前这样作仍是一个好方法,它能够确保未来在一个成熟的产品环境中运行topology不会出现任何问题。并发

译者的话:所谓产品环境/模式,指的是代码比较成熟,能够当成产品发布了,与开发环境相对。app

在第六章中能够了解到更多关于远程模式的内容,我会在附录B中展现如何安装一个集群。

Hello World Storm

在这个项目中,咱们会创建一个简单的topology来统计单词个数,咱们能够将它当作是Storm topologies中的“Hello World”。然而,它又是一个很是强大的topology,由于它几乎能够扩展到无限大小,而且通过小小的修改,咱们甚至可使用它建立一个统计系统。例如,咱们能够修改本项目来找到Twitter上的热门话题。

为了创建这个topology,咱们将使用一个spout来负责从文件中读取单词,第一个bolt来标准化单词,第二个bolt去统计单词个数,如图2-1所示:

topology入门图2-1.topology入门

你能够在https://github.com/storm-book/examples-ch02-getting_started/zipball/master下载本例源码的ZIP文件。

译者的话:本站有备份:http://www.flyne.org/example/storm/storm-book-examples-ch02-getting_started-8e42636.zip

提示:若是你使用git(一个分布式的版本控制和源码管理工具),则能够运行命令:git clone git@github.com:storm-book/examplesch02-getting_started.git进入你想要下载的源码所在的目录。

检查Java安装

搭建环境的第一步就是检查正在运行的Java版本。运行java -version命令,咱们能够看到相似以下信息:

java -version
java version “1.6.0_26″
Java(TM) SE Runtime Environment(build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02,mixed mode)

建立项目

首先,建立一个文件夹,用于存放这个应用(就像对于任何Java应用同样),该文件夹包含了整个项目的源代码。

接着咱们须要下载Storm的依赖包——添加到本应用classpath的jar包集合。能够经过下面两种方式完成:

  • 下载依赖包,解压,并将它们加入classpath路径

  • 使用Apache Maven

提示:Maven是一个软件项目管理工具,能够用于管理一个项目开发周期中的多个方面(从从依赖包到发布构建过程),在本书中咱们会普遍使用Maven。可使用mvn命令检查maven是否安装,若是未安装,能够从http://maven.apache.org/download.html下载。

下一步咱们须要新建一个pom.xml文件(pom:project object model,项目的对象模型)去定义项目的结构,该文件描述了依赖包、封装、源码等等。这里咱们将使用由nathanmarz构建的依赖包和Maven库,这些依赖包能够在https://github.com/nathanmarz/storm/wiki/Maven找到。

提示:Storm的Maven依赖包引用了在本地模式下运行Storm所需的全部函数库。

使用这些依赖包,咱们能够写一个包含运行topology基本的必要组件的pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
    <modelVersion>4.0.0</modelVersion>
    <groupId>storm.book</groupId>
    <artifactId>Getting-Started</artifactId>
    <version>0.0.1-SNAPSHOT</version>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
 
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
 
    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.6.0</version>
        </dependency>
    </dependencies>
 
</project>

前几行指定了项目名称、版本;而后咱们添加了一个编译器插件,该插件告诉Maven咱们的代码应该用Java1.6编译;接着咱们定义库(repository)(Maven支持同一个项目的多个库),clojars是Storm依赖包所在的库,Maven会自动下载本地模式运行Storm须要的全部子依赖包。

本项目的目录结构以下,它是一个典型的Maven Java项目。

application structure

java目录下的文件夹包含了咱们的源代码,而且咱们会将咱们的单词文件放到resources文件夹中来处理。

建立第一个topology

为创建咱们第一个topology,咱们要建立运行本例(统计单词个数)的全部的类。本阶段例子中的有些部分不清楚很正常,咱们将在接下来的几个章节中进一步解释它们。

Spout(WordReader类)

WordReader类实现了IRichSpout接口,该类负责读取文件并将每一行发送到一个bolt中去。

提示:spout发送一个定义字段(field)的列表,这种架构容许你有多种bolt读取相同的spout流,而后这些bolt能够定义字段(field)供其余bolt消费。

例2-1包含WordReader类的完整代码(后面会对代码中的每一个部分进行分析)

例2-1.src/main/java/spouts/WordReader.java

package spouts;
 
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
 
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
public class WordReader implements IRichSpout{
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed=false;
    private TopologyContext context;
 
    public boolean isDistributed(){return false;}
 
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
 
    public void close(){}
 
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
 
    /**
    * 该方法用于读取文件并发送文件中的每一行
    */
 
    public void nextTuple() {
    /**
    * The nextuple it is called forever, so if we have beenreaded the file
    * we will wait and then return
    */
        if(completed){
            try {
            Thread.sleep(1000);
            } catch(InterruptedException e) {
            //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader =new BufferedReader(fileReader);
        try{
        //Read all lines
            while((str=reader.readLine())!=null){
        /**
        * By each line emmit a new value with the line as a their
        */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Errorreading tuple",e);
        }finally{
            completed = true;
        }
    }
 
    /**
    * We will create the file and get the collector object
    */
 
    public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
        try {
            this.context=context;
            this.fileReader=new FileReader(conf.get("wordsFile").toString());
        } catch(FileNotFoundException e) {
            throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
        }
        this.collector=collector;
    }
 
    /**
    * 声明输出字段“line”
    */
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
 
}

在任何spout中调用的第一个方法都是open()方法,该方法接收3个参数:

  • TopologyContext:它包含了全部的topology数据

  • conf对象:在topology定义的时候被建立

  • SpoutOutputCollector:该类的实例可让咱们发送将被bolts处理的数据。

下面的代码块是open()方法的实现:

public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
    try {
        this.context=context;
        this.fileReader=new FileReader(conf.get("wordsFile").toString());
    } catch(FileNotFoundException e) {
        throw new RuntimeException("Errorreading file["+conf.get("wordFile")+"]");
    }
    this.collector=collector;
}

在open()方法中,咱们也建立了reader,它负责读文件。接着,咱们须要实现nextTuple()方法,在该方法中发送要被bolt处理的值(values)。在咱们的例子中,这个方法读文件而且每行发送一个值。

public void nextTuple() {
    if(completed){
        try {
        Thread.sleep(1000);
        } catch(InterruptedException e) {
        //Do nothing
        }
        return;
    }
    String str;
    //Open the reader
    BufferedReader reader =new BufferedReader(fileReader);
    try{
    //Read all lines
        while((str=reader.readLine())!=null){
    /**
    * By each line emmit a new value with the line as a their
    */
            this.collector.emit(new Values(str),str);
        }
    }catch(Exception e){
        throw new RuntimeException("Errorreading tuple",e);
    }finally{
        completed = true;
    }
}

 

提示:Values类是ArrayList的一个实现,将列表中的元素传递到构造方法中。

nextTuple()方法被周期性地调用(和ack()、fail()方法相同的循环),当没有工做要作时,nextTuple()方法必须释放对线程的控制,以便其余的方法有机会被调用。所以必须在nextTuple()第一行检查处理是否完成,若是已经完成,在返回前至少应该休眠1秒来下降处理器的负载,若是还有工做要作,则将文件中的每一行读取为一个值并发送出去。

提示:元组(tuple)是一个值的命名列表,它能够是任何类型的Java对象(只要这个对象是能够序列化的)。默认状况下,Storm能够序列化的经常使用类型有strings、byte arrays、ArrayList、HashMap和HashSet。

 

Bolt(WordNormalizer&WordCounter类)

上面咱们设计了一个spout来读取文件,而且每读取一行发送一个元组(tuple)。如今,咱们须要建立两个bolt处理这些元组(见图2-1)。这些bolt实现了IRichBolt接口。

在bolt中,最重要的方法是execute()方法,每当bolt收到一个元组,该方法就会被调用一次,对于每一个收到的元组,该bolt处理完以后又会发送几个bolt。

提示:一个spout或bolt能够发送多个tuple,当nextTuple()或execute()方法被调用时,它们能够发送0、1或者多个元组。在第五章中你将会了解到更多。

第一个bolt,WordNormalizer,负责接收每一行,而且将行标准化——它将行分解为一个个的单词后转化成小写,而且消除单词先后的空格。

首先,咱们须要声明bolt的输出参数:

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
         declarer.declare(new Fields("word")); 
}

这儿,咱们声明bolt发送一个命名为“word”的字段。

接着,咱们实现execute方法,输入的tuple将会在这个方法中被处理:

public void execute(Tuple input) {
    String sentence = input.getString(0);
    String[]words= sentence.split(" ");
    for(String word:words){
        word =word.trim();
        if(!word.isEmpty()){
            word =word.toLowerCase();
            //Emit the word
            List a =new ArrayList();
            a.add(input);
            collector.emit(a,new Values(word));
        }
    }
    // Acknowledge the tuple
    collector.ack(input);
}

 

第一行读取元组中的值,能够按照位置或者字段命名读取。值被处理后使用collector对象发送出去。当每一个元组被处理完以后,就会调用collector的ack()方法,代表该tuple成功地被处理。若是tuple不能被处理,则应该调用collector的fail()方法。

例2-2包含这个类的完整代码。

例2-2.src/main/java/bolts/WordNormalizer.java

下一页

相关文章
相关标签/搜索