sparkstreaming

流(Streaming),在大数据时代为数据流处理,就像水流同样,是数据流;既然是数据流处理,就会想到数据的流入、数据的加工、数据的流出。html

平常工做、生活中数据来源不少不一样的地方。例如:工业时代的汽车制造、监控设备、工业设备会产生不少源数据;信息时代的电商网站、日志服务器、社交网络、金融交易系统、黑客攻击、垃圾邮件、交通监控等;通讯时代的手机、平板、智能设备、物联网等会产生不少实时数据,数据流无处不在。java

在大数据时代Spark Streaming能作什么?web

平时用户都有网上购物的经历,用户在网站上进行的各类操做经过Spark Streaming流处理技术能够被监控,用户的购买爱好、关注度、交易等能够进行行为分析。在金融领域,经过Spark Streaming流处理技术能够对交易量很大的帐号进行监控,防止罪犯洗钱、财产转移、防欺诈等。在网络安全性方面,黑客攻击时有发生,经过Spark Streaming流处理技术能够将某类可疑IP进行监控并结合机器学习训练模型匹配出当前请求是否属于黑客攻击。其余方面,如:垃圾邮件监控过滤、交通监控、网络监控、工业设备监控的背后都是Spark Streaming发挥强大流处理的地方。算法

大数据时代,数据价值通常怎么定义?数据库

全部没通过流处理的数据都是无效数据或没有价值的数据;数据产生以后当即处理产生的价值是最大的,数据放置越久或越滞后其使用价值越低。之前绝大多数电商网站盈利走的是网络流量(即用户的访问量),现在,电商网站不只仅须要关注流量、交易量,更重要的是要经过数据流技术让电商网站的各类数据流动起来,经过实时流动的数据及时分析、挖掘出各类有价值的数据;好比:对不一样交易量的用户指定用户画像,从而提供不一样服务质量;准对用户访问电商网站板块爱好及时推荐相关的信息。apache

SparkStreaming VS Hadoop MR:编程

Spark Streaming是一个准实时流处理框架,而Hadoop MR是一个离线、批处理框架;很显然,在数据的价值性角度,Spark Streaming完胜于Hadoop MR。ubuntu

SparkStreaming VS Storm:api

Spark Streaming是一个准实时流处理框架,处理响应时间通常以分钟为单位,也就是说处理实时数据的延迟时间是秒级别的;Storm是一个实时流处理框架,处理响应是毫秒级的。因此在流框架选型方面要看具体业务场景。须要澄清的是如今不少人认为Spark Streaming流处理运行不稳定、数据丢失、事务性支持很差等等,那是由于不少人不会驾驭Spark Streaming及Spark自己。在Spark Streaming流处理的延迟时间方面,Spark定制版本,会将Spark Streaming的延迟从秒级别推动到100毫秒以内甚至更少。数组

SparkStreaming优势:

一、提供了丰富的API,企业中能快速实现各类复杂的业务逻辑。

二、流入Spark Streaming的数据流经过和机器学习算法结合,完成机器模拟和图计算。

三、Spark Streaming基于Spark优秀的血统。

 

SparkStreaming能不能像Storm同样,一条一条处理数据?

Storm处理数据的方式是以条为单位来一条一条处理的,而Spark Streaming基于单位时间处理数据的,SparkStreaming能不能像Storm同样呢?答案是:能够的。

业界通常的作法是Spark Streaming和Kafka搭档便可达到这种效果,入下图:

 

Kafka业界认同最主流的分布式消息框架,此框架即符合消息广播模式又符合消息队列模式。

Kafka内部使用的技术:

一、  Cache

二、  Interface

三、  Persistence(默认最大持久化一周)

四、  Zero-Copy技术让Kafka每秒吞吐量几百兆,并且数据只须要加载一次到内核提供其余应用程序使用

外部各类源数据推动(Push)Kafka,而后再经过Spark Streaming抓取(Pull)数据,抓取的数据量能够根据本身的实际状况肯定每一秒中要处理多少数据。

 

经过Spark Streaming动手实战wordCount实例

这里是运行一个Spark Streaming的程序:统计这个时间段内流进来的单词出现的次数. 它计算的是:他规定的时间段内每一个单词出现了多少次。

一、先启动下Spark集群:

咱们从集群里面打开下官方网站

 

接受这个数据进行加工,就是流处理的过程,刚才那个WordCount就是以1s作一个单位。

刚才运行的时候,为何没有结果呢?由于须要数据源。

二、获取数据源:

 

新开一个命令终端,而后输入:

$ nc -lk 9999

