Kafka实战-数据持久化

1.概述

  通过前面Kafka实战系列的学习,咱们经过学习《Kafka实战-入门》了解Kafka的应用场景和基本原理,《Kafka实战-Kafka Cluster》一文给你们分享了Kafka集群的搭建部署,让你们掌握了集群的搭建步骤,《Kafka实战-实时日志统计流程》一文给你们讲解一个项目(或者说是系统)的总体流程,《Kafka实战-Flume到Kafka》一文给你们介绍了Kafka的数据生产过程,《Kafka实战-Kafka到Storm》一文给你们介绍了Kafka的数据消费,经过Storm来实时计算处理。今天进入Kafka实战的最后一个环节,那就是Kafka实战的结果的数据持久化。下面是今天要分享的内容目录:html

  • 结果持久化
  • 实现过程
  • 结果预览

  下面开始今天的分享内容。前端

2.结果持久化

  通常,咱们在进行实时计算,将结果统计处理后,须要将结果进行输出,供前端工程师去展现咱们统计的结果(所说的报表)。结果的存储,这里咱们选择的是Redis+MySQL进行存储,下面用一张图来展现这个持久化的流程,以下图所示:java

  从途中能够看出,实时计算的部分由Storm集群去完成,而后将计算的结果输出到Redis和MySQL库中进行持久化,给前端展现提供数据源。接下来,我给你们介绍如何实现这部分流程。mysql

3.实现过程

  首先,咱们去实现Storm的计算结果输出到Redis库中,代码以下所示:redis

package cn.hadoop.hdfs.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import redis.clients.jedis.Jedis;
import cn.hadoop.hdfs.util.JedisFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * @Date Jun 10, 2015
 *
 * @Author dengjie
 *
 * @Note Calc WordsCount eg.
 */
public class WordsCounterBlots implements IRichBolt {

    /**
     * 
     */
    private static final long serialVersionUID = -619395076356762569L;

    OutputCollector collector;
    Map<String, Integer> counter;

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counter = new HashMap<String, Integer>();
    }

    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer integer = this.counter.get(word);
        if (integer != null) {
            integer += 1;
            this.counter.put(word, integer);
        } else {
            this.counter.put(word, 1);
        }
        for (Entry<String, Integer> entry : this.counter.entrySet()) {
           // write result to redis
            Jedis jedis = JedisFactory.getJedisInstance("real-time");
            jedis.set(entry.getKey(), entry.getValue().toString());
            
            // write result to mysql
            // ...
        }
this.collector.ack(input);
    }

    public void cleanup() {
        // TODO Auto-generated method stub
        
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }

    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

}

  注:这里关于输出到MySQL就不赘述了,你们能够按需处理便可。sql

4.结果预览

  在实现持久化到Redis的代码实现后,接下来,咱们经过提交Storm做业,来观察是否将计算后的结果持久化到了Redis集群中。结果以下图所示:前端工程师

  经过Redis的Client来浏览存储的Key值,能够观察统计的结果持久化到来Redis中。oop

5.总结

  咱们在提交做业到Storm集群的时候须要观察做业运行情况,有可能会出现异常,咱们能够经过Storm UI界面来观察,会有提示异常信息的详细描述。如果出错,你们能够经过Storm UI的错误信息和Log日志打印的错误信息来定位出缘由,从而找到对应的解决办法。post

6.结束语

  这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!学习

相关文章
相关标签/搜索