[TOC]java
在Strom的API中提供了LocalCluster
对象,这样在不用搭建Storm环境或者Storm集群的状况下也可以开发Storm的程序,很是方便。数据库
基于Maven构建工程项目,其所须要的依赖以下:apache
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.2</version> </dependency>
需求以下:并发
数据源不断产生递增数字,对产生的数字累加求和
分析以下:app
Strom的Topology包含Spout和Bolt两种节点类型,在这个案例中,可使用Spout来对数据源进行处理(模拟产生数据), 而后将其发送到计算和的Bolt中,因此实际上这里只须要使用一个Spout节点和一个Bolt节点就能够了。
在理解了Storm的设计思想后,将其与MapReduce的设计思想进行对比,再看下面的程序代码实际上是很是好理解的。dom
/** * 数据源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 当前组件配置信息 private TopologyContext context; // 当前组件上下文对象 private SpoutOutputCollector collector; // 发送tuple的组件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收数据的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("当前时间" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "产生的订单金额:" + num); this.collector.emit(new Values(num)); } } /** * 是对发送出去的数据的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } }
private Long sumOrderCost = 0L; /** * 计算和的Bolt节点 */ static class SumBolt extends BaseRichBolt { private Map conf; // 当前组件配置信息 private TopologyContext context; // 当前组件上下文对象 private OutputCollector collector; // 发送tuple的组件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 处理数据的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("商城网站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品总交易额" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是当前bolt为最后一个处理单元,该方法能够不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
/** * 1°、实现数字累加求和的案例:数据源不断产生递增数字,对产生的数字累加求和。 * <p> * Storm组件:Spout、Bolt、数据是Tuple,使用main中的Topology将spout和bolt进行关联 * MapReduce的组件:Mapper和Reducer、数据是Writable,经过一个main中的job将两者关联 * <p> * 适配器模式(Adapter):BaseRichSpout,其对继承接口中一些不必的方法进行了重写,但其重写的代码没有实现任何功能。 * 咱们称这为适配器模式 */ public class StormLocalSumTopology { /** * 构建拓扑,至关于在MapReduce中构建Job */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); /** * 设置spout和bolt的dag(有向无环图) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt()) .shuffleGrouping("id_order_spout"); // 经过不一样的数据流转方式,来指定数据的上游组件 // 使用builder构建topology StormTopology topology = builder.createTopology(); // 启动topology LocalCluster localCluster = new LocalCluster(); // 本地开发模式,建立的对象为LocalCluster String topologyName = StormLocalSumTopology.class.getSimpleName(); // 拓扑的名称 Config config = new Config(); // Config()对象继承自HashMap,但自己封装了一些基本的配置 localCluster.submitTopology(topologyName, config, topology); } }
须要说明的是,Spout和Bolt的类都做为StormLocalSumTopology的静态成员变量,这样作是为了开发的方便,固然实际上也能够将其单独做为一个文件。分布式
执行主函数,其输出以下:ide
当前时间20180412213836产生的订单金额:1 商城网站到目前20180412213836的商品总交易额1 当前时间20180412213837产生的订单金额:2 商城网站到目前20180412213837的商品总交易额3 当前时间20180412213838产生的订单金额:3 商城网站到目前20180412213838的商品总交易额6 ......
需求以下:函数
监控一个目录下的文件,当发现有新文件的时候,把文件读取过来,解析文件中的内容,统计单词出现的总次数
分析以下:oop
能够设置三个节点: Spout:用于持续读取目录下须要被监听(经过后缀名标识)的文件,而且将每一行输出到下一个Bolt中 (相似于MapReduce中的FileInputFormat) Bolt1:读取行,并解析其中的单词,将每一个单词输出到下一个Bolt中 (相似于MapReduce中的Mapper) Bolt2:读取单词,进行统计计算 (相似于MapReduce中的Reducer)
/** * Spout,获取数据源,这里是持续读取某一目录下的文件,并将每一行输出到下一个Bolt中 */ static class FileSpout extends BaseRichSpout { private Map conf; // 当前组件配置信息 private TopologyContext context; // 当前组件上下文对象 private SpoutOutputCollector collector; // 发送tuple的组件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void nextTuple() { File directory = new File("D:/data/storm"); // 第二个参数extensions的意思就是,只采集某些后缀名的文件 Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true); for (File file : files) { try { List<String> lines = FileUtils.readLines(file, "utf-8"); for(String line : lines) { this.collector.emit(new Values(line)); } // 当前文件被消费以后,须要重命名,同时为了防止相同文件的加入,重命名后的文件加了一个随机的UUID,或者加入时间戳也能够的 File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed"); FileUtils.moveFile(file, destFile); } catch (IOException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } }
/** * Bolt节点,将接收到的每一行数据切割为一个个单词并发送到下一个节点 */ static class SplitBolt extends BaseRichBolt { private Map conf; // 当前组件配置信息 private TopologyContext context; // 当前组件上下文对象 private OutputCollector collector; // 发送tuple的组件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split(" "); for (String word : words) { this.collector.emit(new Values(word,1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
/** * Bolt节点,执行单词统计计算 */ static class WCBolt extends BaseRichBolt { private Map conf; // 当前组件配置信息 private TopologyContext context; // 当前组件上下文对象 private OutputCollector collector; // 发送tuple的组件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = input.getIntegerByField("count"); /*if (map.containsKey(word)) { map.put(word, map.get(word) + 1); } else { map.put(word, 1); }*/ map.put(word, map.getOrDefault(word, 0) + 1); System.out.println("===================================="); map.forEach((k ,v)->{ System.out.println(k + ":::" +v); }); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
/** * 2°、单词计数:监控一个目录下的文件,当发现有新文件的时候, 把文件读取过来,解析文件中的内容,统计单词出现的总次数 E:\data\storm */ public class StormLocalWordCountTopology { /** * 构建拓扑,组装Spout和Bolt节点,至关于在MapReduce中构建Job */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); // dag builder.setSpout("id_file_spout", new FileSpout()); builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout"); builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt"); StormTopology stormTopology = builder.createTopology(); LocalCluster cluster = new LocalCluster(); String topologyName = StormLocalWordCountTopology.class.getSimpleName(); Config config = new Config(); cluster.submitTopology(topologyName, config, stormTopology); } }
执行程序后,往目标目录中添加.txt
文件,程序输出以下:
==================================== hello:::1 ==================================== hello:::1 you:::1 ==================================== hello:::2 you:::1 ==================================== hello:::2 he:::1 you:::1 ==================================== hello:::3 he:::1 you:::1 ==================================== me:::1 hello:::3 he:::1 you:::1
在编写了Storm的程序后,再来看看其相关的术语就容易理解不少了。
Topology用于封装一个实时计算应用程序的逻辑,相似于Hadoop的MapReduce Job
Stream 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地建立和处理
Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple
Bolts 消息处理者,全部的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流, 可执行过滤,聚合,查询数据库等操做
Task 每个Spout和Bolt会被看成不少task在整个集群里面执行,每个task对应到一个线程.
Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每一个tuple接受什么样的流做为输入, stream grouping就是用来定义一个stream应该如何分配给Bolts们.