如今咱们拷贝数据源进入运行:

 

 

而后按回车运行

 

 

DStream和RDD关系:

没有输入数据会打印的是空结果:

 

 可是实际上,Job的执行是Spark Streaming框架帮咱们产生的和开发者本身写的Spark代码业务逻辑没有关系,并且Spark Streaming框架的执行时间间隔能够手动配置,如:每隔一秒钟就会产生一次Job的调用。因此在开发者编写好的Spark代码时(如:flatmap、map、collect),不会致使job的运行,job运行是Spark Streaming框架产生的,能够配置成每隔一秒中都会产生一次job调用。
Spark Streaming流进来的数据是DStream,但Spark Core框架只认RDD,这就产生矛盾了?
Spark Streaming框架中,做业实例的产生都是基于rdd实例来产生,你写的代码是做业的模板,即rdd是做业的模板,模板一运行rdd就会被执行,此时action必须处理数据。RDD的模板就是DStream离散流,RDD之间存在依赖关系,DStream就有了依赖关系,也就构成了DStream 有向无环图。这个DAG图,是模板。Spark Streaming只不过是在附在RDD上面一层薄薄的封装而已。你写的代码不能产生Job,只有框架才能产生Job.
若是一秒内计算不完数据,就只能调优了.

总结:

使用Spark Streaming能够处理各类数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是不少时候你们不会用,其真正缘由是对Spark、spark streaming自己不了解。

 

Scala和Java二种方式实战Spark Streaming开发

1、Java方式开发

一、开发前准备:假定您以搭建好了Spark集群。

二、开发环境采用eclipse maven工程,须要添加Spark Streaming依赖。

三、Spark streaming 基于Spark Core进行计算,须要注意事项:

设置本地master,若是指定local的话,必须配置至少二条线程,也可经过sparkconf来设置,由于Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,而且至少有一条线程用于处理接收的数据(不然的话没法有线程用于处理数据),随着时间的推移,内存和磁盘都会不堪重负)。

舒适提示:

对于集群而言,每隔exccutor通常确定不仅一个Thread,那对于处理Spark Streaming应用程序而言,每一个executor通常分配多少core比较合适?根据咱们过去的经验,5个左右的core是最佳的(段子:分配为奇数个core的表现最佳,例如:分配3个、5个、7个core等)

接下来,让咱们开始动手写写Java代码吧!

第一步:建立SparkConf对象

 

第二步:建立SparkStreamingContext

咱们采用基于配置文件的方式建立SparkStreamingContext对象:

第三步,建立Spark Streaming输入数据来源:

  咱们将数据来源配置为本地端口9999(注意端口要求没有被占用):

第四步:咱们就像对RDD编程同样,基于DStream进行编程,缘由是DStream是RDD产生的模板,在Spark Streaming发生计算前,其实质是把每一个Batch的DStream的操做翻译成为了RDD操做。

一、flatMap操做:

二、 mapToPair操做:

 

三、reduceByKey操做:

四、print等操做:

舒适提示:

除了print()方法将处理后的数据输出以外,还有其余的方法也很是重要,在开发中须要重点掌握,好比SaveAsTextFile,SaveAsHadoopFile等,最为重要的是foreachRDD方法,这个方法能够将数据写入Redis,DB,DashBoard等,甚至能够随意的定义数据放在哪里,功能很是强大。

1、Scala方式开发

第一步,接收数据源:

第二步,flatMap操做:

第三步,map操做:

第四步,reduce操做:

第五步,print()等操做:

第六步:awaitTermination操做

 

总结:

使用Spark Streaming能够处理各类数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是不少时候你们不会用,其真正缘由是对Spark、spark streaming自己不了解。

 
 

StreamingContext、DStream、Receiver深度剖析

 

1、StreamingContext功能及源码剖析:

一、  经过Spark Streaming对象jssc,建立应用程序主入口,并连上Driver上的接收数据服务端口9999写入源数据:

 

二、  Spark Streaming的主要功能有:

  • 主程序的入口;
  • 提供了各类建立DStream的方法接收各类流入的数据源(例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等);
  • 经过构造函数实例化Spark Streaming对象时,能够指定master URL、appName、或者传入SparkConf配置对象、或者已经建立的SparkContext对象;
  • 将接收的数据流传入DStreams对象中;
  • 经过Spark Streaming对象实例的start方法启动当前应用程序的流计算框架或经过stop方法结束当前应用程序的流计算框架;

 

 2、DStream功能及源码剖析:

一、  DStream是RDD的模板,DStream是抽象的,RDD也是抽象

二、  DStream的具体实现子类以下图所示:

 

