[转]flume-ng+Kafka+Storm+HDFS 实时系统搭建

一直以来都想接触Storm实时计算这块的东西,以前在弄这个的时候,跟群里的一些人讨论过,有的人说,直接用storm不就能够作实时处理了,用不着那么麻烦;其实否则,作软件开发的都知道模块化思想,这样设计的缘由有两方面:
一方面是能够模块化,功能划分更加清晰,从“数据采集--数据接入--流失计算--数据输出/存储”
 


 html

1).数据采集java

负责从各节点上实时采集数据,选用cloudera的flume来实现mysql

 

2).数据接入git

因为采集数据的速度和数据处理的速度不必定同步,所以添加一个消息中间件来做为缓冲,选用apache的kafkagithub

 

3).流式计算sql

对采集到的数据进行实时分析,选用apache的storm数据库

 

4).数据输出apache

对分析后的结果持久化,暂定用mysql编程

另外一方面是模块化以后,加入当Storm挂掉了以后,数据采集和数据接入仍是继续在跑着,数据不会丢失,storm起来以后能够继续进行流式计算;ubuntu


 

那么接下来咱们来看下总体的架构图

 

 

详细介绍各个组件及安装配置:

操做系统:ubuntu

 

Flume

Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各种数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各类数据接受方(可定制)的能力。

 

下图为flume典型的体系结构:

Flume数据源以及输出方式:

 

Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在咱们的系统中目前使用exec方式进行日志采集。

Flume的数据接受方,能够是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。在咱们系统中由kafka来接收。
 

Flume下载及文档:

http://flume.apache.org/

Flume安装:

 

  1. $tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local

复制代码


Flume启动命令:
 

  1. $bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console

复制代码


Kafka
 

kafka是一种高吞吐量的分布式发布订阅消息系统,她有以下特性:

  • 经过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即便数以TB的消息存储也可以保持长时间的稳定性能。
  • 高吞吐量:即便是很是普通的硬件kafka也能够支持每秒数十万的消息。
  • 支持经过kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

kafka的目的是提供一个发布订阅解决方案,它能够处理消费者规模的网站中的全部动做流数据。 这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。 对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是经过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群机来提供实时的消费。

 

kafka分布式订阅架构以下图:--取自Kafka官网

 

罗宝兄弟文章上的架构图是这样的

 

其实二者没有太大区别,官网的架构图只是把Kafka简洁的表示成一个Kafka Cluster,而罗宝兄弟的架构图就相对详细一些;

Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 
1.     项目核心代码结构截图

分布式框架介绍 - kafkaee - kafkaee的博客

   项目模块依赖分布式框架介绍 - kafkaee - kafkaee的博客

特别提醒:开发人员在开发的时候能够将本身的业务REST服务化或者Dubbo服务化

2.    项目依赖介绍

   2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖以下图:
 

分布式框架介绍 - kafkaee - kafkaee的博客

       2.2 Dubbo独立服务项目依赖以下图:

 分布式框架介绍 - kafkaee - kafkaee的博客

3.  项目功能部分截图:

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客
 

zookeeper、dubbo服务启动 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客
 

dubbo管控台 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 REST服务平台

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

分布式框架介绍 - kafkaee - kafkaee的博客

 

Kafka版本:0.8.0

Kafka下载及文档:http://kafka.apache.org/

Kafka安装:

 

  1. > tar xzf kafka-<VERSION>.tgz
  2. > cd kafka-<VERSION>
  3. > ./sbt update
  4. > ./sbt package
  5. > ./sbt assembly-package-dependency

复制代码

 

启动及测试命令:

(1) start server

 

  1. > bin/zookeeper-server-start.shconfig/zookeeper.properties
  2. > bin/kafka-server-start.shconfig/server.properties

复制代码


这里是官网上的教程,kafka自己有内置zookeeper,可是我本身在实际部署中是使用单独的zookeeper集群,因此第一行命令我就没执行,这里只是些出来给你们看下。
配置独立的zookeeper集群须要配置server.properties文件,讲zookeeper.connect修改成独立集群的IP和端口

 

  1. zookeeper.connect=nutch1:2181

复制代码

(2)Create a topic

  1. > bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
  2. > bin/kafka-list-topic.sh --zookeeperlocalhost:2181

复制代码


(3)Send some messages
 

  1. > bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test

复制代码

(4)Start a consumer

  1. > bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning

复制代码

 

kafka-console-producer.sh和kafka-console-cousumer.sh只是系统提供的命令行工具。这里启动是为了测试是否能正常生产消费;验证流程正确性

在实际开发中仍是要自行开发本身的生产者与消费者;

