Storm笔记整理(三):Storm集群安装部署与Topology做业提交

[TOC]html


Storm分布式集群安装部署

概述

Storm笔记整理(三):Storm集群安装部署与Topology做业提交

Storm集群表面相似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不一样的,一个关键不一样是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它)。java

Storm集群有两种节点:控制(master)节点和工做者(worker)节点。控制节点运行一个称之为”Nimbus”的后台程序,它相似于Haddop的”JobTracker”。Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测。apache

注意:Hadoop 2.0之前使用JobTrack来进行Job的分发,但2.x以后就使用了全新的资源调度框架,即yarn,这点尤为须要注意。api

每一个工做者节点运行一个称之”Supervisor”的后台程序。Supervisor监听分配给它所在机器的工做,基于Nimbus分配给它的事情来决定启动或中止工做者进程。每一个工做者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工做者进程组成。bash

一个Zookeeper集群负责Nimbus和多个Supervisor之间的全部协调工做(一个完整的拓扑可能被分为多个子拓扑并由多个supervisor完成)。app

此外,Nimbus后台程序和Supervisor后台程序都是快速失败(fail-fast)和无状态的;全部状态维持在Zookeeper或本地磁盘。这意味着你能够kill -9杀掉nimbus进程和supervisor进程,而后重启,它们将恢复状态并继续工做,就像什么也没发生。这种设计使storm极其稳定。这种设计中Master并无直接和worker通讯,而是借助一个中介Zookeeper,这样一来能够分离master和worker的依赖,将状态信息存放在zookeeper集群内以快速恢复任何失败的一方。框架

集群安装

能够参考官方文档:http://storm.apache.org/releases/1.0.6/Setting-up-a-Storm-cluster.htmlmaven

官方文档对于配置中的解释是很是清晰明了和容易理解的。分布式

下载地址:https://storm.apache.org/downloads.html
须要确保已经安装好了zookeeper环境,在个人环境中已经搭建好了zookeeper集群环境。

1.解压
[uplooking@uplooking01 soft]$ tar -zxvf apache-storm-1.0.2.tar.gz -C ../app/
[uplooking@uplooking01 app]$ mv apache-storm-1.0.2/ storm

2.修改配置文件
# storm-env.sh
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/home/uplooking/app/storm/conf"

# storm.yaml
storm.zookeeper.servers:
    - "uplooking01"
    - "uplooking02"
    - "uplooking03"

nimbus.seeds: ["uplooking01", "uplooking02"]

storm.local.dir: "/home/uplooking/data/storm"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

3.建立storm.local.dir
mkdir -p /home/uplooing/data/storm

4.配置环境变量
# .bash_profile
export STORM_HOME=/home/uplooking/app/storm
export PATH=$PATH:$STORM_HOME/bin
# 将其同步到其它节点
scp .bash_profile uplooking@uplooking02:/home/uplooking
scp .bash_profile uplooking@uplooking03:/home/uplooking

5.复制storm安装目录到其它节点
scp -r storm/ uplooking@uplooking02:/home/uplooking/app
scp -r storm/ uplooking@uplooking03:/home/uplooking/app

6.启动storm集群
# uplooking01
storm nimbus &
storm ui &

# uplooking02
storm nimbus &
storm supervisor &

# uplooking03
storm supervisor &

7.启动logviewer(可选)
在全部从节点执行"nohup bin/storm logviewer >/dev/null 2>&1 &"启动log后台程序,并放到后台执行。
(nimbus节点能够不用启动logviewer进程,由于logviewer进程主要是为了方便查看任务的执行日志,这些执行日志都在supervisor节点上)。

由于启动了storm ui,在地址栏中输入:http://uplooking01:8080就能够查看storm集群的相关信息ide

Storm笔记整理(三):Storm集群安装部署与Topology做业提交

Storm笔记整理(三):Storm集群安装部署与Topology做业提交

同时查看其显示的信息,对于咱们前面的配置也有一个十分直观的体现。

提交Topology做业到集群

Topology开发与打包

使用前面的计算总和的例子:

package cn.xpleaf.bigdata.storm.remote;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、实现数字累加求和的案例:数据源不断产生递增数字,对产生的数字累加求和。
 * <p>
 * Storm组件:Spout、Bolt、数据是Tuple,使用main中的Topology将spout和bolt进行关联
 * MapReduce的组件:Mapper和Reducer、数据是Writable,经过一个main中的job将两者关联
 * <p>
 * 适配器模式(Adapter):BaseRichSpout,其对继承接口中一些不必的方法进行了重写,但其重写的代码没有实现任何功能。
 * 咱们称这为适配器模式
 */
public class StormSumTopology {

    /**
     * 数据源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private SpoutOutputCollector collector; // 发送tuple的组件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收数据的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("当前时间" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "产生的订单金额:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是对发送出去的数据的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 计算和的Bolt节点
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private OutputCollector collector; // 发送tuple的组件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 处理数据的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("商城网站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品总交易额" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 若是当前bolt为最后一个处理单元,该方法能够不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 构建拓扑,至关于在MapReduce中构建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 设置spout和bolt的dag(有向无环图)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt())
                .shuffleGrouping("id_order_spout"); // 经过不一样的数据流转方式,来指定数据的上游组件
        // 使用builder构建topology
        StormTopology topology = builder.createTopology();
        String topologyName = StormSumTopology.class.getSimpleName();  // 拓扑的名称
        Config config = new Config();   // Config()对象继承自HashMap,但自己封装了一些基本的配置

        // 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
        if (args == null || args.length < 1) {  // 没有参数时使用本地模式,有参数时使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地开发模式,建立的对象为LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

能够看到区别在于后面做业的提供方式,使用集群的方式为:StormSubmitter.submitTopology(topologyName, config, topology);

这里使用Maven的方式进行打包,确保pom.xml中已经配置了storm-core依赖的可见范围和相关的打包插件:

<!--依赖-->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <!--可见范围为provided时,打包时不会对依赖进行打包,但在本地测试开发时应该注释掉,不然程序没法运行-->
    <!--另外不须要打包storm的依赖是由于,集群中已经有storm的相关依赖jar包了-->
    <scope>provided</scope>
</dependency>

<!--打包插件-->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- 将依赖也一块儿打包 -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <!-- 能够在这里指定运行的主类,这样在打包为jar包后就能够不用指定须要运行的类 -->
                <mainClass>

                </mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

在idea中配置maven打包的命令:

clean package -DskipTests

以后就能够打包并上传到咱们的集群环境中了。

提交做业

[uplooking@uplooking01 storm]$ cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
-bash: cn.xpleaf.bigdata.storm.remote.StormSumTopology: command not found
[uplooking@uplooking01 storm]$ storm jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
Running: /opt/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/uplooking/app/storm -Dstorm.log.dir=/home/uplooking/app/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/uplooking/app/storm/lib/log4j-over-slf4j-1.6.6.jar:/home/uplooking/app/storm/lib/reflectasm-1.10.1.jar:/home/uplooking/app/storm/lib/disruptor-3.3.2.jar:/home/uplooking/app/storm/lib/clojure-1.7.0.jar:/home/uplooking/app/storm/lib/objenesis-2.1.jar:/home/uplooking/app/storm/lib/log4j-slf4j-impl-2.1.jar:/home/uplooking/app/storm/lib/slf4j-api-1.7.7.jar:/home/uplooking/app/storm/lib/log4j-core-2.1.jar:/home/uplooking/app/storm/lib/storm-core-1.0.2.jar:/home/uplooking/app/storm/lib/storm-rename-hack-1.0.2.jar:/home/uplooking/app/storm/lib/kryo-3.0.3.jar:/home/uplooking/app/storm/lib/asm-5.0.3.jar:/home/uplooking/app/storm/lib/log4j-api-2.1.jar:/home/uplooking/app/storm/lib/servlet-api-2.5.jar:/home/uplooking/app/storm/lib/minlog-1.3.0.jar:storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/uplooking/app/storm/conf:/home/uplooking/app/storm/bin -Dstorm.jar=storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
842  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8973061592627522790:-5130577098800003128
934  [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1036 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO  o.a.s.StormSubmitter - Submitting topology StormSumTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8973061592627522790:-5130577098800003128"}
1710 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: StormSumTopology

注意看输出,jar包被上传到/home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar,后面能够在leader节点中查看到有该jar包:

[uplooking@uplooking02 inbox]$ pwd
/home/uplooking/data/storm/nimbus/inbox
[uplooking@uplooking02 inbox]$ ls
stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar

由于此时uplooking01节点不是leader,因此在其上面是没有该jar包的,这点须要注意。

概念验证

能够在storm ui中查看此时的集群状态信息:

Storm笔记整理(三):Storm集群安装部署与Topology做业提交

再查看详细的Topology信息:

Storm笔记整理(三):Storm集群安装部署与Topology做业提交

再查看spout或者bolt的详细信息:

Storm笔记整理(三):Storm集群安装部署与Topology做业提交

能够看到是在uplooking02上运行的Executors,此时能够到该节点上查看输出信息:

[uplooking@uplooking02 6700]$ pwd
/home/uplooking/app/storm/logs/workers-artifacts/StormSumTopology-1-1523548000/6700
[uplooking@uplooking02 6700]$ tail -5 worker.log
2018-04-13 00:39:56.636 STDIO [INFO] 商城网站到目前20180413003956的商品总交易额5054610
2018-04-13 00:39:57.636 STDIO [INFO] 当前时间20180413003957产生的订单金额:3181
2018-04-13 00:39:57.637 STDIO [INFO] 商城网站到目前20180413003957的商品总交易额5057790
2018-04-13 00:39:58.638 STDIO [INFO] 当前时间20180413003958产生的订单金额:3182
2018-04-13 00:39:58.639 STDIO [INFO] 商城网站到目前20180413003958的商品总交易额5060971

须要注意的是,此时在uplooking03上是没有这些信息的,由于集群将做业交给了uplooking02上的supervisor来运行。此外还须要知道的是,在uplooking02的data目录下也能够查看到有前面的jar包,其是由nimbus分发过来的:

[uplooking@uplooking02 StormSumTopology-1-1523548000]$ pwd
/home/uplooking/data/storm/supervisor/stormdist/StormSumTopology-1-1523548000
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ ls
stormcode.ser  stormconf.ser  stormjar.jar

可是在uplooking03上也是没有的。

另外也能够在uplooking02上使用jps命令查看到有worker进程:

[uplooking@uplooking02 ~]$ jps
2224 QuorumPeerMain
1858 Elasticsearch
27427 logviewer
2291 NameNode
27972 LogWriter
27988 worker
25878 nimbus
28006 Jps
26054 supervisor
2552 DFSZKFailoverController
2365 DataNode
2462 JournalNode

对于输出信息的查看,其实也能够在storm ui上直接进行查看,上面的界面,点击6700的连接,就能够进行查看,可是前提是须要先在uplooking02上运行了logviewer

storm logviewer &

查看到的输出以下:

Storm笔记整理(三):Storm集群安装部署与Topology做业提交

集群健壮性验证

由前面能够知道,目前worker运行在uplooking02上,若是在此节点上直接将该进程kill掉,那么其又会自动进行重启:

[uplooking@uplooking02 ~]$ jps | grep worker
27988 worker
[uplooking@uplooking02 ~]$ kill -9 27988
[uplooking@uplooking02 ~]$ jps | grep worker
kill 27988: 没有那个进程
[uplooking@uplooking02 ~]$ kill 27988: 没有那个进程

[uplooking@uplooking02 ~]$ jps | grep worker
28235 worker

固然若是真的但愿停掉Topology做业,有两种方式:

第一种是在storm ui的topology界面中进行操做:
    Topology actions中有Kill的操做,点击便可

第二种是在命令行中使用命令进行操做:
    [uplooking@uplooking01 ~]$ storm kill
    Syntax: [storm kill topology-name [-w wait-time-secs]]
    -w后接秒数,表示多少秒后将中止该topology做业

再作进一步的验证,若是把三台主机上除了了worker进程(nimbus、supervisor等)都关掉,那么此时worker是能够继续正常运行的,数据也会正常产生,只是此时不一样的是,不可以再向storm集群中添加做业了。

相关文章
相关标签/搜索