三、  以StreamingContext实例的socketTextSteam方法为例,其执行完的结果返回DStream对象实例,其源码调用过程以下图:

socket.getInputStream获取数据,while循环来存储储蓄数据(内存、磁盘)

3、Receiver功能及源码剖析:

一、Receiver表明数据的输入,接收外部输入的数据,如从Kafka上抓取数据;

二、Receiver运行在Worker节点上;

三、Receiver在Worker节点上抓取Kafka分布式消息框架上的数据时,具体实现类是KafkaReceiver;

四、Receiver是抽象类,其抓取数据的实现子类以下图所示:

 

五、  若是上述实现类都知足不了您的要求,您本身能够定义Receiver类,只须要继承Receiver抽象类来实现本身子类的业务需求。

4、StreamingContext、DStream、Receiver结合流程分析:

 

(1)inputStream表明了数据输入流(如:Socket、Kafka、Flume等)

(2)Transformation表明了对数据的一系列操做,如flatMap、map等

(3)outputStream表明了数据的输出,例如wordCount中的println方法:

数据数据在流进来以后最终会生成Job,最终仍是基于Spark Core的RDD进行执行:在处理流进来的数据时是DStream进行Transformation因为是StreamingContext因此根本不会去运行,StreamingContext会根据Transformation生成”DStream的链条”及DStreamGraph,而DStreamGraph就是DAG的模板,这个模板是被框架托管的。当咱们指定时间间隔的时候,Driver端就会根据这个时间间隔来触发Job而触发Job的方法就是根据OutputDStream中指定的具体的function,例如wordcount中print,这个函数必定会传给ForEachDStream,它会把函数交给最后一个DStream产生的RDD,也就是RDD的print操做,而这个操做就是RDD触发Action。

总结:

使用Spark Streaming能够处理各类数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是不少时候你们不会用,其真正缘由是对Spark、spark streaming自己不了解。

 
 

基于HDFS的SparkStreaming案例实战

一:Spark集群开发环境准备

  1. 启动HDFS,以下图所示:

 

经过web端查看节点正常启动,以下图所示:

2.启动Spark集群,以下图所示:

经过web端查看集群启动正常,以下图所示:

3.启动start-history-server.sh,以下图所示:

二:HDFS的SparkStreaming案例实战(代码部分)

package com.dt.spark.SparkApps.sparkstreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.Java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;

/**
 * Created by Jonson on 2016/4/17.
 */
public class SparkStreamingOnHDFS {
    public static void main(String[] args){
        /**
         * 第一步:配置SparkConf
         * 1. 至少两条线程:
         * 由于Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,
         * 而且至少有一条线程用于处理接收的数据(不然的话没法有线程用于处理数据,随着时间的推移,内存和磁盘都不堪重负)
         * 2. 对于集群而言,每一个Executor通常而言确定不止一个线程,对于处理Spark Streaming的应用程序而言,每一个Executor通常
         * 分配多少个Core合适呢?根据咱们过去的经验,5个左右的core是最佳的(分配为奇数个Core为最佳)。
         */
        final SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");
        /**
         * 第二步:建立SparkStreamingContext,这个是Spark Streaming应用程序全部功能的起始点和程序调度的核心
         * 1,SparkStreamingContext的构建能够基于SparkConf参数,也能够基于持久化SparkStreamingContext的内容
         * 来恢复过来(典型的场景是Driver崩溃后从新启动,因为Spark Streaming具备连续7*24小时不间断运行的特征,
         * 全部须要在Driver从新启动后继续上一次的状态,此时状态的恢复须要基于曾经的checkpoint)
         * 2,在一个Spark Streaming应用程序中能够建立若干个SparkStreamingContext对象,使用下一个SparkStreamingContext
         * 以前须要把前面正在运行的SparkStreamingContext对象关闭掉,由此,咱们得到一个重大启发:SparkStreamingContext
         * 是Spark core上的一个应用程序而已,只不过Spark Streaming框架箱运行的话须要Spark工程师写业务逻辑
         */
//        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));//Durations.seconds(5)设置每隔5秒

        final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data";
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory,conf);
            }
        };
        /**
         * 能够从失败中恢复Driver,不过还须要制定Driver这个进程运行在Cluster,而且提交应用程序的时候
         * 指定 --supervise;
         */
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        /**
         * 如今是监控一个文件系统的目录
         * 此处没有Receiver,Spark Streaming应用程序只是按照时间间隔监控目录下每一个Batch新增的内容(把新增的)
         * 做为RDD的数据来源生成原始的RDD
         */
        //指定从HDFS中监控的目录
        JavaDStream lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");
        /**
         * 第四步:接下来就像对于RDD编程同样基于DStreaming进行编程!!!
         * 缘由是:
         *  DStreaming是RDD产生的模板(或者说类)。
         *  在Spark Streaming具体发生计算前其实质是把每一个batch的DStream的操做翻译成对RDD的操做!!
         *  对初始的DStream进行Transformation级别的处理,例如Map,filter等高阶函数的编程,来进行具体的数据计算。
         *  第4.1步:将每一行的字符串拆分红单个单词
         */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        /**
         * 第4.2步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
         * 在4.1的基础上,在单词拆分的基础上对每一个单词实例计数为1,也就是word => (word,1)
         */
        JavaPairDStream<String,Integer> pairs  = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word,1);
            }
        });
        /**
         * 第4.3步:在每一个单词实例计数的基础上统计每一个单词在文件中出现的总次数
         */
        JavaPairDStream<String,Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        /**
         * 此处的print并不会直接触发Job的执行,由于如今的一切都是在Spark Streaming框架控制下的,对于Spark而言具体是否
         * 触发真正的Job运行是基于设置的Duration时间间隔的
         * 必定要注意的是:Spark Streaming应用程序要想执行具体的Job,对DStream就必须有output Stream操做,
         * output Stream有不少类型的函数触发,例如:print,saveAsTextFile,saveAsHadoopFiles等,其实最为重要的一个方法是
         * foraeachRDD,由于Spark Streaming处理的结果通常都会放在Redis,DB,DashBoard等上面,foreachRDD主要就是用来完成这些
         * 功能的,并且能够随意的自定义具体数据到底存放在哪里!!!
         */
        wordscount.print();
        /**
         * Spark Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的。
         * 固然其内部有消息循环体用于接收应用程序自己或者Executor的消息;
         */
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
    /**
     * 工厂化模式构建JavaStreamingContext
     */
    private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf){
        System.out.println("Creating new context");
        SparkConf = conf;
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(5));
        ssc.checkpoint(checkpointDirectory);
        return ssc;
    }
}

