Storm + Kafka 集成

POM 配置

<properties>
        <storm.version>1.1.0</storm.version>
        <kafka.version>0.10.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- Provided by the Storm runtime, so it is not packed into the topology jar. -->
            <scope>provided</scope>
        </dependency>

        <!-- Legacy SimpleConsumer-based spout; superseded by storm-kafka-client below. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <!-- Fixed: ${provided.scope} was never defined in <properties>, so Maven
                 silently fell back to the default "compile" scope. Use the literal
                 value, consistent with the storm-core dependency above. -->
            <scope>provided</scope>
        </dependency>

        <!-- New KafkaConsumer-based spout (KafkaSpout / KafkaSpoutConfig) used by KafkaTopology. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${storm.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

Topology 类

public class KafkaTopology {

    /** Parallelism hint applied to the spout and every bolt. */
    private static final int PARALLELISM = 200;

    /** How long the in-process local cluster is kept alive before shutdown (ms). */
    private static final long LOCAL_RUN_MILLIS = 60_000L;

    /**
     * Entry point. With at least one argument, the topology is submitted to the
     * cluster under that name; with no arguments it runs in an in-process
     * {@link LocalCluster} for development/testing.
     *
     * Usage:
     *   storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.boyoi.kafka.topology.KafkaTopology x2
     */
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new KafkaSpout<>(getKafkaSpoutConfig()), PARALLELISM);

        /*
         * Stream grouping options (shuffleGrouping is used below):
         * 1) shuffleGrouping        - random, load-balanced distribution
         * 2) fieldsGrouping         - tuples with the same field value go to the same task
         * 3) allGrouping            - broadcast: every task receives every tuple
         * 4) globalGrouping         - all tuples go to the task with the lowest task id
         * 5) noneGrouping           - random distribution (currently same as shuffle)
         * 6) directGrouping         - the emitter chooses the receiving task explicitly
         * 7) localOrShuffleGrouping - prefer tasks in the same worker, else shuffle
         * 8) customGrouping         - user-defined grouping
         */
        builder.setBolt("filter", new FilterBolt(), PARALLELISM).shuffleGrouping("spout");

        builder.setBolt("analysis", new AnalysisBlot(), PARALLELISM).shuffleGrouping("filter");

        builder.setBolt("HBase", new HBaseBlot(), PARALLELISM).shuffleGrouping("analysis");

        Config conf = new Config();
        // conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: the topology name is taken from the first CLI argument.
            conf.setNumWorkers(5);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // Local mode. Fixed: the original called shutdown() immediately after
            // submitTopology(), so the topology was torn down before it could
            // process anything. Keep the cluster alive for a bounded window.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("x2", conf, builder.createTopology());
            try {
                Thread.sleep(LOCAL_RUN_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fall through to shutdown.
                Thread.currentThread().interrupt();
            }
            cluster.shutdown();
        }
    }

    /**
     * Builds the KafkaSpout configuration: bootstrap servers, topic "test2",
     * consumer group "kafka", and a record translator that emits only the
     * record value under the single output field "str".
     */
    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder("192.168.1.9:9092,192.168.1.40:9092", "test2")
                .setGroupId("kafka")
                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                .build();
    }

    /**
     * Translates each ConsumerRecord into a one-field tuple holding just its value.
     * Must be Serializable because Storm ships the spout configuration to workers.
     */
    private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            return new Values(record.value());
        }
    }

    /** Shared stateless translator instance; made final (it is never reassigned). */
    private static final Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();
}

Bolt 类

public class FilterBolt extends BaseRichBolt {

    private OutputCollector outputCollector;

    /**
     * One-time initialization: keep a reference to the collector used for
     * emitting and acking tuples.
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    /**
     * Filters out tuples whose first field is not a parseable integer.
     * Valid values are re-emitted as an int under the output field "value".
     * Every input tuple is acked — including filtered ones — so the spout
     * does not replay them.
     */
    @Override
    public void execute(Tuple tuple) {
        String value = tuple.getString(0);

        if (value != null && !value.isEmpty()) {
            try {
                outputCollector.emit(new Values(Integer.parseInt(value)));
            } catch (NumberFormatException e) {
                // Fixed: was catch (Exception) — only a parse failure is expected
                // here; any other error should surface instead of being swallowed.
                System.out.println(value + "不是数字!略过!!!");
            }
        }
        // Acknowledge so the tuple is considered fully processed.
        outputCollector.ack(tuple);
    }

    /**
     * Declares this bolt's single output field "value" (an Integer).
     * Downstream bolts can read it via input.getIntegerByField("value").
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value"));
    }
}

 

其它 Bolt 类略

代码写好后,使用 Maven 打成 jar 包。

放在集群 Nimbus 所在机器的某个目录下,然后提交:

storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.*.kafka.topology.KafkaTopology x2
相关文章
相关标签/搜索