[TOC]
On the surface a Storm cluster looks like a Hadoop cluster. But whereas on Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and "topologies" are very different things; one key difference is that a MapReduce job eventually finishes, while a topology processes messages forever (or until you kill it).
A Storm cluster has two kinds of nodes: master nodes and worker nodes. The master node runs a daemon called "Nimbus", which is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code across the cluster, assigning tasks to workers, and monitoring for failures.
Note: before Hadoop 2.0, the JobTracker handled job distribution, but from 2.x onwards the brand-new resource-scheduling framework YARN is used instead; this point deserves particular attention.
Each worker node runs a daemon called the "Supervisor". The Supervisor listens for work assigned to its machine, and starts or stops worker processes based on what Nimbus has assigned to it. Each worker process executes a subset of a topology (that is, a sub-topology); a running topology consists of many worker processes spread across many machines.
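How a topology maps onto executors and worker processes is decided when the topology is built: the parallelism hint on `setSpout`/`setBolt` requests executors, and `Config.setNumWorkers` requests worker JVMs. A minimal sketch (the component ids and the `OrderSpout`/`SumBolt` classes refer to the sum example later in this article; the numbers are purely illustrative):

```java
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

// Sketch: requesting executors and worker processes when building a topology.
// OrderSpout and SumBolt are the classes defined in the example further below.
public class ParallelismSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("id_order_spout", new OrderSpout(), 2);  // 2 spout executors
        builder.setBolt("id_sum_bolt", new SumBolt(), 1)          // keep 1 executor so the running sum stays global
               .shuffleGrouping("id_order_spout");

        Config config = new Config();
        config.setNumWorkers(3);  // ask for 3 worker JVMs; Nimbus spreads the executors across them
    }
}
```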
A ZooKeeper cluster handles all coordination between Nimbus and the Supervisors (a complete topology may be divided into multiple sub-topologies that are carried out by multiple Supervisors).
In addition, the Nimbus daemon and the Supervisor daemons are fail-fast and stateless; all state is kept in ZooKeeper or on local disk. This means you can kill -9 the nimbus and supervisor processes and restart them, and they will recover their state and carry on as if nothing had happened. This design makes Storm extremely stable. In this design the master never talks directly to the workers; it goes through the ZooKeeper intermediary instead, which decouples the master from the workers, and keeping the state in the ZooKeeper cluster lets whichever side failed recover quickly.
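Since all of that coordination state lives in ZooKeeper, it can be inspected directly. A minimal sketch with the plain ZooKeeper client, assuming the default `storm.zookeeper.root` of `/storm` and a ZooKeeper server reachable at `uplooking01:2181` (hostnames from this article's setup):

```java
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Sketch: list the znodes Storm keeps under its ZooKeeper root.
public class StormZkStateSketch {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("uplooking01:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();  // wait until the session is established
        // Typical children include supervisors, storms, assignments, workerbeats ...
        for (String child : zk.getChildren("/storm", false)) {
            System.out.println("/storm/" + child);
        }
        zk.close();
    }
}
```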
See the official documentation: http://storm.apache.org/releases/1.0.6/Setting-up-a-Storm-cluster.html
The official documentation's explanation of the configuration is very clear and easy to follow.
Download: https://storm.apache.org/downloads.html

A working ZooKeeper environment is required; in my environment a ZooKeeper cluster is already set up.

1. Unpack

```bash
[uplooking@uplooking01 soft]$ tar -zxvf apache-storm-1.0.2.tar.gz -C ../app/
[uplooking@uplooking01 app]$ mv apache-storm-1.0.2/ storm
```

2. Edit the configuration files

```bash
# storm-env.sh
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/home/uplooking/app/storm/conf"

# storm.yaml
storm.zookeeper.servers:
    - "uplooking01"
    - "uplooking02"
    - "uplooking03"

nimbus.seeds: ["uplooking01", "uplooking02"]

storm.local.dir: "/home/uplooking/data/storm"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
```

3. Create storm.local.dir

```bash
mkdir -p /home/uplooking/data/storm
```

4. Configure the environment variables

```bash
# .bash_profile
export STORM_HOME=/home/uplooking/app/storm
export PATH=$PATH:$STORM_HOME/bin

# sync it to the other nodes
scp .bash_profile uplooking@uplooking02:/home/uplooking
scp .bash_profile uplooking@uplooking03:/home/uplooking
```

5. Copy the Storm installation directory to the other nodes

```bash
scp -r storm/ uplooking@uplooking02:/home/uplooking/app
scp -r storm/ uplooking@uplooking03:/home/uplooking/app
```

6. Start the Storm cluster

```bash
# uplooking01
storm nimbus &
storm ui &

# uplooking02
storm nimbus &
storm supervisor &

# uplooking03
storm supervisor &
```

7. Start the logviewer (optional)

Run `nohup bin/storm logviewer >/dev/null 2>&1 &` on all the slave nodes to start the logviewer daemon in the background. (The logviewer does not need to be started on the nimbus node, since it mainly exists for viewing task execution logs, and those logs live on the supervisor nodes.)
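Once storm.yaml is in place, the client side picks it up via STORM_CONF_DIR. As a quick sanity check, a hedged sketch that prints the effective configuration from Java, assuming storm-core is on the classpath (`Utils.readStormConfig()` merges defaults.yaml with your storm.yaml):

```java
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.utils.Utils;

// Sketch: print the effective client-side configuration to confirm
// that storm.yaml (found via STORM_CONF_DIR) was picked up.
public class StormConfCheck {
    public static void main(String[] args) {
        Map conf = Utils.readStormConfig();  // defaults.yaml overlaid with storm.yaml
        System.out.println(Config.STORM_ZOOKEEPER_SERVERS + " = " + conf.get(Config.STORM_ZOOKEEPER_SERVERS));
        System.out.println(Config.NIMBUS_SEEDS + " = " + conf.get(Config.NIMBUS_SEEDS));
        System.out.println(Config.STORM_LOCAL_DIR + " = " + conf.get(Config.STORM_LOCAL_DIR));
    }
}
```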
Because storm ui has been started, entering http://uplooking01:8080 in the address bar brings up information about the Storm cluster:
The information it displays is also a very direct reflection of the configuration we did earlier.
We use the earlier running-sum example:
```java
package cn.xpleaf.bigdata.storm.remote;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1. A running-sum example: the data source keeps producing increasing numbers,
 *    and we accumulate a sum over the numbers produced.
 * <p>
 * Storm components: Spout and Bolt; the data are Tuples; the Topology built in main ties the spout and bolt together.
 * MapReduce components: Mapper and Reducer; the data are Writables; a Job built in main ties the two together.
 * <p>
 * Adapter pattern: BaseRichSpout overrides the methods of the interfaces it implements that we don't need,
 * but the overriding code does nothing. This is what we call the adapter pattern.
 */
public class StormSumTopology {

    /**
     * The data source
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;                        // configuration of this component
        private TopologyContext context;         // context object of this component
        private SpoutOutputCollector collector;  // the collector that emits tuples

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * The core method that produces data
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("Order generated at " + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ", amount: " + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * Declares the schema of the data sent out
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * The bolt that computes the sum
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;                    // configuration of this component
        private TopologyContext context;     // context object of this component
        private OutputCollector collector;   // the collector that emits tuples

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = stormConf;  // (fixed: the original assigned "this.conf = conf", a self-assignment)
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * The core method that processes data
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;
            System.out.println("Total transaction amount as of " + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ": " + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * If this bolt is the last processing unit, this method can be left empty
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    /**
     * Builds the topology, the counterpart of building a Job in MapReduce
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * Set up the DAG (directed acyclic graph) of spouts and bolts
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt())
                .shuffleGrouping("id_order_spout");  // the grouping specifies the upstream component the data flows from

        // build the topology with the builder
        StormTopology topology = builder.createTopology();
        String topologyName = StormSumTopology.class.getSimpleName();  // name of the topology
        Config config = new Config();  // Config extends HashMap but adds some basic configuration helpers

        // launch the topology: locally with LocalCluster, on a cluster with StormSubmitter
        if (args == null || args.length < 1) {  // no arguments: local mode; with arguments: cluster mode
            LocalCluster localCluster = new LocalCluster();  // local development mode uses a LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}
```
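The helper class `cn.xpleaf.bigdata.storm.utils.StormUtil` is not shown above. Below is a minimal sketch that matches the only two members the topology uses; the field and method names are taken from the calls above, everything else is an assumption:

```java
package cn.xpleaf.bigdata.storm.utils;

import java.text.SimpleDateFormat;

// Minimal sketch of the helper used by StormSumTopology; only the two
// members referenced there (df_yyyyMMddHHmmss and sleep) are reconstructed.
public class StormUtil {

    // Shared formatter for timestamps like 20180413003958, as seen in the worker log.
    // (SimpleDateFormat is not thread-safe, which is fine for this single-executor demo.)
    public static final SimpleDateFormat df_yyyyMMddHHmmss = new SimpleDateFormat("yyyyMMddHHmmss");

    // Sleep without forcing callers to handle InterruptedException
    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```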
As you can see, the difference lies in how the job is submitted at the end; in cluster mode it is: `StormSubmitter.submitTopology(topologyName, config, topology);`.
Here we package with Maven; make sure that pom.xml configures the visibility scope of the storm-core dependency and the relevant packaging plugin:
```xml
<!-- dependency -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <!-- with scope "provided" the dependency is not packaged into the jar, but for local
         development and testing it should be commented out, otherwise the program won't run -->
    <!-- Storm's dependencies don't need to be packaged because the cluster already
         has the relevant Storm jars -->
    <scope>provided</scope>
</dependency>

<!-- packaging plugin -->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- package the dependencies as well -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <!-- the main class can be specified here, so that after packaging
                     the jar can be run without naming the class to execute -->
                <mainClass>
                </mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```
Configure the Maven packaging command in IDEA:
```bash
clean package -DskipTests
```
Then the jar can be built and uploaded to our cluster environment.
```bash
[uplooking@uplooking01 storm]$ cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
-bash: cn.xpleaf.bigdata.storm.remote.StormSumTopology: command not found
[uplooking@uplooking01 storm]$ storm jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
Running: /opt/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/uplooking/app/storm -Dstorm.log.dir=/home/uplooking/app/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/uplooking/app/storm/lib/log4j-over-slf4j-1.6.6.jar:/home/uplooking/app/storm/lib/reflectasm-1.10.1.jar:/home/uplooking/app/storm/lib/disruptor-3.3.2.jar:/home/uplooking/app/storm/lib/clojure-1.7.0.jar:/home/uplooking/app/storm/lib/objenesis-2.1.jar:/home/uplooking/app/storm/lib/log4j-slf4j-impl-2.1.jar:/home/uplooking/app/storm/lib/slf4j-api-1.7.7.jar:/home/uplooking/app/storm/lib/log4j-core-2.1.jar:/home/uplooking/app/storm/lib/storm-core-1.0.2.jar:/home/uplooking/app/storm/lib/storm-rename-hack-1.0.2.jar:/home/uplooking/app/storm/lib/kryo-3.0.3.jar:/home/uplooking/app/storm/lib/asm-5.0.3.jar:/home/uplooking/app/storm/lib/log4j-api-2.1.jar:/home/uplooking/app/storm/lib/servlet-api-2.5.jar:/home/uplooking/app/storm/lib/minlog-1.3.0.jar:storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/uplooking/app/storm/conf:/home/uplooking/app/storm/bin -Dstorm.jar=storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
842  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8973061592627522790:-5130577098800003128
934  [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1036 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO  o.a.s.StormSubmitter - Submitting topology StormSumTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8973061592627522790:-5130577098800003128"}
1710 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: StormSumTopology
```
Note the output: the jar is uploaded to /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar, and afterwards the jar can be seen on the leader node:
```bash
[uplooking@uplooking02 inbox]$ pwd
/home/uplooking/data/storm/nimbus/inbox
[uplooking@uplooking02 inbox]$ ls
stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
```
Because the uplooking01 node is not the leader at this point, the jar is not present on it; this is worth noting.
The cluster state at this point can be viewed in the storm ui:
Then look at the detailed Topology information:
Then look at the details of the spout or bolt:
You can see that the Executors are running on uplooking02, so we can go to that node to inspect the output:
```bash
[uplooking@uplooking02 6700]$ pwd
/home/uplooking/app/storm/logs/workers-artifacts/StormSumTopology-1-1523548000/6700
[uplooking@uplooking02 6700]$ tail -5 worker.log
2018-04-13 00:39:56.636 STDIO [INFO] Total transaction amount as of 20180413003956: 5054610
2018-04-13 00:39:57.636 STDIO [INFO] Order generated at 20180413003957, amount: 3181
2018-04-13 00:39:57.637 STDIO [INFO] Total transaction amount as of 20180413003957: 5057790
2018-04-13 00:39:58.638 STDIO [INFO] Order generated at 20180413003958, amount: 3182
2018-04-13 00:39:58.639 STDIO [INFO] Total transaction amount as of 20180413003958: 5060971
```
Note that uplooking03 has none of this output, because the cluster handed the job to the supervisor on uplooking02. It is also worth knowing that the jar from earlier can be found under the data directory on uplooking02 as well, distributed there by nimbus:
```bash
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ pwd
/home/uplooking/data/storm/supervisor/stormdist/StormSumTopology-1-1523548000
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ ls
stormcode.ser  stormconf.ser  stormjar.jar
```
But again it is absent on uplooking03.
The worker process can also be seen on uplooking02 with the jps command:
```bash
[uplooking@uplooking02 ~]$ jps
2224 QuorumPeerMain
1858 Elasticsearch
27427 logviewer
2291 NameNode
27972 LogWriter
27988 worker
25878 nimbus
28006 Jps
26054 supervisor
2552 DFSZKFailoverController
2365 DataNode
2462 JournalNode
```
The output can in fact also be viewed directly in the storm ui: on the page above, click the 6700 link. The prerequisite is that the logviewer is already running on uplooking02:
```bash
storm logviewer &
```
The output seen there is as follows:
As we saw above, the worker is currently running on uplooking02; if we kill that process directly on the node, it is automatically restarted:
```bash
[uplooking@uplooking02 ~]$ jps | grep worker
27988 worker
[uplooking@uplooking02 ~]$ kill -9 27988
[uplooking@uplooking02 ~]$ jps | grep worker
kill 27988: No such process
[uplooking@uplooking02 ~]$
kill 27988: No such process
[uplooking@uplooking02 ~]$ jps | grep worker
28235 worker
```
Of course, if you really want to stop the Topology job, there are two ways:
The first is via the topology page in the storm ui: under Topology actions there is a Kill action; just click it.

The second is on the command line:

```bash
[uplooking@uplooking01 ~]$ storm kill
Syntax: [storm kill topology-name [-w wait-time-secs]]
```

`-w` is followed by a number of seconds, meaning the topology job is stopped after that many seconds.
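Besides the UI and the CLI, a topology can also be killed programmatically through the Nimbus Thrift client. A hedged sketch, assuming storm-core on the classpath and a reachable Nimbus, roughly equivalent to `storm kill StormSumTopology -w 10`:

```java
import java.util.Map;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

// Sketch: kill a running topology via the Nimbus Thrift API.
public class KillTopologySketch {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();                       // picks up storm.yaml
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
        KillOptions opts = new KillOptions();
        opts.set_wait_secs(10);                                   // same meaning as the -w flag above
        nimbus.getClient().killTopologyWithOpts("StormSumTopology", opts);
    }
}
```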
As a further check: if, on all three hosts, every process except the worker processes (nimbus, the supervisors, and so on) is shut down, the workers keep running normally and data keeps being produced; the only difference is that no new jobs can be submitted to the Storm cluster.