代码打包在集群中运行

  1. 建立目录

 

  

2.脚本运行

  脚本内容以下:

此时Spark Streaming会每隔5秒执行一次,不断的扫描监控目录下是否有新的文件。

3.上传文件到HDFS中的Data目录下

4.输出结果

三:Spark Streaming on HDFS源码解密

  1. JavaStreamingContextFactory的create方法能够建立JavaStreamingContext
  2. 而咱们在具体实现的时候覆写了该方法,内部就是调用createContext方法来具体实现。上述实战案例中咱们实现了createContext方法。
/*** Factory interface for creating a new JavaStreamingContext
 */
trait JavaStreamingContextFactory {
  def create(): JavaStreamingContext
}

3.checkpoint:

  一方面:保持容错

  一方面保持状态

  在开始和结束的时候每一个batch都会进行checkpoint

** Sets the context to periodically checkpoint the DStream operations for master

 * fault-tolerance. The graph will be checkpointed every batch interval.
 * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
 */
def checkpoint(directory: String) {
  ssc.checkpoint(directory)
}
4.remember:
流式处理中过一段时间数据就会被清理掉,可是能够经过remember能够延长数据在程序中的生命周期,另外延长RDD更长的时间。

应用场景:

假设数据流进来,进行ML或者Graphx的时候有时须要很长时间,可是bacth定时定条件的清除RDD,因此就能够经过remember使得数据能够延长更长时间。/**

 * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
 * DStreams remember RDDs only for a limited duration of duration and releases them for garbage
 * collection. This method allows the developer to specify how long to remember the RDDs (
 * if the developer wishes to query old data outside the DStream computation).
 * @param duration Minimum duration that each DStream should remember its RDDs
 */
def remember(duration: Duration) {
  ssc.remember(duration)
}
5.在JavaStreamingContext中,getOrCreate方法源码以下:

  若是设置了checkpoint ,重启程序的时候,getOrCreate()会从新从checkpoint目录中初始化出StreamingContext。

/* * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.

 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided factory
 * will be used to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
 * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
 * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor.
 */
@deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0")
def getOrCreate(
    checkpointPath: String,
    factory: JavaStreamingContextFactory
  ): JavaStreamingContext = {
  val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
    factory.create.ssc
  })
  new JavaStreamingContext(ssc)
}
异常问题思考:

为啥会报错?
  1. Streaming会按期的进行checkpoint。
  2. 从新启动程序的时候,他会从曾经checkpoint的目录中,若是没有作额外配置的时候,全部的信息都会放在checkpoint的目录中(包括曾经应用程序信息),所以下次再次启动的时候就会报错,没法初始化ShuffleDStream。

总结:

使用Spark Streaming能够处理各类数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是不少时候你们不会用,其真正缘由是对Spark、spark streaming自己不了解。

 

SparkStreaming数据源Flume实际案例

1、什么是Flume?
  flume 做为 cloudera 开发的实时日志收集系统,受到了业界的承认与普遍应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤为是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤其严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另外一缘由是将 Flume 归入 apache 旗下,cloudera Flume 更名为 Apache Flume。

   flume的特色:
  flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各种数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各类数据接受方(好比文本、HDFS、Hbase等)的能力 。
  flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)而且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,而后Source会把事件推入(单个或多个)Channel中。你能够把Channel看做是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另外一个Source。
 flume的可靠性 
  当节点出现故障时,日志可以被传送到其余节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;若是数据发送失败,能够从新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。
flume的可恢复性:
  仍是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。 
flume的一些核心概念:
Agent        使用JVM 运行Flume。每台机器运行一个agent,可是能够在一个agent中包含多个sources和sinks。

  1. Client        生产数据,运行在一个独立的线程。
  2. Source        从Client收集数据,传递给Channel。
  3. Sink        从Channel收集数据,运行在一个独立线程。
  4. Channel        链接 sources 和 sinks ,这个有点像一个队列。
  5. Events        能够是日志记录、 avro 对象等。

 Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,以下图:

  值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不一样类型的Source,Channel和Sink能够自由组合。组合方式基于用户设置的配置文件,很是灵活。好比:Channel能够把事件暂存在内存里,也能够持久化到本地硬盘上。Sink能够把日志写入HDFS, HBase,甚至是另一个Source等等。Flume支持用户创建多级流,也就是说,多个agent能够协同工做,而且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。以下图所示:

2、Flume+Kafka+Spark Streaming应用场景:

一、Flume集群采集外部系统的业务信息,将采集后的信息发生到Kafka集群,最终提供Spark Streaming流框架计算处理,流处理完成后再将最终结果发送给Kafka存储,架构以下图:

二、Flume集群采集外部系统的业务信息,将采集后的信息发生到Kafka集群,最终提供Spark Streaming流框架计算处理,流处理完成后再将最终结果发送给Kafka存储,同时将最终结果经过Ganglia监控工具进行图形化展现,架构以下图:

三、咱们要作:Spark streaming 交互式的360度的可视化,Spark streaming 交互式3D可视化UI;Flume集群采集外部系统的业务信息,将采集后的信息发生到Kafka集群,最终提供Spark Streaming流框架计算处理,流处理完成后再将最终结果发送给Kafka存储,将最终结果同时存储在数据库MySQL)、内存中间件(Redis、MemSQL)中,同时将最终结果经过Ganglia监控工具进行图形化展现,架构以下图:

3、Kafka数据写入Spark Streaming有二种方式:

一种是Receivers,这个方法使用了Receivers来接收数据,Receivers的实现使用到Kafka高层次的消费者API,对于全部的Receivers,接收到的数据将会保存在Spark 分布式的executors中,而后由Spark Streaming启动的Job来处理这些数据;然而,在默认的配置下,这种方法在失败的状况下会丢失数据,为了保证零数据丢失,你能够在Spark Streaming中使用WAL日志功能,这使得咱们能够将接收到的数据保存到WAL中(WAL日志能够存储在HDFS上),因此在失败的时候,咱们能够从WAL中恢复,而不至于丢失数据。

另外一种是DirectAPI,产生数据和处理数据的时候是在两台机器上?实际上是在同一台数据上,因为在一台机器上有Driver和Executor,因此这台机器要足够强悍。

Flume集群将采集的数据放到Kafka集群中,Spark Streaming会实时在线的从Kafka集群中经过DirectAPI拿数据,能够经过Kafka中的topic+partition查询最新的偏移量(offset)来读取每一个batch的数据,即便读取失败也可再根据偏移量来读取失败的数据,保证应用运行的稳定性和数据可靠性。

 

舒适提示:

一、Flume集群数据写入Kafka集群时可能会致使数据存放不均衡,即有些Kafka节点数据量很大、有些不大,后续会对分发数据进行自定义算法来解决数据存放不均衡问题。

二、我的强烈推荐在生产环境下用DirectAPI,可是咱们的发行版,会对DirectAPI进行优化,下降其延迟。

总结:

  实际生产环境下,搜集分布式的日志以Kafka为核心。

