在大数据领域中存在三大计算中心和三大计算引擎
Apache Storm是Twitter开源的一个类似于Hadoop的实时数据处理框架,它原来是由BackType开发,后BackType被Twitter收购,将Storm作为Twitter的实时数据分析系统。
数据来源
hadoop
storm
处理过程
hadoop
storm
处理速度
hadoop
storm
适用场景
Spout
Bolt
apache-storm-1.0.2.tar.gz
[root@uplooking01 /soft]
# Extract the archive into /opt, then rename the versioned directory.
# NOTE(fix): the original ran "mv apache-storm-1.0.2/ storm" from /soft,
# but tar extracted into /opt, so the relative paths were wrong.
tar -zxvf apache-storm-1.0.2.tar.gz -C /opt
mv /opt/apache-storm-1.0.2 /opt/storm
storm-env.sh
[root@uplooking01 /soft]
# storm-env.sh: point Storm at the JDK and at its configuration directory
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/opt/storm/conf"
storm.yaml
[root@uplooking01 /opt/storm/conf]
storm.zookeeper.servers:
    - "uplooking03"
    - "uplooking04"
    - "uplooking05"
# configure two master nodes so nimbus is not a single point of failure
nimbus.seeds: ["uplooking01", "uplooking02"]
storm.local.dir: "/opt/storm/storm-local"
# configure the worker slots on each supervisor node
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
[root@uplooking01 /]
# copy the configured Storm installation to every other node
for host in uplooking02 uplooking03 uplooking04 uplooking05; do
  scp -r /opt/storm "${host}:/opt"
done
[root@uplooking01 /]
# start the master (nimbus), web UI and logviewer daemons
nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking02 /]
# start the standby master (nimbus) and the logviewer
# NOTE(fix): the original ran "storm numbus" -- "numbus" is not a valid
# storm subcommand; the daemon is called "nimbus"
nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
# start the worker-node daemons (supervisor) plus a logviewer on each
[root@uplooking03 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking04 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking05 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
#!/bin/bash
# Start every Storm daemon (nimbus, supervisor, logviewer, ui) over ssh,
# reading the target hosts from the matching /opt/shell/*.host file.
# NOTE(fix): the original comment said "进制分配伪终端"; it meant
# "禁止分配伪终端" (disable pseudo-terminal allocation).
set -u

#######################################
# Launch one storm daemon on every host listed in a host file.
# Arguments: $1 - storm subcommand (nimbus|supervisor|logviewer|ui)
#            $2 - path to the file listing one hostname per line
#######################################
start_daemons() {
  local daemon=$1 hostfile=$2
  local host
  # read -r: keep backslashes literal; also handle a missing final newline
  while IFS= read -r host || [[ -n "$host" ]]; do
    [[ -n "$host" ]] || continue
    # -T: do not allocate a pseudo-terminal; automation does not need one
    ssh -T "root@${host}" <<eeooff
nohup /opt/storm/bin/storm ${daemon} >/dev/null 2>&1 &
eeooff
  done < "$hostfile"
}

start_daemons nimbus     /opt/shell/nimbus.host
start_daemons supervisor /opt/shell/supervisor.host
start_daemons logviewer  /opt/shell/logviewer.host
start_daemons ui         /opt/shell/ui.host
public class MySpout extends BaseRichSpout { private SpoutOutputCollector collector; //初始化累加的数字 int num = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { collector.emit(new Values(num)); num++; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("mynum")); } }
/**
 * Terminal bolt: prints the integer received on the "mynum" field.
 */
public class MyBolt extends BaseRichBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // no state to initialise
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println(tuple.getIntegerByField("mynum"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // emits nothing downstream
    }
}
public class MyTopology { public static void main(String[] args) { //建立自定义的spout MySpout mySpout = new MySpout(); //建立自定义的bolt MyBolt myBolt = new MyBolt(); //建立topology名称 String topologyName = "MyNumTopology"; //建立topology的配置对象 Map conf = new Config(); //建立topology的构造器 TopologyBuilder topologyBuilder = new TopologyBuilder(); //为topology设置spout和bolt topologyBuilder.setSpout("myspout", mySpout); topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout"); //建立本地的topology提交器 StormTopology stormTopology = topologyBuilder.createTopology(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(topologyName, conf, stormTopology); } }
/**
 * Second-stage bolt: prints the "mynum02" field followed by a marker.
 */
public class MyBolt02 extends BaseRichBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // no state to initialise
    }

    @Override
    public void execute(Tuple tuple) {
        Integer value = tuple.getIntegerByField("mynum02");
        System.out.println(value + ".....");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // emits nothing downstream
    }
}
/**
 * Pass-through bolt: prints the "mynum" value and forwards it downstream
 * on the field "mynum02".
 */
public class MyBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Integer value = tuple.getIntegerByField("mynum");
        System.out.println(value);
        // forward the same value to the next bolt
        collector.emit(new Values(value));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("mynum02"));
    }
}
public class MyTopology { public static void main(String[] args) { //建立自定义的spout MySpout mySpout = new MySpout(); //建立自定义的bolt MyBolt myBolt = new MyBolt(); MyBolt02 myBolt02 = new MyBolt02(); //建立topology名称 String topologyName = "MyNumTopology"; //建立topology的配置对象 Map conf = new Config(); //建立topology的构造器 TopologyBuilder topologyBuilder = new TopologyBuilder(); //为topology设置spout和bolt topologyBuilder.setSpout("myspout", mySpout); topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout"); topologyBuilder.setBolt("mybolt02", myBolt02).shuffleGrouping("mybolt"); //建立本地的topology提交器 StormTopology stormTopology = topologyBuilder.createTopology(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(topologyName, conf, stormTopology); } }
StormSubmitter.submitTopology(topologyName, conf, stormTopology);
在storm中的并行度说的就是一个进程的运行需要多少个线程来参与,如果storm运行的线程个数+1,则并行度+1
Worker :