要实现的功能如上java
而后如今先写几个组件:数组
RandomWordSpout(采集数据,这里为了简单一些,就随机产生一些数据)并发
public class RandomWordSpout extends BaseRichSpout{ private SpoutOutputCollector collector; //模拟一些数组 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; //不断向下一个组件发送 tuple 消息 //这里面是该 spout 组件的核心逻辑 @Override public void nextTuple() { //能够从 kafka 消息队列中拿到数据,简便起见,咱们从 words 数组中随机挑选一个商品名发送出去 Random random = new Random(); int index = random.nextInt(words.length); //经过随机数拿到一个商品名 String godName = words[index]; //将商品名封装成 tuple ,发送消息给下一个组件 collector.emit(new Values(godName)); //无法送一个消息,休眠500ms Utils.sleep(500); } //初始化方法,在 spout 组件实例化时调用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } //声明本 spout 组件发送出去的 tuple 中的数据的字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("orignname")); } }
UpperBolt(转换为大写)dom
public class UpperBolt extends BaseBasicBolt{ //业务逻辑 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先获取上一个组件传递过来的数据,数据在 tiple 里面 String godName = tuple.getString(0); //将商品名转化成大写 String godName_upper = godName.toUpperCase(); //将转换完成的商品名发送出去 collector.emit(new Values(godName_upper)); } //声明该 blot 组件要发送出去的 tuple 字段 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uppername")); } }
SuffixBolt(添加后缀,写入文件)iphone
public class SuffixBolt extends BaseBasicBolt{ FileWriter fileWriter = null; //该 bolt 组件运行过程当中只会被调用一次 @Override public void prepare(Map stormConf, TopologyContext context) { try { fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID()); } catch (IOException e) { throw new RuntimeException(e); } } //该 blot 组件的核心处理逻辑 //每收到一个 tuple 消息,就会被调用一次 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先拿到上一个组件发送过来的商品名称 String upper_name = tuple.getString(0); String suffix_name = upper_name + "_itisok"; //为上一个组件发送过来的商品名称添加后缀 try { fileWriter.write(suffix_name); fileWriter.write("\n"); fileWriter.flush(); } catch (IOException e) { throw new RuntimeException(e); } } //本 blot 已经不须要发送 tuple 消息到下一个组件,因此不须要再声明 tuple 字段 @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { } }
TopoMain(把上面三个组件串起来)ide
/** * 组织各个处理组件造成一个完整的处理流程,就是所谓的 topology(相似MapReduce中的 job ) * 而且将该 topology 提交给 storm 集群去运行,topology 提交到集群中,将无间隙的运行,除非人为或者异常退出 * @author duanhaitao@itcast.cn * */ public class TopoMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //将咱们的 spout 组件设置到 topology 中去 //parallelism_hint :4 表示用 4 个 excutor 来执行这个组件 //setNumTasks(8) 设置的是该组件执行时,并发task 数量,也就是 1 个 excutor 会运行 2 个task builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8); //将咱们的 spout 组件设置到 topology 中去,而且指定它接受 randomspout 组件的消息 //.shuffleGrouping("upperbolt")有两层含义 //一、upperbolt 组件接受的 tuple 消息必定来自于 randomspout //二、randomspout 组件和 upperbolt 组件的大量并发 task 实例之间收发消息时,采用的分组策略是随机分组shuffleGrouping builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); //将添加后缀的 bolt 组件设置到 topology 去,而且指定它接受 upperblit 组件的消息 builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); //用 builder 来建立一个 topology StormTopology demotop = builder.createTopology(); //配置一些 topology 在集群中运行时的参数 Config conf = new Config(); //这里设置的是整个 demotop 所占用的槽位数,也就是 workor 数量 conf.setNumWorkers(4); conf.setDebug(true); conf.setNumAckers(0); //将这个 topology 提交给 strom 集群运行 StormSubmitter.submitTopology("demotopo", conf, demotop); } }
能够本地,也可提交到集群oop
先打个包,传到集群中,运行便可ui