Maven POM 依赖配置：
<properties>
    <storm.version>1.1.0</storm.version>
    <kafka.version>0.10.2.0</kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <!-- Provided by the Storm runtime, so it is NOT packed into the final task jar -->
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
        <!-- FIX: was ${provided.scope}, a property never declared in <properties>;
             use the literal scope so the build does not depend on an undefined value -->
        <scope>provided</scope>
    </dependency>
    <!-- storm-kafka-client and kafka-clients are NOT provided by the cluster,
         so they keep the default (compile) scope and are packed into the jar -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
Topology 主类：
public class KafkaTopology { /** * storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.boyoi.kafka.topology.KafkaTopology x2 */ public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout<>(getKafkaSpoutConfig()),200); /** * 1)shuffleGrouping(随机分组) * 2)fieldsGrouping(按照字段分组,在这里便是同一个单词只能发送给一个Bolt) * 3)allGrouping(广播发送,即每个Tuple,每个Bolt都会收到) * 4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面) * 5)noneGrouping(随机分派) * 6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系) * 7)Local or shuffle Grouping * 8)customGrouping (自定义的Grouping) */ builder.setBolt("filter", new FilterBolt(), 200).shuffleGrouping("spout"); builder.setBolt("analysis", new AnalysisBlot(), 200).shuffleGrouping("filter"); builder.setBolt("HBase", new HBaseBlot(),200).shuffleGrouping("analysis"); Config conf = new Config(); // conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(5); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("x2", conf, builder.createTopology()); cluster.shutdown(); } } /** * 获取kafka spout 配置 */ private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() { return KafkaSpoutConfig.builder("192.168.1.9:9092,192.168.1.40:9092", "test2") .setGroupId("kafka") .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str")) .build(); } private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc(); /** * Needs to be serializable */ private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable { @Override public List<Object> apply(ConsumerRecord<String, String> record) { return new Values(record.value()); } } }
Bolt 示例（FilterBolt）：
public class FilterBolt extends BaseRichBolt{ private OutputCollector outputCollector; /** * 初始化工做 */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } /** * 执行逻辑,目的是过滤无用的字符串 */ @Override public void execute(Tuple tuple) { String value = tuple.getString(0); // 提交下一个 if (null != value && !"".equals(value)){ try { int val = Integer.parseInt(value); outputCollector.emit(new Values(val)); }catch (Exception e){ // ignore System.out.println(value + "不是数字!略过!!!"); } } // 返回确认 outputCollector.ack(tuple); } /** * 申明传入到一个Bolt的字段名称 * 经过 input.getStringByField("str");input.getIntegerByField("int"); 得到指定的 */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("value")); } }
其它 Bolt 略。
代码写好后，使用 Maven 打成 jar 包。
放在集群 Nimbus 的某个目录下，然后提交：
storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.*.kafka.topology.KafkaTopology x2