storm demo

 

要实现的功能如上java

而后如今先写几个组件:数组

RandomWordSpout(采集数据,这里为了简单一些,就随机产生一些数据)并发

public class RandomWordSpout extends BaseRichSpout{

	private SpoutOutputCollector collector;
	
	//模拟一些数组
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
	
	//不断向下一个组件发送 tuple 消息
	//这里面是该 spout 组件的核心逻辑
	@Override
	public void nextTuple() {

		//能够从 kafka 消息队列中拿到数据,简便起见,咱们从 words 数组中随机挑选一个商品名发送出去
		Random random = new Random();
		int index = random.nextInt(words.length);
		
		//经过随机数拿到一个商品名
		String godName = words[index];
		
		
		//将商品名封装成 tuple ,发送消息给下一个组件
		collector.emit(new Values(godName));
		
		//无法送一个消息,休眠500ms
		Utils.sleep(500);
		
		
	}

	//初始化方法,在 spout 组件实例化时调用一次
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

		this.collector = collector;
		
		
	}

	//声明本 spout 组件发送出去的 tuple 中的数据的字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("orignname"));
		
	}

}

UpperBolt(转换为大写)dom

public class UpperBolt extends BaseBasicBolt{

	
	//业务逻辑
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		
		//先获取上一个组件传递过来的数据,数据在 tiple 里面
		String godName = tuple.getString(0);
		
		//将商品名转化成大写
		String godName_upper = godName.toUpperCase();
		
		//将转换完成的商品名发送出去
		collector.emit(new Values(godName_upper));
		
	}

	
	
	//声明该 blot 组件要发送出去的 tuple 字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
		declarer.declare(new Fields("uppername"));
	}

}

 

SuffixBolt(添加后缀,写入文件)iphone

public class SuffixBolt extends BaseBasicBolt{
	
	FileWriter fileWriter = null;
	
	
	//该 bolt 组件运行过程当中只会被调用一次
	@Override
	public void prepare(Map stormConf, TopologyContext context) {

		try {
			fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
	}
	
	
	
	//该 blot 组件的核心处理逻辑
	//每收到一个 tuple 消息,就会被调用一次
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		//先拿到上一个组件发送过来的商品名称
		String upper_name = tuple.getString(0);
		String suffix_name = upper_name + "_itisok";
		
		
		//为上一个组件发送过来的商品名称添加后缀
		
		try {
			fileWriter.write(suffix_name);
			fileWriter.write("\n");
			fileWriter.flush();
			
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
		
		
	}

	
	
	
	//本 blot 已经不须要发送 tuple 消息到下一个组件,因此不须要再声明 tuple 字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {

		
	}

}

TopoMain(把上面三个组件串起来)ide

/**
 * 组织各个处理组件造成一个完整的处理流程,就是所谓的 topology(相似MapReduce中的 job )
 * 而且将该 topology 提交给 storm 集群去运行,topology 提交到集群中,将无间隙的运行,除非人为或者异常退出
 * @author duanhaitao@itcast.cn
 *
 */
public class TopoMain {

	
	public static void main(String[] args) throws Exception {
		
		TopologyBuilder builder = new TopologyBuilder();
		
		//将咱们的 spout 组件设置到 topology 中去 
        //parallelism_hint :4 表示用 4 个 excutor 来执行这个组件
		//setNumTasks(8) 设置的是该组件执行时,并发task 数量,也就是 1 个 excutor 会运行 2 个task
		builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
		
		//将咱们的 spout 组件设置到 topology 中去,而且指定它接受 randomspout 组件的消息
        //.shuffleGrouping("upperbolt")有两层含义
        //一、upperbolt 组件接受的 tuple 消息必定来自于 randomspout     
        //二、randomspout 组件和 upperbolt 组件的大量并发 task 实例之间收发消息时,采用的分组策略是随机分组shuffleGrouping
		builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
		
		//将添加后缀的 bolt 组件设置到 topology 去,而且指定它接受 upperblit 组件的消息
		builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
		
		//用 builder 来建立一个 topology 
		StormTopology demotop = builder.createTopology();
		
		
		//配置一些 topology 在集群中运行时的参数
		Config conf = new Config();
        //这里设置的是整个 demotop 所占用的槽位数,也就是 workor 数量
		conf.setNumWorkers(4);
		conf.setDebug(true);
		conf.setNumAckers(0);
		
		
		//将这个 topology 提交给 strom 集群运行
		StormSubmitter.submitTopology("demotopo", conf, demotop);
		
	}
}

 

能够本地,也可提交到集群oop

先打个包,传到集群中,运行便可ui

相关文章
相关标签/搜索