This post is a simple getting-started example for Storm, meant to show readers how Storm's execution model works. Follow-up posts will cover several advanced Storm features and, eventually, running Storm inside Hadoop 2.x YARN. The target audience is Hadoop and Spark users who have already moved into big data, or readers who know of Storm and want to understand it more deeply.
Project POM (the Storm jar is not published to Maven Central, so the following repositories must be added to the project):
<repositories>
    <repository>
        <id>central</id>
        <name>Maven Repository Switchboard</name>
        <layout>default</layout>
        <url>http://maven.oschina.net/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>clojars</id>
        <url>https://clojars.org/repo/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <releases>
            <enabled>true</enabled>
        </releases>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>1.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.clojure</groupId>
        <artifactId>clojure</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <dependency>
        <groupId>storm</groupId>
        <artifactId>libthrift7</artifactId>
        <version>0.7.0</version>
    </dependency>
</dependencies>
Below is a "Hello World" example for Storm. The code is abridged; readers familiar with Storm can easily assemble it into a complete example.
public static void main(String[] args) {
    Config conf = new Config();
    conf.put(Config.STORM_LOCAL_DIR, "/Volumes/Study/data/storm");
    conf.put(Config.STORM_CLUSTER_MODE, "local");
    //conf.put("storm.local.mode.zmq", "false");
    conf.put("storm.zookeeper.root", "/storm");
    conf.put("storm.zookeeper.session.timeout", 50000);
    conf.put("storm.zookeeper.servers", "nowledgedata-n15");
    conf.put("storm.zookeeper.port", 2181);
    //conf.setDebug(true);
    //conf.setNumWorkers(2);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("words", new TestWordSpout(), 2);
    builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
           .shuffleGrouping("words");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
}
TestWordSpout is an example that ships with Storm. It randomly produces strings from the array new String[] {"nathan", "mike", "jackson", "golda", "bertels"} and serves as the data source.
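Conceptually, TestWordSpout just picks a random element of that array on each nextTuple() call. The following is a minimal self-contained sketch of that selection logic only (without the Storm spout API, so it runs standalone; the class name WordSource is my own, not part of Storm):

```java
import java.util.Random;

public class WordSource {
    private static final String[] WORDS =
            {"nathan", "mike", "jackson", "golda", "bertels"};
    private final Random random = new Random();

    // Mirrors what TestWordSpout emits: one randomly chosen word per call.
    public String nextWord() {
        return WORDS[random.nextInt(WORDS.length)];
    }

    public static void main(String[] args) {
        WordSource source = new WordSource();
        for (int i = 0; i < 5; i++) {
            System.out.println(source.nextWord());
        }
    }
}
```

In the real spout, each word would be wrapped in a Values tuple and handed to the collector instead of being printed.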
The source of DefaultStringBolt:
public class DefaultStringBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        log.info("rev a message: " + tuple.getString(0));
        // emit a new tuple anchored to the input, then ack the input
        collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        collector.ack(tuple);
    }

    // omitted in the abridged listing: every bolt must declare its output fields
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The run log:
10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
Data is produced by what Storm calls a spout (think of a faucet: the source end that generates data), then passed to a chain of downstream bolts, where it is finally transformed and consumed. Both spouts and bolts run in parallel, and the parallelism of each can be set by the user (in local mode, parallelism is simulated with multiple threads). For example:
builder.setSpout("words", new TestWordSpout(), 2);
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
Here the parallelism of the TestWordSpout spout is 2, and the parallelism of DefaultStringBolt is 5.
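The shuffleGrouping("words") call means each tuple from the spout is sent to one of the 5 bolt tasks at random, so load evens out over time. A minimal standalone simulation of that distribution (my own illustration, not Storm's actual scheduler code):

```java
import java.util.Random;

public class ShuffleGroupingDemo {
    public static void main(String[] args) {
        int boltTasks = 5;                  // parallelism of DefaultStringBolt
        int[] counts = new int[boltTasks];  // tuples received per task
        Random random = new Random();

        // Shuffle grouping: each tuple goes to a randomly chosen bolt task,
        // so each of the 5 tasks receives roughly 1/5 of the traffic.
        int tuples = 100_000;
        for (int i = 0; i < tuples; i++) {
            counts[random.nextInt(boltTasks)]++;
        }

        for (int task = 0; task < boltTasks; task++) {
            System.out.printf("task %d received %d tuples%n", task, counts[task]);
        }
    }
}
```

This matches what the run log shows: the five bolt threads each handle a comparable share of the words.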
The log shows data passing from the spout to the predefined bolt, which prints a line for each tuple. My test code set the bolt's parallelism to 5, and counting the distinct thread names in the log confirms there are indeed 5 threads.
For more on what Storm is, http://storm.incubator.apache.org/ and http://www.ibm.com/developerworks/cn/opensource/os-twitterstorm/ have detailed introductions.
To borrow an analogy from an OSC user: Hadoop is like a shopping mall elevator. Users queue up, press a floor button, and are carried up in batches. Storm is like the escalator: once it is set up and running, it immediately carries away whoever steps on, toward a fixed destination.
In my understanding, Storm and Hadoop are completely different, with no overlap in their designs. Storm is more like Spring Integration, which I introduced earlier: a data-flow system. It carries data through a predefined flow, applying transformations, passing, splitting, and merging along the way, until the data finally reaches backend storage. The difference is that Storm can run distributed, and the degree of distribution is itself configurable.
This characteristic makes Storm a good fit for building big-data ETL systems.