一、这一文开始进入Storm流式计算框架的学习python
一、Storm与Hadoop的区别就是,Hadoop是一个离线执行的做业,执行完毕就结束了,而Storm是能够源源不断的接受数据源,不停的对数据进行处理,而数据就行水流同样不停的流进来,通过处理,再将结果存入数据库或者作其余用途数据库
二、基础概念apache
(1)Tuple(元组):数据流传递的基本单元,至关于数据的流动经过Tuple做为对象来传递json
(2)Spout(龙卷):至关于数据源,经过重写nextTuple()方法,源源不断的将数据流入咱们的处理框架vim
(3)Bolt(闪电):处理数据的节点,经过重写execute()方法,接收Spout送出的数据,并进行任意的业务处理,处理完毕还能够将数据继续流入下一个Bolt,组成一条链框架
(4)Topology(拓扑):链接以上各个组件,使其组成一个拓扑结构,好比将多个Bolt组成一条数据链maven
三、举例说明:好比咱们如今要统计一下《战争与和平》这本书的每一个英文单词出现的数量ide
(1)编写Spout代码,将书的内容源源不断的经过句子输入到咱们的体系中函数
(2)编写多个Bolt来处理数据,好比第一个Bolt专门来将句子切分红单词,第二个Bolt专门来统计每一个单词出现的数量,每一个Bolt之间经过定义Bolt来流动数据,好比统计的Bolt,定义一个二元元组(单词,数量),第一个值就是具体的单词,第二个值就是这个单词出现的数量oop
(3)经过Topology将以上组件链接成一个完整的系统
一、下载Storm的tar.gz包,并解压
tar zxvf /work/soft/installer/apache-storm-1.2.2
二、修改配置文件
(1)storm.ymal,分别配置咱们的Zookeeper集群(前文中已经搭建好了)的各个节点和nimbus节点的高可用性,避免单点故障,咱们的环境有两个机器,因此都写上
vim /work/soft/apache-storm-1.2.2/conf/storm.yaml storm.zookeeper.servers: - "storm1" - "storm2" nimbus.seeds: ["storm1", "storm2"]
三、输入python检查一下机器是否安装了python,若是没有则安装python,安装完毕再执行python,发现能够进入,而后ctrl+D退出便可
apt-get install python-minimal
四、启动Storm集群,经过如下命令分别启动nimbus、supervisor和控制台UI,nohup能够当SSH客户端关闭时,不会将进程杀死,后缀加一个&,能够理解为让进程在后台运行
nohup /work/soft/apache-storm-1.2.2/bin/storm nimbus & nohup /work/soft/apache-storm-1.2.2/bin/storm supervisor & nohup /work/soft/apache-storm-1.2.2/bin/storm ui &
四、经过jps命令查看进程是否正常启动,若是看到config_value,说明还没启动完毕,稍等一下就行了
五、打开8080端口,能够看到控制台,正常运行
咱们根据以上的思路写一个简单的单词统计任务,咱们先放在开发环境上面跑代码,是不须要Storm集群环境的,等咱们写好代码并在本地跑通后,就能够搭建Storm集群,在集群上面跑了,关于单词统计的代码网上很容易找到,下面阐述一下实现的思路,能够对照着如下文字来看代码,更好理解
一、建立一个maven工程,引入如下依赖,因为我这里的思路是:经过Rabbitmq获取消息数据,Storm进行数据流处理,将结果存储为Json格式并存入HBase。因此我须要引入以下依赖
<!-- HBase --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.1</version> </dependency> <!-- Storm --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.4</version> <scope>provided</scope> </dependency> <!-- Json --> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20140107</version> </dependency> <!-- RabbitMQ --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> </dependency>
二、由于要使用HBase,因此参照上文的操做,还要将Hadoop的配置文件拷贝到项目中。环境搭建好后,开始编写代码
三、首先编写Spout,也就是数据的来源,建立一个类实现IRichSpout接口,并重写nextTuple()方法,在这个方法里实现数据的生产,好比读取数据库(RDS或NoSQL),从消息队列获取数据(Kafka、RabbitMQ),将输出的数据定义成Tuple(元组),经过重写declareOutputFields()方法,定义元组的key和数量,而后在nextTuple()方法中将元组的内容经过emit()方法传递到下一个组件
四、编写Bolt,也就是数据的处理者,建立一个类实现IRichBolt接口,并重写execute(Tuple tuple)方法,这个方法就是处理数据的逻辑了,在这里能够编写各类代码对接收到的Tuple进行处理,处理完毕后,和Spout同样,能够将数据经过定义Tuple的方式传递到下一个组件,每一个Bolt会对数据进行特定的处理,而后传递给下一个Bolt,这样就能够组成一条数据流的处理链
五、编写Topology,拓扑将上面编写的组件链接起来,组成一个拓扑图,数据就在这个拓扑图里面持续的“流动”,永不停歇,拓扑也是程序的入口,因此建立一个主函数,在主函数里面建立一个TopologyBuilder对象,经过setSpout()、setBolt()方法将上面的组件链接起来,链接的方式涉及到Storm的八种Grouping策略
(1)Shuffle Grouping(随机分组):最经常使用的分组方式,将Tuple平均随机分配到各个Bolt里面
(2)Fields Grouping(字段分组):根据指定字段进行分组,好比咱们按照word字段进行分组,相同word值的Tuple会被分配到同一个Bolt里面
(3)All Grouping(广播分组):全部的Bolt均可以收到Tuple
(4)None Grouping(无分组):将Tuple随机分配到各个Bolt里面
(5)Global Grouping(全局分组):将Tuple分配到task id值最低的task里面
(6)Direct Grouping(直接分组):生产者Bolt决定消费者Bolt能够接受的Tuple
(7)Local or Shuffle Grouping(本地或者随机分组):Bolt在同一进程或存在多个task,元组会随机分配这些task
(8)Custom Grouping (自定义分组):经过实现CustomStreamGrouping接口来定义自定义分组
六、经过TopologyBuilder链接好各个组件后,就能够提交任务了,提交任务分两种方式:本地提交和集群提交
(1)本地提交:提交到开发环境中,不须要安装Storm环境,只须要引入Storm的依赖包便可,使用LocalCluster类的submitTopology方法提交任务
(2)集群提交:提交到Storm集群中,使用StormSubmitter类的submitTopology方法提交任务
一、首先咱们要修改一下pom文件,将以前引入的storm-core依赖里面加<scope>provided</scope>,目的是storm-core这个依赖排除掉,由于这个依赖只是本地测试调试依赖的,集群中不须要这个依赖,若是不加会报错,还要记得修改拓扑的代码,使用StormSubmitter类来提交
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.4</version> <scope>provided</scope> </dependency>
二、经过编译器将咱们的maven项目打包成jar
mvn clean install
三、将jar包拷贝到集群的集群里面,由于咱们的代码使用到了HBase,因此要记得把项目中的配置文件夹也拷贝过来(core-site.xml、hbase-site.xml、hdfs-site.xml),jar是扫描不到jar包里的配置文件的,把配置文件放到与jar包同级目录下便可
四、执行命令将jar包提交到集群中运行,命令后面要记得指定主函数的全包名
nohup /work/soft/apache-storm-1.2.2/bin/storm jar /work/jar/mytest.jar com.orange.heatmap.Main &
五、进入8080控制台,能够看到咱们刚才提交的拓扑,点击进去能够查看运行的状态