使用Spark Streaming能够处理各类数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是不少时候你们不会用,其真正缘由是对Spark、spark streaming自己不了解。

 

Spark Streaming on Kafka解析和安装实战

本课分2部分讲解:

第一部分,讲解Kafka的概念、架构和用例场景;

第二部分,讲解Kafka的安装和实战。

因为时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功。后续课程会接着讲解如何集成Spark Streaming和Kafka。

1、Kafka的概念、架构和用例场景

http://kafka.apache.org/documentation.html#introdution

一、Kafka的概念

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,以后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。zookeeper和kafka和大数据是不仅能用于大数据的,集群启不启动,它均可以使用,也能够用于Javaserver普通的企业级平台上。

什么是消息组件:

以帅哥和美女聊天为例,帅哥如何和美女交流呢?这中间一般想到的是微信、QQ、电话、邮件等通讯媒介,这些通讯媒介就是消息组件,帅哥把聊天信息发送给消息组件、消息组件将消息推送给美女,这就是常说的生产者、消费者模型,kafka不只仅说是生产者消费者模式中广播的概念,也能够实现队列的方式,kafka的消费者中有一个group的概念,group中能够有不少实体,也能够只有一个实体,group中只有一个实体的话,就是队列的方式,因此从消息驱动的角度讲,它是广播的方式和队列的方式的完美结合体。并且在发送信息时能够将内容进行分类,即所谓的Topic主题。Kafka就是这样的通讯组件,将不一样对象组件粘合起来的纽带, 且是解耦合方式传递数据。

完善的流处理系统的特色:

1)能在线的以很是低的延迟,来处理数据,并且是稳定可靠的

2)能对流进来的数据进行很是复杂的分析,而不是简单的仅仅统计的分析

3)不只能处理当前在线的数据,也能处理过去一天,一周,一个月甚至一年的数据

Apache Kafka与传统消息系统相比,有如下不一样的特色:

  • 分布式系统,易于向外扩展;
  • 在线低延迟,同时为发布和订阅提供高吞吐量;
  • 流进来的数据通常处理完后就消失了,也能够将消息存储到磁盘,所以能够处理1天甚至1周前内容,因此kafka不只是一个消息中间件,仍是一个存储系统

二、Kafka的架构

Kafka既然具有消息系统的基本功能,那么就必然会有组成消息系统的组件:

Topic,Producer和Consumer。Kafka还有其特殊的Kafka Cluster组件。

Topic主题:

表明一种数据的类别或类型,工做、娱乐、生活有不一样的Topic,生产者须要说明把说明数据分别放在那些Topic中,里面就是一个个小对象,并将数据数据推到Kafka,消费者获取数据是pull的过程。一组相同类型的消息数据流。这些消息在Kafka会被分区存放,而且有多个副本,以防数据丢失。每一个分区的消息是顺序写入的,而且不可改写。

-       Producer(生产者):把数据推到Kafka系统的任何对象。

 

- Kafka Cluster(Kafka集群):把推到Kafka系统的消息保存起来的一组服务器,也叫Broker。由于Kafka集群用到了Zookeeper做为底层支持框架,因此由一个选出的服务器做为Leader来处理全部消息的读和写的请求,其余服务器做为Follower接受Leader的广播同步备份数据,以备灾难恢复时用。

- Consumer(消费者):从Kafka系统订阅消息的任何对象。

消费者能够有多个,而且某些消费者还能够组成Consumer Group。多个Consumer Group之间组成消息广播的关系,因此各个Group能够拉相同的消息数据。在Consumer Group内部,各消费者之间对Consumer Group拉出来的消息数据是队列先进先出的关系,某个消息数据只能给该Group的一个消费者使用,同一个Group中的实体是互斥的,对一个消息,这样是避免重复消费。若是有多个group,每一个group中只有一个实体,这就是队列的方式了,由于它是互斥的。若是不是一个实体,则是广播模式,以下图所示,广播只能广播给一个group中的一个消费实体

kafka的数据传输是基于kernel(内核)级别的(传输速度接近0拷贝-ZeroCopy)、没有用户空间的参与。Linux自己是软件,软件启动时第一个启动进程叫init,在init进程启动后会进入用户空间;kafka是用java写的,是基于jvm虚拟机的。例如:在分布式系统中,机器A上的应用程序须要读取机器B上的Java服务数据,因为Java程序对应的JVM是用户空间级别并且数据在磁盘上,A上应用程序读取数据时会首先进入机器B上的内核空间再进入机器B的用户空间,读取用户空间的数据后,数据再通过B机器上的内核空间分发到网络中(之因此要再通过B的内核,由于要经过网络通讯,不经过内核,哪里来的网络通讯),机器A网卡接收到传输过来的数据后再将数据写入A机器的内核空间,从而最终将数据传输给A的用户空间进行处理。以下图:网络自己是一种硬件,磁盘只是硬件的一种。

