本文简单介绍下怎么使用docker运行storm以及在springboot中使用storm。git
version: '2' services: zookeeper: image: zookeeper ##3.4.10 container_name: zookeeper restart: always ports: - 2181:2181 nimbus: image: storm ## 1.1.1 container_name: nimbus command: storm nimbus depends_on: - zookeeper links: - zookeeper restart: always ports: - 6627:6627 supervisor: image: storm container_name: supervisor command: storm supervisor depends_on: - nimbus - zookeeper links: - nimbus - zookeeper restart: always ui: image: storm container_name: stormui command: storm ui depends_on: - nimbus - zookeeper links: - nimbus - zookeeper restart: always ports: - 8080:8080
启动以后访问192.168.99.100:8080就能够看见storm-ui的界面github
public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); boolean _isDistributed; SpoutOutputCollector _collector; public TestWordSpout() { this(true); } public TestWordSpout(boolean isDistributed) { _isDistributed = isDistributed; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() { } public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } public void ack(Object msgId) { } public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { if(!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } }
public class WordCountBolt extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
public class PrintBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String first = tuple.getString(0); int second = tuple.getInteger(1); System.out.println(first + "," + second); } }
@SpringBootApplication public class StormDemoApplication implements CommandLineRunner{ public static void main(String[] args) { SpringApplication app = new SpringApplication((StormDemoApplication.class)); app.setWebEnvironment(false); app.run(args); } @Override public void run(String... args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //并发度10 builder.setSpout("spout", new TestWordSpout(), 10); builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word")); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count"); String topologyName = "DemoTopology"; Config conf = new Config(); conf.setDebug(true); //远程提交 mvn clean package -Dmaven.test.skip=true // StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); try { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf,builder.createTopology()); Thread.sleep(60 * 1000); cluster.shutdown(); } catch (Exception e) { e.printStackTrace(); } } }
修改提交方式,而后打jar包spring
//远程提交 mvn clean package -Dmaven.test.skip=true StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
远程提交代码docker
@Test public void remoteSubmit() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { Config conf = new Config(); conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus链接主机地址,好比:192.168.10.1 conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus链接端口,默认 6627 conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper链接主机地址,能够使用集合存放多个 conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper链接端口,默认2181 conf.setDebug(true); conf.setNumWorkers(1); TopologyBuilder builder = new TopologyBuilder(); //并发度10 builder.setSpout("spout", new TestWordSpout(), 10); builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word")); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count"); String topologyName = "DemoTopology"; //很是关键的一步,使用StormSubmitter提交拓扑时,无论怎么样,都是须要将所需的jar提交到nimbus上去,若是不指定jar文件路径, //storm默认会使用System.getProperty("storm.jar")去取,若是不设定,就不能提交 System.setProperty("storm.jar","/Users/downloads/storm-demo-0.0.1-SNAPSHOT.jar"); StormSubmitter.submitTopology(topologyName, conf, builder.createTopology()); }
修改~/.m2/settings.xmlspringboot
<mirrors> <mirror> <id>nexus-aliyun</id> <mirrorOf>*,!Clojars</mirrorOf> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> </mirror> <repository> <id>Clojars</id> <name>Clojars Repository</name> <url>http://clojars.org/repo/</url> <releases><enabled>true</enabled></releases> <snapshots><enabled>true</enabled></snapshots> </repository>