kafka的安装也能够参考我以前写的文章:http://blog.csdn.net/weijonathan/article/details/18075967

 

Storm

Twitter将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType如今已在Twitter麾下。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure写的。

 

 

Storm的主要特色以下:

  • 简单的编程模型。相似于MapReduce下降了并行批处理复杂性,Storm下降了进行实时处理的复杂性。
  • 可使用各类编程语言。你能够在Storm之上使用各类编程语言。默认支持Clojure、Java、Ruby和Python。要增长对其余语言的支持,只需实现一个简单的Storm通讯协议便可。
  • 容错性。Storm会管理工做进程和节点的故障。
  • 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
  • 可靠的消息处理。Storm保证每一个消息至少能获得一次完整处理。任务失败时,它会负责从消息源重试消息。
  • 快速。系统的设计保证了消息能获得快速的处理,使用ØMQ做为其底层消息队列。(0.9.0.1版本支持ØMQ和netty两种模式)
  • 本地模式。Storm有一个“本地模式”,能够在处理过程当中彻底模拟Storm集群。这让你能够快速进行开发和单元测试。

接下来重头戏开始拉!那就是框架之间的整合啦

 

flume和kafka整合

1.下载

flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

 

2.提取插件中的flume-conf.properties文件

修改该文件:#source section

producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c

修改全部topic的值改成test

将改后的配置文件放进flume/conf目录下

在该项目中提取如下jar包放入环境中flume的lib下:

 

 

 

注:这里的flumeng-kafka-plugin.jar这个包,后面在github项目中已经移动到package目录了。找不到的童鞋能够到package目录获取。

完成上面的步骤以后,咱们来测试下flume+kafka这个流程有没有走通;

 

咱们先启动flume,而后再启动kafka,启动步骤按以前的步骤执行;接下来咱们使用kafka的kafka-console-consumer.sh脚本查看是否有flume有没有往Kafka传输数据;

 

以上这个是个人test.log文件经过flume抓取传到kafka的数据;说明咱们的flume和kafka流程走通了;

 

你们还记得刚开始咱们的流程图么,其中有一步是经过flume到kafka,还有一步是到hdfs的;而咱们这边尚未提到如何存入kafka且同时存如hdfs;

flume是支持数据同步复制,同步复制流程图以下,取自于flume官网,官网用户指南地址:http://flume.apache.org/FlumeUserGuide.html

 

怎么设置同步复制呢,看下面的配置:

 

  1. #2个channel和2个sink的配置文件 这里咱们能够设置两个sink,一个是kafka的,一个是hdfs的;
  2. a1.sources = r1
  3. a1.sinks = k1 k2
  4. a1.channels = c1 c2

复制代码


具体配置大伙根据本身的需求去设置,这里就不具体举例了
 

kafka和storm的整合

 

1.下载kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus

 

2.使用maven package进行编译,获得storm-kafka-0.8-plus-0.3.0-SNAPSHOT.jar包  --有转载的童鞋注意下,这里的包名以前写错了,如今改正确了!很差意思!

 

3.将该jar包及kafka_2.9.2-0.8.0-beta1.jar、metrics-core-2.2.0.jar、scala-library-2.9.2.jar (这三个jar包在kafka项目中能找到)

备注:若是开发的项目须要其余jar,记得也要放进storm的Lib中好比用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下

那么接下来咱们把storm也重启下;

 

完成以上步骤以后,咱们还有一件事情要作,就是使用kafka-storm0.8插件,写一个本身的Storm程序;

这里我给大伙附上一个我弄的storm程序,百度网盘分享地址:连接: http://pan.baidu.com/s/1dD28mDr 密码: 44r3

 

先稍微看下程序的建立Topology代码

 

 

 

数据操做主要在WordCounter类中,这里只是使用简单JDBC进行插入处理

 

 

 

这里只须要输入一个参数做为Topology名称就能够了!咱们这里使用本地模式,因此不输入参数,直接看流程是否走通;


 

  • storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.MyTopology


 

先看下日志,这里打印出来了往数据库里面插入数据了

 

 

而后咱们查看下数据库;插入成功了!

 
到这里咱们的整个整合就完成了!


 

可是这里还有一个问题,不知道大伙有没有发现。

因为咱们使用storm进行分布式流式计算,那么分布式最须要注意的是数据一致性以及避免脏数据的产生;因此我提供的测试项目只能用于测试,正式开发不能这样处理;

同时给的建议是创建一个zookeeper的分布式全局锁,保证数据一致性,避免脏数据录入!

相关文章
相关标签/搜索