正常状况下,外部系统从Java程序中读取数据,传输给内核空间并依赖网卡将数据写入到网络中,从而把数据传输出去。其实Java自己是内核的一层外衣,Java Socket编程,操做的各类数据都是在JVM的用户空间中进行的。而Kafka操做数据是放在内核空间的,一般内核空间处理数据的速度比用户空间快上万倍,由于没用用户态和内核态的切换,因此经过kafka能够实现高速读、写数据。只要磁盘空间足够大,能够无限量的存储数据,kafka的数据就是存储在磁盘中的,不是存在内核中的。而不少消息组件是把数据存内存中的。kafka用zookeeperg管理元数据,并且按顺序写数据,比随机写要快不少。又有副本!

三、Kafka的用例场景 

相似微信,手机和邮箱等等这样你们熟悉的消息组件,Kafka也能够:

-       支持文字/图片

-       能够存储内容

-       分门别类

从内容消费的角度,Kafka把邮箱中的邮件类型当作是Topic。

2、Kafka的安装和实战

http://kafka.apache.org/documentation.html#quickstart

一、安装和配置Zookeeper

Kafka集群模式须要提早安装好Zookeeper。

-       提示:Kafka单例模式不须要安装额外的Zookeeper,可使用内置的Zookeeper。

-       Kafka集群模式须要至少3台服务器。本课实战用到的服务器Hostname:master,slave1,slave2。

-       本课中用到的Zookeeper版本是Zookeeper-3.4.6。

1)    下载Zookeeper

进入http://www.apache.org/dyn/closer.cgi/zookeeper/,你能够选择其余镜像网址去下载,用官网推荐的镜像:http://mirror.bit.edu.cn/apache/zookeeper/。提示:能够直接下载群里的Zookeeper安装文件。

下载zookeeper-3.4.6.tar.gz

1)    安装Zookeeper

提示:下面的步骤发生在master服务器。

以ubuntu14.04举例,把下载好的文件放到/root目录,用下面的命令解压:

cd /root

tar -zxvf zookeeper-3.4.6.tar.gz

解压后在/root目录会多出一个zookeeper-3.4.6的新目录,用下面的命令把它剪切到指定目录即安装好Zookeeper了:

cd /root

mv zookeeper-3.4.6 /usr/local/spark

以后在/usr/local/spark目录会多出一个zookeeper-3.4.6的新目录。下面咱们讲如何配置安装好的Zookeeper。

2)    配置Zookeeper

提示:下面的步骤发生在master服务器。

  1. 配置.bashrc

-       打开文件:vi /root/.bashrc

-       在PATH配置行前添加:

export ZOOKEEPER_HOME=/usr/local/spark/zookeeper-3.4.6

-       最后修改PATH:

export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH

-       使配置的环境变量当即生效:source /root/.bashrc

  1. 建立data目录

-       cd $ZOOKEEPER_HOME

-       mkdir data

  1. 建立并打开zoo.cfg文件

-       cd $ZOOKEEPER_HOME/conf

-       cp zoo_sample.cfg zoo.cfg

-       vi zoo.cfg

  1. 配置zoo.cfg

# 配置Zookeeper的日志和服务器身份证号等数据存放的目录。

# 千万不要用默认的/tmp/zookeeper目录,由于/tmp目录的数据容易被意外删除。

dataDir=../data

# Zookeeper与客户端链接的端口

clientPort=2181

# 在文件最后新增3行配置每一个服务器的2个重要端口:Leader端口和选举端口

# server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;

# B 是这个服务器的hostname或ip地址;

# C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;

# D 表示的是万一集群中的 Leader 服务器挂了,须要一个端口来从新进行选举,

# 选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通讯的端口。

# 若是是伪集群的配置方式,因为 B 都是同样,因此不一样的 Zookeeper 实例通讯

# 端口号不能同样,因此要给它们分配不一样的端口号。

server.1=master:2888:3888

server.2=slave1:2888:3888

server.3=slave2:2888:3888

改为以下方式:

dataDir=/usr/local/spark/zookeeper-3.4.6/data
dataLogDir=/usr/local/spark/zookeeper-3.4.6/logs
clientPort=2181
server.0=master1:2888:3888
server.1=work1:2888:3888
server.2=work2:2888:3888

 

  1. 建立并打开myid文件

-       cd $ZOOKEEPER_HOME/data

-       touch myid

-       vi myid

  1. 配置myid

按照zoo.cfg的配置,myid的内容就是1。要写成0,和上面zoo.cfg里面的配置server.0,server.1,server.2一致,因此下面work1中myid内容为1,work2中myid内容为2

3)    同步master的安装和配置到slave1和slave2

-       在master服务器上运行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave1:/usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave2:/usr/local/spark

-       在slave1服务器上运行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的内容就是1。

-       在slave2服务器上运行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的内容就是2。

4)    启动Zookeeper服务

-       在master服务器上运行下面的命令

zkServer.sh start

-       在slave1服务器上运行下面的命令

source /root/.bashrc

zkServer.sh start

-       在slave1服务器上运行下面的命令

source /root/.bashrc

zkServer.sh start

5)    验证Zookeeper是否安装和启动成功

-       在master服务器上运行命令:jps和zkServer.sh status

root@master:/usr/local/spark/zookeeper-3.4.6/bin# jps

3844 QuorumPeerMain

4790 Jps

zkServer.sh status

root@master:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

-       在slave1服务器上运行命令:jps和zkServer.sh status

source /root/.bashrc

root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# jps

3462 QuorumPeerMain

4313 Jps

root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

-       在slave2服务器上运行命令:jps和zkServer.sh status

root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# jps

4073 Jps

3277 QuorumPeerMain

root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: leader

      至此,表明Zookeeper已经安装和配置成功。

二、安装和配置Kafka

本课中用到的Kafka版本是Kafka-2.10-0.9.0.1。

1)    下载Kafka 

进入http://kafka.apache.org/downloads.html,左键单击kafka_2.10-0.9.0.1.tgz。提示:能够直接下载群里的Kafka安装文件。

下载kafka_2.10-0.9.0.1.tgz

1)    安装Kafka

提示:下面的步骤发生在master服务器。

以ubuntu14.04举例,把下载好的文件放到/root目录,用下面的命令解压:

cd /root

tar -zxvf kafka_2.10-0.9.0.1.tgz

解压后在/root目录会多出一个kafka_2.10-0.9.0.1的新目录,用下面的命令把它剪切到指定目录即安装好Kafka了:

cd /root

mv kafka_2.10-0.9.0.1 /usr/local

以后在/usr/local目录会多出一个kafka_2.10-0.9.0.1的新目录。下面咱们讲如何配置安装好的Kafka。

2)    配置Kafka

提示:下面的步骤发生在master服务器。

  1. 配置.bashrc

-       打开文件:vi /root/.bashrc

-       在PATH配置行前添加:

export KAFKA_HOME=/usr/local/kafka_2.10-0.9.0.1

-       最后修改PATH:

export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH

-       使配置的环境变量当即生效:source /root/.bashrc

  1. 打开server.properties

-       cd $ZOOKEEPER_HOME/config

-       vi server.properties

  1. 配置server.properties

broker.id=0

port=9092

zookeeper.connect=master:2181,slave1:2181,slave2:2181

3)    同步master的安装和配置到slave1和slave2

-       在master服务器上运行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave1:/usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave2:/usr/local

-       在slave1服务器上运行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=1。

-       在slave2服务器上运行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=2。

4)    启动Kafka服务

-       在master服务器上运行下面的命令,nohup,在集群上终端不输出启动日志

cd $KAFKA_HOME/bin

nohup ./kafka-server-start.sh ../config/server.properties &

-       在slave1服务器上运行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

nohup ./kafka-server-start.sh ../config/server.properties &

-       在slave2服务器上运行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

5)    验证Kafka是否安装和启动成功

-       在任意服务器上运行命令建立Topic“HelloKafka”:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic HelloKafka

-       在任意服务器上运行命令为建立的Topic“HelloKafka”生产一些消息:

kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic HelloKafka

输入下面的消息内容:

This is DT_Spark!

I’m Rocky!

Life is short, you need Spark!

-       在任意服务器上运行命令从指定的Topic“HelloKafka”上消费(拉取)消息:

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic HelloKafka

过一下子,你会看到打印的消息内容:

This is DT_Spark!

I’m Rocky!

Life is short, you need Spark!

-       在任意服务器上运行命令查看全部的Topic名字:

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

-       在任意服务器上运行命令查看指定Topic的概况:

kafka-topics.sh --describe --zookeepermaster:2181,slave1:2181,slave2:2181 --topic HelloKafka

至此,表明Kafka已经安装和配置成功。

相关文章
相关标签/搜索