Update(Stage4):Structured Streaming_介绍_案例

 

  • 1. 回顾和展望
    • 1.1. Spark 编程模型的进化过程
    • 1.2. Spark 的 序列化 的进化过程
    • 1.3. Spark Streaming 和 Structured Streaming
  • 2. Structured Streaming 入门案例
    • 2.1. 需求梳理
    • 2.2. 代码实现
    • 2.3. 运行和结果验证
  • 3. Stuctured Streaming 的体系和结构
    • 3.1. 无限扩展的表格
    • 3.2. 体系结构
  • 4. Source
    • 4.1. 从 HDFS 中读取数据
    • 4.2. 从 Kafka 中读取数据
  • 5. Sink
    • 5.1. HDFS Sink
    • 5.2. Kafka Sink
    • 5.3. Foreach Writer
    • 5.4. 自定义 Sink
    • 5.5. Tigger
    • 5.6. 从 Source 到 Sink 的流程
    • 5.7. 错误恢复和容错语义
  • 6. 有状态算子
    • 6.1. 常规算子
    • 6.2. 分组算子
全天目标
  1. 回顾和展望前端

  2. 入门案例java

  3. Stuctured Streaming 的体系和结构node

 

1. 回顾和展望

本章目标

Structured Streaming 是 Spark Streaming 的进化版, 若是了解了 Spark 的各方面的进化过程, 有助于理解 Structured Streaming 的使命和做用python

本章过程
  1. Spark 的 API 进化过程mysql

  2. Spark 的序列化进化过程算法

  3. Spark Streaming 和 Structured Streamingsql

1.1. Spark 编程模型的进化过程

目标和过程
目标

Spark 的进化过程当中, 一个很是重要的组成部分就是编程模型的进化, 经过编程模型能够看得出来内在的问题和解决方案shell

过程
  1. 编程模型 RDD 的优势和缺陷apache

  2. 编程模型 DataFrame 的优势和缺陷编程

  3. 编程模型 Dataset 的优势和缺陷

20190625103618
编程模型 解释

RDD

rdd.flatMap(_.split(" "))
   .map((_, 1))
   .reduceByKey(_ + _)
   .collect
  • 针对自定义数据对象进行处理, 能够处理任意类型的对象, 比较符合面向对象

  • RDD 没法感知到数据的结构, 没法针对数据结构进行编程

DataFrame

spark.read
     .csv("...")
     .where($"name" =!= "")
     .groupBy($"name")
     .show()
  • DataFrame 保留有数据的元信息, API 针对数据的结构进行处理, 例如说能够根据数据的某一列进行排序或者分组

  • DataFrame 在执行的时候会通过 Catalyst 进行优化, 而且序列化更加高效, 性能会更好

  • DataFrame 只能处理结构化的数据, 没法处理非结构化的数据, 由于 DataFrame 的内部使用 Row 对象保存数据

  • Spark 为 DataFrame 设计了新的数据读写框架, 更增强大, 支持的数据源众多

Dataset

spark.read
     .csv("...")
     .as[Person]
     .where(_.name != "")
     .groupByKey(_.name)
     .count()
     .show()
  • Dataset 结合了 RDD 和 DataFrame 的特色, 从 API 上便可以处理结构化数据, 也能够处理非结构化数据

  • Dataset 和 DataFrame 实际上是一个东西, 因此 DataFrame 的性能优点, 在 Dataset 上也有

总结
RDD 的优势
  1. 面向对象的操做方式

  2. 能够处理任何类型的数据

RDD 的缺点
  1. 运行速度比较慢, 执行过程没有优化

  2. API 比较僵硬, 对结构化数据的访问和操做没有优化

DataFrame 的优势
  1. 针对结构化数据高度优化, 能够经过列名访问和转换数据

  2. 增长 Catalyst 优化器, 执行过程是优化的, 避免了由于开发者的缘由影响效率

DataFrame 的缺点
  1. 只能操做结构化数据

  2. 只有无类型的 API, 也就是只能针对列和 SQL 操做数据, API 依然僵硬

Dataset 的优势
  1. 结合了 RDD 和 DataFrame 的 API, 既能够操做结构化数据, 也能够操做非结构化数据

  2. 既有有类型的 API 也有无类型的 API, 灵活选择

1.2. Spark 的 序列化 的进化过程

目标和过程
目标

Spark 中的序列化过程决定了数据如何存储, 是性能优化一个很是重要的着眼点, Spark 的进化并不仅是针对编程模型提供的 API, 在大数据处理中, 也必需要考虑性能

过程
  1. 序列化和反序列化是什么

  2. Spark 中什么地方用到序列化和反序列化

  3. RDD 的序列化和反序列化如何实现

  4. Dataset 的序列化和反序列化如何实现

Step 1: 什么是序列化和序列化

在 Java 中, 序列化的代码大概以下

public class JavaSerializable implements Serializable { NonSerializable ns = new NonSerializable(); } public class NonSerializable { } public static void main(String[] args) throws IOException { // 序列化 JavaSerializable serializable = new JavaSerializable(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("/tmp/obj.ser")); // 这里会抛出一个 "java.io.NotSerializableException: cn.itcast.NonSerializable" 异常 objectOutputStream.writeObject(serializable); objectOutputStream.flush(); objectOutputStream.close(); // 反序列化 FileInputStream fileInputStream = new FileInputStream("/tmp/obj.ser"); ObjectInputStream objectOutputStream = new ObjectInputStream(fileInputStream); JavaSerializable serializable1 = objectOutputStream.readObject(); }
序列化是什么
  • 序列化的做用就是能够将对象的内容变成二进制, 存入文件中保存

  • 反序列化指的是将保存下来的二进制对象数据恢复成对象

序列化对对象的要求
  • 对象必须实现 Serializable 接口

  • 对象中的全部属性必须都要能够被序列化, 若是出现没法被序列化的属性, 则序列化失败

限制
  • 对象被序列化后, 生成的二进制文件中, 包含了不少环境信息, 如对象头, 对象中的属性字段等, 因此内容相对较大

  • 由于数据量大, 因此序列化和反序列化的过程比较慢

序列化的应用场景
  • 持久化对象数据

  • 网络中不能传输 Java 对象, 只能将其序列化后传输二进制数据

Step 2: 在  Spark 中的序列化和反序列化的应用场景
  • Task 分发

    20190627194356

    Task 是一个对象, 想在网络中传输对象就必需要先序列化

  • RDD 缓存

    val rdd1 = rdd.flatMap(_.split(" "))
       .map((_, 1))
       .reduceByKey(_ + _)
    
    rdd1.cache
    
    rdd1.collect
    • RDD 中处理的是对象, 例如说字符串, Person 对象等

    • 若是缓存 RDD 中的数据, 就须要缓存这些对象

    • 对象是不能存在文件中的, 必需要将对象序列化后, 将二进制数据存入文件

  • 广播变量

    20190627195544
    • 广播变量会分发到不一样的机器上, 这个过程当中须要使用网络, 对象在网络中传输就必须先被序列化

  • Shuffle 过程

    20190627200225
    • Shuffle 过程是由 Reducer 从 Mapper 中拉取数据, 这里面涉及到两个须要序列化对象的缘由

      • RDD 中的数据对象须要在 Mapper 端落盘缓存, 等待拉取

      • Mapper 和 Reducer 要传输数据对象

  • Spark Streaming 的 Receiver

    20190627200730
    • Spark Streaming 中获取数据的组件叫作 Receiver, 获取到的数据也是对象形式, 在获取到之后须要落盘暂存, 就须要对数据对象进行序列化

  • 算子引用外部对象

    class Unserializable(i: Int)
    
    rdd.map(i => new Unserializable(i))
       .collect
       .foreach(println)
    • 在 Map 算子的函数中, 传入了一个 Unserializable 的对象

    • Map 算子的函数是会在整个集群中运行的, 那 Unserializable 对象就须要跟随 Map 算子的函数被传输到不一样的节点上

    • 若是 Unserializable 不能被序列化, 则会报错

Step 3:  RDD 的序列化
20190627202022
RDD 的序列化

RDD 的序列化只能使用 Java 序列化器, 或者 Kryo 序列化器

为何?
  • RDD 中存放的是数据对象, 要保留全部的数据就必需要对对象的元信息进行保存, 例如对象头之类的

  • 保存一整个对象, 内存占用和效率会比较低一些

Kryo 是什么
  • Kryo 是 Spark 引入的一个外部的序列化工具, 能够增快 RDD 的运行速度

  • 由于 Kryo 序列化后的对象更小, 序列化和反序列化的速度很是快

  • 在 RDD 中使用 Kryo 的过程以下

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KyroTest")
    
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Person]))
    
    val sc = new SparkContext(conf)
    
    rdd.map(arr => Person(arr(0), arr(1), arr(2)))
Step 4:  DataFrame 和  Dataset 中的序列化
历史的问题

RDD 中没法感知数据的组成, 没法感知数据结构, 只能以对象的形式处理数据

DataFrame 和  Dataset 的特色
  • DataFrame 和 Dataset 是为结构化数据优化的

  • 在 DataFrame 和 Dataset 中, 数据和数据的 Schema 是分开存储的

    spark.read
         .csv("...")
         .where($"name" =!= "")
         .groupBy($"name")
         .map(row: Row => row)
         .show()
  • DataFrame 中没有数据对象这个概念, 全部的数据都以行的形式存在于 Row 对象中, Row 中记录了每行数据的结构, 包括列名, 类型等

    20190627214134
  • Dataset 中上层能够提供有类型的 API, 用以操做数据, 可是在内部, 不管是什么类型的数据对象 Dataset 都使用一个叫作 InternalRow 的类型的对象存储数据

    val dataset: Dataset[Person] = spark.read.csv(...).as[Person]
优化点 1: 元信息独立
  1. RDD 不保存数据的元信息, 因此只能使用 Java Serializer 或者 Kyro Serializer 保存 整个对象

  2. DataFrame 和 Dataset 中保存了数据的元信息, 因此能够把元信息独立出来分开保存

    20190627233424
  3. 一个 DataFrame 或者一个 Dataset 中, 元信息只须要保存一份, 序列化的时候, 元信息不须要参与

    20190627233851
  4. 在反序列化 ( InternalRow → Object ) 时加入 Schema 信息便可

    20190627234337

元信息再也不参与序列化, 意味着数据存储量的减小, 和效率的增长

优化点 2: 使用堆外内存
  • DataFrame 和 Dataset 再也不序列化元信息, 因此内存使用大大减小. 同时新的序列化方式还将数据存入堆外内存中, 从而避免 GC 的开销.

  • 堆外内存又叫作 Unsafe, 之因此叫不安全的, 由于不能使用 Java 的垃圾回收机制, 须要本身负责对象的建立和回收, 性能很好, 可是不建议普通开发者使用, 毕竟不安全

总结
  1. 当须要将对象缓存下来的时候, 或者在网络中传输的时候, 要把对象转成二进制, 在使用的时候再将二进制转为对象, 这个过程叫作序列化和反序列化

  2. 在 Spark 中有不少场景须要存储对象, 或者在网络中传输对象

    1. Task 分发的时候, 须要将任务序列化, 分发到不一样的 Executor 中执行

    2. 缓存 RDD 的时候, 须要保存 RDD 中的数据

    3. 广播变量的时候, 须要将变量序列化, 在集群中广播

    4. RDD 的 Shuffle 过程当中 Map 和 Reducer 之间须要交换数据

    5. 算子中若是引入了外部的变量, 这个外部的变量也须要被序列化

  3. RDD 由于不保留数据的元信息, 因此必需要序列化整个对象, 常见的方式是 Java 的序列化器, 和 Kyro 序列化器

  4. Dataset 和 DataFrame 中保留数据的元信息, 因此能够再也不使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化协议, 生成 UnsafeInternalRow 用以保存数据, 这样不只能减小数据量, 也能减小序列化和反序列化的开销, 其速度大概能达到 RDD 的序列化的 20 倍左右

1.3. Spark Streaming 和 Structured Streaming

目标和过程
目标

理解 Spark Streaming 和 Structured Streaming 之间的区别, 是很是必要的, 从这点上能够理解 Structured Streaming 的过去和产生契机

过程
  1. Spark Streaming 时代

  2. Structured Streaming 时代

  3. Spark Streaming 和 Structured Streaming

Spark Streaming 时代
20190628010204
  • Spark Streaming 其实就是 RDD 的 API 的流式工具, 其本质仍是 RDD, 存储和执行过程依然相似 RDD

Structured Streaming 时代
20190628010542
  • Structured Streaming 其实就是 Dataset 的 API 的流式工具, API 和 Dataset 保持高度一致

Spark Streaming 和  Structured Streaming
  • Structured Streaming 相比于 Spark Streaming 的进步就相似于 Dataset 相比于 RDD 的进步

  • 另外还有一点, Structured Streaming 已经支持了连续流模型, 也就是相似于 Flink 那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分状况仍是应该采用小批量模式

在 2.2.0 之后 Structured Streaming 被标注为稳定版本, 意味着之后的 Spark 流式开发不该该在采用 Spark Streaming 了

2. Structured Streaming 入门案例

目标

了解 Structured Streaming 的编程模型, 为理解 Structured Streaming 时候是什么, 以及核心体系原理打下基础

步骤
  1. 需求梳理

  2. Structured Streaming 代码实现

  3. 运行

  4. 验证结果

2.1. 需求梳理

目标和过程
目标

理解接下来要作的案例, 有的放矢

步骤
  1. 需求

  2. 总体结构

  3. 开发方式

需求
20190628144128
  • 编写一个流式计算的应用, 不断的接收外部系统的消息

  • 对消息中的单词进行词频统计

  • 统计全局的结果

总体结构
20190628131804
  1. Socket Server 等待 Structured Streaming 程序链接

  2. Structured Streaming 程序启动, 链接 Socket Server, 等待 Socket Server 发送数据

  3. Socket Server 发送数据, Structured Streaming 程序接收数据

  4. Structured Streaming 程序接收到数据后处理数据

  5. 数据处理后, 生成对应的结果集, 在控制台打印

开发方式和步骤

Socket server 使用 Netcat nc 来实现

Structured Streaming 程序使用 IDEA 实现, 在 IDEA 中本地运行

  1. 编写代码

  2. 启动 nc 发送 Socket 消息

  3. 运行代码接收 Socket 消息统计词频

总结
  • 简单来讲, 就是要进行流式的词频统计, 使用 Structured Streaming

2.2. 代码实现

目标和过程
目标

实现 Structured Streaming 部分的代码编写

步骤
  1. 建立文件

  2. 建立 SparkSession

  3. 读取 Socket 数据生成 DataFrame

  4. 将 DataFrame 转为 Dataset, 使用有类型的 API 处理词频统计

  5. 生成结果集, 并写入控制台

object SocketProcessor {

  def main(args: Array[String]): Unit = {

    // 1. 建立 SparkSession
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("socket_processor")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")    import spark.implicits._ // 2. 读取外部数据源, 并转为 Dataset[String] val source = spark.readStream .format("socket") .option("host", "127.0.0.1") .option("port", 9999) .load() .as[String]  // 3. 统计词频 val words = source.flatMap(_.split(" ")) .map((_, 1)) .groupByKey(_._1) .count() // 4. 输出结果 words.writeStream .outputMode(OutputMode.Complete())  .format("console")  .start()  .awaitTermination()  } }
  调整 Log 级别, 避免过多的 Log 影响视线
  默认 readStream 会返回 DataFrame, 可是词频统计更适合使用 Dataset 的有类型 API
  统计全局结果, 而不是一个批次
  将结果输出到控制台
  开始运行流式应用
  阻塞主线程, 在子线程中不断获取数据
总结
  • Structured Streaming 中的编程步骤依然是先读, 后处理, 最后落地

  • Structured Streaming 中的编程模型依然是 DataFrame 和 Dataset

  • Structured Streaming 中依然是有外部数据源读写框架的, 叫作 readStream 和 writeStream

  • Structured Streaming 和 SparkSQL 几乎没有区别, 惟一的区别是, readStream 读出来的是流, writeStream是将流输出, 而 SparkSQL 中的批处理使用 read 和 write

2.3. 运行和结果验证

目标和过程
目标

代码已经编写完毕, 须要运行, 并查看结果集, 由于从结果集的样式中能够看到 Structured Streaming 的一些原理

步骤
  1. 开启 Socket server

  2. 运行程序

  3. 查看数据集

开启  Socket server 和运行程序
  1. 在虚拟机 node01 中运行 nc -lk 9999

  2. 在 IDEA 中运行程序

  3. 在 node01 中输入如下内容

    hello world
    hello spark
    hello hadoop
    hello spark
    hello spark
查看结果集
-------------------------------------------
Batch: 4
-------------------------------------------
+------+--------+
| value|count(1)|
+------+--------+
| hello|       5|
| spark|       3|
| world|       1|
|hadoop|       1|
+------+--------+

从结果集中能够观察到如下内容

  • Structured Streaming 依然是小批量的流处理

  • Structured Streaming 的输出是相似 DataFrame 的, 也具备 Schema, 因此也是针对结构化数据进行优化的

  • 从输出的时间特色上来看, 是一个批次先开始, 而后收集数据, 再进行展现, 这一点和 Spark Streaming 不太同样

总结
  1. 运行的时候须要先开启 Socket server

  2. Structured Streaming 的 API 和运行也是针对结构化数据进行优化过的

3. Stuctured Streaming 的体系和结构

目标

了解 Structured Streaming 的体系结构和核心原理, 有两点好处, 一是须要了解原理才好进行性能调优, 二是了解原理后, 才能理解代码执行流程, 从而更好的记忆, 也作到知其然更知其因此然

步骤
  1. WordCount 的执行原理

  2. Structured Streaming 的体系结构

3.1. 无限扩展的表格

目标和过程
目标

Structured Streaming 是一个复杂的体系, 由不少组件组成, 这些组件之间也会进行交互, 若是没法站在总体视角去观察这些组件之间的关系, 也没法理解 Structured Streaming 的全局

步骤
  1. 了解 Dataset 这个计算模型和流式计算的关系

  2. 如何使用 Dataset 处理流式数据?

  3. WordCount 案例的执行过程和原理

Dataset 和流式计算

能够理解为 Spark 中的 Dataset 有两种, 一种是处理静态批量数据的 Dataset, 一种是处理动态实时流的 Dataset, 这两种 Dataset 之间的区别以下

  • 流式的 Dataset 使用 readStream 读取外部数据源建立, 使用 writeStream 写入外部存储

  • 批式的 Dataset 使用 read 读取外部数据源建立, 使用 write 写入外部存储

如何使用  Dataset 这个编程模型表示流式计算?
20190628191649
  • 能够把流式的数据想象成一个不断增加, 无限无界的表

  • 不管是否有界, 全都使用 Dataset 这一套 API

  • 经过这样的作法, 就能彻底保证流和批的处理使用彻底相同的代码, 减小这两种处理方式的差别

WordCount 的原理
20190628232818
  • 整个计算过程大体上分为以下三个部分

    1. Source, 读取数据源

    2. Query, 在流式数据上的查询

    3. Result, 结果集生成

  • 整个的过程以下

    1. 随着时间段的流动, 对外部数据进行批次的划分

    2. 在逻辑上, 将缓存全部的数据, 生成一张无限扩展的表, 在这张表上进行查询

    3. 根据要生成的结果类型, 来选择是否生成基于整个数据集的结果

总结
20190628235321
  • Dataset 不只能够表达流式数据的处理, 也能够表达批量数据的处理

  • Dataset 之因此能够表达流式数据的处理, 由于 Dataset 能够模拟一张无限扩展的表, 外部的数据会不断的流入到其中

3.2. 体系结构

目标和过程
目标

Structured Streaming 是一个复杂的体系, 由不少组件组成, 这些组件之间也会进行交互, 若是没法站在总体视角去观察这些组件之间的关系, 也没法理解 Structured Streaming 的核心原理

步骤
  1. 体系结构

  2. StreamExecution 的执行顺序

体系结构
  • 在 Structured Streaming 中负责总体流程和执行的驱动引擎叫作 StreamExecution

    20190629111018

    StreamExecution 在流上进行基于 Dataset 的查询, 也就是说, Dataset 之因此可以在流上进行查询, 是由于 StreamExecution 的调度和管理

  • StreamExecution 如何工做?

    20190629100439

    StreamExecution 分为三个重要的部分

    • Source, 从外部数据源读取数据

    • LogicalPlan, 逻辑计划, 在流上的查询计划

    • Sink, 对接外部系统, 写入结果

StreamExecution 的执行顺序
20190629113627
  1. 根据进度标记, 从 Source 获取到一个由 DataFrame 表示的批次, 这个 DataFrame 表示数据的源头

    val source = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .load()
      .as[String]

    这一点很是相似 val df = spark.read.csv() 所生成的 DataFrame, 一样都是表示源头

  2. 根据源头 DataFrame 生成逻辑计划

    val words = source.flatMap(_.split(" "))
      .map((_, 1))
      .groupByKey(_._1)
      .count()

    上述代码表示的就是数据的查询, 这一个步骤将这样的查询步骤生成为逻辑执行计划

  3. 优化逻辑计划最终生成物理计划

    67b14d92b21b191914800c384cbed439

    这一步其实就是使用 Catalyst 对执行计划进行优化, 经历基于规则的优化和基于成本模型的优化

  4. 执行物理计划将表示执行结果的 DataFrame / Dataset 交给 Sink

    整个物理执行计划会针对每个批次的数据进行处理, 处理后每个批次都会生成一个表示结果的 Dataset

    Sink 能够将每个批次的结果 Dataset 落地到外部数据源

  5. 执行完毕后, 汇报 Source 这个批次已经处理结束, Source 提交并记录最新的进度

增量查询
  • 核心问题

    20190628232818

    上图中清晰的展现了最终的结果生成是全局的结果, 而不是一个批次的结果, 可是从 StreamExecution 中能够看到, 针对流的处理是按照一个批次一个批次来处理的

    那么, 最终是如何生成全局的结果集呢?

  • 状态记录

    20190629115459

    在 Structured Streaming 中有一个全局范围的高可用 StateStore, 这个时候针对增量的查询变为以下步骤

    1. 从 StateStore 中取出上次执行完成后的状态

    2. 把上次执行的结果加入本批次, 再进行计算, 得出全局结果

    3. 将当前批次的结果放入 StateStore 中, 留待下次使用

    20190629123847
总结
  • StreamExecution 是整个 Structured Streaming 的核心, 负责在流上的查询

  • StreamExecution 中三个重要的组成部分, 分别是 Source 负责读取每一个批量的数据, Sink 负责将结果写入外部数据源, Logical Plan 负责针对每一个小批量生成执行计划

  • StreamExecution 中使用 StateStore 来进行状态的维护

4. Source

目标和过程
目标

流式计算通常就是经过数据源读取数据, 通过一系列处理再落地到某个地方, 因此这一小节先了解一下如何读取数据, 能够整合哪些数据源

过程
  1. 从 HDFS 中读取数据

  2. 从 Kafka 中读取数据

4.1. 从 HDFS 中读取数据

目标和过程
目标
  • 在数据处理的时候, 常常会遇到这样的场景

    20190630160310
  • 有时候也会遇到这样的场景

    20190630160448
  • 以上两种场景有两个共同的特色

    • 会产生大量小文件在 HDFS 上

    • 数据须要处理

  • 经过本章节的学习, 便可以更深入的理解这种结构, 具备使用 Structured Streaming 整合 HDFS, 从其中读取数据的能力

步骤
  1. 案例结构

  2. 产生小文件并推送到 HDFS

  3. 流式计算统计 HDFS 上的小文件

  4. 运行和总结

4.1.1. 案例结构

目标和步骤
目标

经过本章节能够了解案例的过程和步骤, 以及案例的核心意图

步骤
  1. 案例结构

  2. 实现步骤

  3. 难点和易错点

案例流程
20190715111534
  1. 编写 Python 小程序, 在某个目录生成大量小文件

    • Python 是解释型语言, 其程序能够直接使用命令运行无需编译, 因此适合编写快速使用的程序, 不少时候也使用 Python 代替 Shell

    • 使用 Python 程序建立新的文件, 而且固定的生成一段 JSON 文本写入文件

    • 在真实的环境中, 数据也是同样的不断产生而且被放入 HDFS 中, 可是在真实场景下, 多是 Flume 把小文件不断上传到 HDFS 中, 也多是 Sqoop 增量更新不断在某个目录中上传小文件

  2. 使用 Structured Streaming 汇总数据

    • HDFS 中的数据是不断的产生的, 因此也是流式的数据

    • 数据集是 JSON 格式, 要有解析 JSON 的能力

    • 由于数据是重复的, 要对全局的流数据进行汇总和去重, 其实真实场景下的数据清洗大部分状况下也是要去重的

  3. 使用控制台展现数据

    • 最终的数据结果以表的形式呈现

    • 使用控制台展现数据意味着不须要在修改展现数据的代码, 将 Sink 部分的内容放在下一个大章节去说明

    • 真实的工做中, 可能数据是要落地到 MySQLHBaseHDFS 这样的存储系统中

实现步骤
  • Step 1: 编写 Python 脚本不断的产生数据

    1. 使用 Python 建立字符串保存文件中要保存的数据

    2. 建立文件并写入文件内容

    3. 使用 Python 调用系统 HDFS 命令上传文件

  • Step 2: 编写 Structured Streaming 程序处理数据

    1. 建立 SparkSession

    2. 使用 SparkSession 的 readStream 读取数据源

    3. 使用 Dataset 操做数据, 只须要去重

    4. 使用 Dataset 的 writeStream 设置 Sink 将数据展现在控制台中

  • Step 3: 部署程序, 验证结果

    1. 上传脚本到服务器中, 使用 python 命令运行脚本

    2. 开启流计算应用, 读取 HDFS 中对应目录的数据

    3. 查看运行结果

难点和易错点
  1. 在读取 HDFS 的文件时, Source 不只对接数据源, 也负责反序列化数据源中传过来的数据

    • Source 能够从不一样的数据源中读取数据, 如 KafkaHDFS

    • 数据源可能会传过来不一样的数据格式, 如 JSONParquet

  2. 读取 HDFS 文件的这个 Source 叫作 FileStreamSource

    从命名就能够看出来这个 Source 不只支持 HDFS, 还支持本地文件读取, 亚马逊云, 阿里云 等文件系统的读取, 例如: file://s3://oss://

  3. 基于流的 Dataset 操做和基于静态数据集的 Dataset 操做是一致的

总结

整个案例运行的逻辑是

  1. Python 程序产生数据到 HDFS 中

  2. Structured Streaming 从 HDFS 中获取数据

  3. Structured Streaming 处理数据

  4. 将数据展现在控制台

整个案例的编写步骤

  1. Python 程序

  2. Structured Streaming 程序

  3. 运行

4.1.2. 产生小文件并推送到 HDFS

目标和步骤
目标

经过本章节看到 Python 的大体语法, 并了解 Python 如何编写脚本完成文件的操做, 其实不一样的语言使用起来并无那么难, 完成一些简单的任务仍是很简单的

步骤
  1. 建立 Python 代码文件

  2. 编写代码

  3. 本地测试, 可是由于本地环境搭建比较浪费你们时间, 因此暂时再也不本地测试

代码编写
  • 随便在任一目录中建立文件 gen_files.py, 编写如下内容

import os for index in range(100): content = """ {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} """ file_name = "/export/dataset/text{0}.json".format(index) with open(file_name, "w") as file:  file.write(content) os.system("/export/servers/hadoop/bin/hdfs dfs -mkdir -p /dataset/dataset/") os.system("/export/servers/hadoop/bin/hdfs dfs -put {0} /dataset/dataset/".format(file_name))
  建立文件, 使用这样的写法是由于 with 是一种 Python 的特殊语法, 若是使用 with 去建立文件的话, 使用结束后会自动关闭流
总结
  • Python 的语法灵活而干净, 比较易于编写

  • 对于其它的语言能够玩乐性质的去使用, 其实并无很难

4.1.3. 流式计算统计 HDFS 上的小文件

目标和步骤
目标

经过本章节的学习, 你们能够了解到如何使用 Structured Streaming 读取 HDFS 中的文件, 并以 JSON 的形式解析

步骤
  1. 建立文件

  2. 编写代码

代码
val spark = SparkSession.builder()
  .appName("hdfs_source")
  .master("local[6]")
  .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

val userSchema = new StructType()
  .add("name", "string")
  .add("age", "integer")

val source = spark
  .readStream
  .schema(userSchema)
  .json("hdfs://node01:8020/dataset/dataset")

val result = source.distinct()

result.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
  .awaitTermination()
总结
  • 以流的形式读取某个 HDFS 目录的代码为

    val source = spark
      .readStream          .schema(userSchema)  .json("hdfs://node01:8020/dataset/dataset") 
      指明读取的是一个流式的 Dataset
      指定读取到的数据的 Schema
      指定目录位置, 以及数据格式

4.1.4. 运行和流程总结

目标和步骤
目标

经过这个小节对案例的部署之后, 不只你们能够学到一种常见的部署方式, 同时也能对案例的执行流程和流计算有更深刻的了解

步骤
  1. 运行 Python 程序

  2. 运行 Spark 程序

  3. 总结

运行 Python 程序
  1. 上传 Python 源码文件到服务器中

  2. 运行 Python 脚本

    # 进入 Python 文件被上传的位置 cd ~  # 建立放置生成文件的目录 mkdir -p /export/dataset  # 运行程序 python gen_files.py
运行 Spark 程序
  1. 使用 Maven 打包

    20190716000942
  2. 上传至服务器

  3. 运行 Spark 程序

    # 进入保存 Jar 包的文件夹
    cd ~
    
    # 运行流程序
    spark-submit --class cn.itcast.structured.HDFSSource ./original-streaming-0.0.1.jar
总结
20190715111534
  1. Python 生成文件到 HDFS, 这一步在真实环境下, 多是由 Flume 和 Sqoop 收集并上传至 HDFS

  2. Structured Streaming 从 HDFS 中读取数据并处理

  3. Structured Streaming 讲结果表展现在控制台

4.2. 从 Kafka 中读取数据

目标和步骤
目标

经过本章节的学习, 即可以理解流式系统和队列间的关系, 同时可以编写代码从 Kafka 以流的方式读取数据

步骤
  1. Kafka 回顾

  2. Structured Streaming 整合 Kafka

  3. 读取 JSON 格式的内容

  4. 读取多个 Topic 的数据

4.2.1 Kafka 的场景和结构

目标和步骤
目标

经过这一个小节的学习, 你们能够理解 Kfaka 在整个系统中的做用, 往后工做的话, 也必需要先站在更高层去理解系统的组成, 才能完成功能和代码

步骤
  1. Kafka 的应用场景

  2. Kafka 的特色

  3. Topic 和 Partitions

Kafka 是一个 Pub / Sub 系统
  • Pub / Sub 是 Publisher / Subscriber 的简写, 中文称做为发布订阅系统

    20190717102628
  • 发布订阅系统能够有多个 Publisher 对应一个 Subscriber, 例如多个系统都会产生日志, 经过这样的方式, 一个日志处理器能够简单的获取全部系统产生的日志

    20190717103721
  • 发布订阅系统也能够一个 Publisher 对应多个 Subscriber, 这样就相似于广播了, 例如经过这样的方式能够很是轻易的将一个订单的请求分发给全部感兴趣的系统, 减小耦合性

    20190717104041
  • 固然, 在大数据系统中, 这样的消息系统每每能够做为整个数据平台的入口, 左边对接业务系统各个模块, 右边对接数据系统各个计算工具

    20190717104853
Kafka 的特色

Kafka 有一个很是重要的应用场景就是对接业务系统和数据系统, 做为一个数据管道, 其须要流通的数据量惊人, 因此 Kafka若是要知足这种场景的话, 就必定具备如下两个特色

  • 高吞吐量

  • 高可靠性

Topic 和 Partitions
  • 消息和事件常常是不一样类型的, 例如用户注册是一种消息, 订单建立也是一种消息

    20190717110142
  • Kafka 中使用 Topic 来组织不一样类型的消息

    20190717110431
  • Kafka 中的 Topic 要承受很是大的吞吐量, 因此 Topic 应该是能够分片的, 应该是分布式的

    20190717122114
总结
  • Kafka 的应用场景

    • 通常的系统中, 业务系统会不止一个, 数据系统也会比较复杂

    • 为了减小业务系统和数据系统之间的耦合, 要将其分开, 使用一个中间件来流转数据

    • Kafka 由于其吞吐量超高, 因此适用于这种场景

  • Kafka 如何保证高吞吐量

    • 由于消息会有不少种类, Kafka 中能够建立多个队列, 每个队列就是一个 Topic, 能够理解为是一个主题, 存放相关的消息

    • 由于 Topic 直接存放消息, 因此 Topic 必需要可以承受很是大的通量, 因此 Topic 是分布式的, 是能够分片的, 使用分布式的并行处理能力来解决高通量的问题

4.2.2. Kafka 和 Structured Streaming 整合的结构

目标和步骤
目标

经过本小节能够理解 Kafka 和 Structured Streaming 整合的结构原理, 同时还能理解 Spark 链接 Kafka 的时候一个很是重要的参数

步骤
  1. Topic 的 Offset

  2. Kafka 和 Structured Streaming 的整合结构

  3. Structured Streaming 读取 Kafka 消息的三种方式

Topic 的 Offset
  • Topic 是分区的, 每个 Topic 的分区分布在不一样的 Broker 上

    20190717161413
  • 每一个分区都对应一系列的 Log 文件, 消息存在于 Log 中, 消息的 ID 就是这条消息在本分区的 Offset 偏移量

    20190717162840
 

Offset 又称做为偏移量, 其实就是一个东西距离另一个东西的距离

20190717165649

Kafka 中使用 Offset 命名消息, 而不是指定 ID 的缘由是想表示永远自增, ID 是能够指定的, 可是 Offset 只能是一个距离值, 它只会愈来愈大, 因此, 叫作 Offset 而不叫 ID 也是这个考虑, 消息只能追加到 Log 末尾, 只能增加不能减小

Kafka 和 Structured Streaming 整合的结构
20190718022525
分析
  • Structured Streaming 中使用 Source 对接外部系统, 对接 Kafka 的 Source 叫作 KafkaSource

  • KafkaSource 中会使用 KafkaSourceRDD 来映射外部 Kafka 的 Topic, 二者的 Partition 一一对应

结论

Structured Streaming 会并行的从 Kafka 中获取数据

Structured Streaming 读取 Kafka 消息的三种方式
20190718023534
  • Earliest 从每一个 Kafka 分区最开始处开始获取

  • Assign 手动指定每一个 Kafka 分区中的 Offset

  • Latest 再也不处理以前的消息, 只获取流计算启动后新产生的数据

总结
  • Kafka 中的消息存放在某个 Topic 的某个 Partition 中, 消息是不可变的, 只会在消息过时的时候从最先的消息开始删除, 消息的 ID 也叫作 Offset, 而且只能正增加

  • Structured Streaming 整合 Kafka 的时候, 会并行的经过 Offset 从全部 Topic 的 Partition 中获取数据

  • Structured Streaming 在从 Kafka 读取数据的时候, 能够选择从最先的地方开始读取, 也能够选择从任意位置读取, 也能够选择只读取最新的

4.2.3. 需求介绍

目标和步骤
目标

经过本章节的学习, 能够掌握一个常见的需求, 而且了解后面案例的编写步骤

步骤
  1. 需求

  2. 数据

需求
  1. 模拟一个智能物联网系统的数据统计

    20190718151808
    • 有一个智能家居品牌叫作 Nest, 他们主要有两款产品, 一个是恒温器, 一个是摄像头

    • 恒温器的主要做用是经过感应器识别家里何时有人, 摄像头主要做用是经过学习算法来识别出如今摄像头中的人是不是家里人, 若是不是则报警

    • 因此这两个设备都须要统计一个指标, 就是家里何时有人, 此需求就是针对这个设备的一部分数据, 来统计家里何时有人

  2. 使用生产者在 Kafka 的 Topic : streaming-test 中输入 JSON 数据

    {
      "devices": { "cameras": { "device_id": "awJo6rH", "last_event": { "has_sound": true, "has_motion": true, "has_person": true, "start_time": "2016-12-29T00:00:00.000Z", "end_time": "2016-12-29T18:42:00.000Z" } } } }
  3. 使用 Structured Streaming 来过滤出来家里有人的数据

    把数据转换为 时间 → 是否有人 这样相似的形式

数据转换
  1. 追踪 JSON 数据的格式

    能够在一个在线的工具 https://jsonformatter.org/ 中格式化 JSON, 会发现 JSON 格式以下

    20190720000717
  2. 反序列化

    JSON 数据本质上就是字符串, 只不过这个字符串是有结构的, 虽然有结构, 可是很难直接从字符串中取出某个值

    而反序列化, 就是指把 JSON 数据转为对象, 或者转为 DataFrame, 能够直接使用某一个列或者某一个字段获取数据, 更加方便

    而想要作到这件事, 必需要先根据数据格式, 编写 Schema 对象, 从而经过一些方式转为 DataFrame

    val eventType = new StructType()
      .add("has_sound", BooleanType, nullable = true)
      .add("has_motion", BooleanType, nullable = true)
      .add("has_person", BooleanType, nullable = true)
      .add("start_time", DateType, nullable = true)
      .add("end_time", DateType, nullable = true)
    
    val camerasType = new StructType()
      .add("device_id", StringType, nullable = true)
      .add("last_event", eventType, nullable = true)
    
    val devicesType = new StructType()
      .add("cameras", camerasType, nullable = true)
    
    val schema = new StructType()
      .add("devices", devicesType, nullable = true)
总结
  1. 业务简单来讲, 就是收集智能家居设备的数据, 经过流计算的方式计算其特征规律

  2. Kafka 常见的业务场景就是对接业务系统和数据系统

    1. 业务系统常常会使用 JSON 做为数据传输格式

    2. 因此使用 Structured Streaming 来对接 Kafka 并反序列化 Kafka 中的 JSON 格式的消息, 是一个很是重要的技能

  3. 不管使用什么方式, 若是想反序列化 JSON 数据, 就必需要先追踪 JSON 数据的结构

4.2.4. 使用 Spark 流计算链接 Kafka 数据源

目标和步骤
目标

经过本章节的数据, 可以掌握如何使用 Structured Streaming 对接 Kafka, 从其中获取数据

步骤
  1. 建立 Topic 并输入数据到 Topic

  2. Spark 整合 kafka

  3. 读取到的 DataFrame 的数据结构

建立 Topic 并输入数据到 Topic
  1. 使用命令建立 Topic

    bin/kafka-topics.sh --create --topic shoppingStreaming --replication-factor 1 --partitions 3 --zookeeper node01:2181,node02:2181,node03:2181/kafka

  2. 开启 Producer

    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic shoppingStreaming
    测试:开启consumer端: bin/kafka-console-consumer.sh --from-beginning --topicshoppingStreaming--zookeeper node01:2181,node02:2181,node03:2181/kafka
  3. 把 JSON 转为单行输入

    {"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}
使用 Spark 读取 Kafka 的 Topic
  1. 编写 Spark 代码读取 Kafka Topic

    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092,node01:9092,node03:9092")
      .option("subscribe", "streaming_test")
      .option("startingOffsets", "earliest")
      .load()
    • 三个参数

      • kafka.bootstrap.servers : 指定 Kafka 的 Server 地址

      • subscribe : 要监听的 Topic, 能够传入多个, 传入多个 Topic 则监听多个 Topic, 也可使用 topic-* 这样的通配符写法

      • startingOffsets : 从什么位置开始获取数据, 可选值有 earliestassignlatest

    • format 设置为 Kafka 指定使用 KafkaSource 读取数据

  2. 思考: 从 Kafka 中应该获取到什么?

    • 业务系统有不少种类型, 有多是 Web 程序, 有多是物联网

      20190720132133

      前端大多数状况下使用 JSON 作数据交互

    • 问题1: 业务系统如何把数据给 Kafka ?

      20190720134513

      能够主动或者被动的把数据交给 Kafka, 可是不管使用什么方式, 都在使用 Kafka 的 Client 类库来完成这件事, Kafka 的类库调用方式以下

      Producer<String, String> producer = new KafkaProducer<String, String>(properties); producer.send(new ProducerRecord<String, String>("HelloWorld", msg));

      其中发给 Kafka 的消息是 KV 类型的

    • 问题2: 使用 Structured Streaming 访问 Kafka 获取数据的时候, 须要什么东西呢?

      • 需求1: 存储当前处理过的 Kafka 的 Offset

      • 需求2: 对接多个 Kafka Topic 的时候, 要知道这条数据属于哪一个 Topic

    • 结论

      • Kafka 中收到的消息是 KV 类型的, 有 Key, 有 Value

      • Structured Streaming 对接 Kafka 的时候, 每一条 Kafka 消息不能只是 KV, 必需要有 TopicPartition 之类的信息

  3. 从 Kafka 获取的 DataFrame 格式

    source.printSchema()

    结果以下

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)

    从 Kafka 中读取到的并非直接是数据, 而是一个包含各类信息的表格, 其中每一个字段的含义以下

    Key 类型 解释

    key

    binary

    Kafka 消息的 Key

    value

    binary

    Kafka 消息的 Value

    topic

    string

    本条消息所在的 Topic, 由于整合的时候一个 Dataset 能够对接多个 Topic, 因此有这样一个信息

    partition

    integer

    消息的分区号

    offset

    long

    消息在其分区的偏移量

    timestamp

    timestamp

    消息进入 Kafka 的时间戳

    timestampType

    integer

    时间戳类型

总结
  1. 必定要把 JSON 转为一行, 再使用 Producer 发送, 否则会出现获取多行的状况

  2. 使用 Structured Streaming 链接 Kafka 的时候, 须要配置以下三个参数

    • kafka.bootstrap.servers : 指定 Kafka 的 Server 地址

    • subscribe : 要监听的 Topic, 能够传入多个, 传入多个 Topic 则监听多个 Topic, 也可使用 topic-* 这样的通配符写法

    • startingOffsets : 从什么位置开始获取数据, 可选值有 earliestassignlatest

  3. 从 Kafka 获取到的 DataFrame 的 Schema 以下

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)

4.2.5. JSON 解析和数据统计

目标和步骤
目标

经过本章的学习, 便可以解析 Kafka 中的 JSON 数据, 这是一个重点中的重点

步骤
  1. JSON 解析

  2. 数据处理

  3. 运行测试

JSON 解析
  1. 准备好 JSON 所在的列

    问题

    由 Dataset 的结构能够知道 key 和 value 列的类型都是 binary 二进制, 因此要将其转为字符串, 才可进行 JSON 解析

    解决方式
    source.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  2. 编写 Schema 对照 JSON 的格式

    • Key 要对应 JSON 中的 Key

    • Value 的类型也要对应 JSON 中的 Value 类型

    val eventType = new StructType()
      .add("has_sound", BooleanType, nullable = true)
      .add("has_motion", BooleanType, nullable = true)
      .add("has_person", BooleanType, nullable = true)
      .add("start_time", DateType, nullable = true)
      .add("end_time", DateType, nullable = true)
    
    val camerasType = new StructType()
      .add("device_id", StringType, nullable = true)
      .add("last_event", eventType, nullable = true)
    
    val devicesType = new StructType()
      .add("cameras", camerasType, nullable = true)
    
    val schema = new StructType()
      .add("devices", devicesType, nullable = true)
  3. 由于 JSON 中包含 Date 类型的数据, 因此要指定时间格式化方式

    val jsonOptions = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
  4. 使用 from_json 这个 UDF 格式化 JSON

    .select(from_json('value, schema, jsonOptions).alias("parsed_value"))
  5. 选择格式化事后的 JSON 中的字段

    由于 JSON 被格式化事后, 已经变为了 StructType, 因此能够直接获取其中某些字段的值

    .selectExpr("parsed_value.devices.cameras.last_event.has_person as has_person",
              "parsed_value.devices.cameras.last_event.start_time as start_time")
数据处理
  1. 统计各个时段有人的数据

    .filter('has_person === true)
    .groupBy('has_person, 'start_time)
    .count()
  2. 将数据落地到控制台

    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
所有代码
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-test")
  .option("startingOffsets", "earliest")
  .load()

val eventType = new StructType()
  .add("has_sound", BooleanType, nullable = true)
  .add("has_motion", BooleanType, nullable = true)
  .add("has_person", BooleanType, nullable = true)
  .add("start_time", DateType, nullable = true)
  .add("end_time", DateType, nullable = true)

val camerasType = new StructType()
  .add("device_id", StringType, nullable = true)
  .add("last_event", eventType, nullable = true)

val devicesType = new StructType()
  .add("cameras", camerasType, nullable = true)

val schema = new StructType()
  .add("devices", devicesType, nullable = true)

val jsonOptions = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.sss'Z'")

import org.apache.spark.sql.functions._
import spark.implicits._

val result = source.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
    .select(from_json('value, schema, jsonOptions).alias("parsed_value"))
    .selectExpr("parsed_value.devices.cameras.last_event.has_person as has_person",
      "parsed_value.devices.cameras.last_event.start_time as start_time")
    .filter('has_person === true)
    .groupBy('has_person, 'start_time)
    .count()

result.writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .start()
  .awaitTermination()
运行测试
  1. 进入服务器中, 启动 Kafka

  2. 启动 Kafka 的 Producer

    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streaming-test
  3. 启动 Spark shell 并拷贝代码进行测试

    ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
    • 由于须要和 Kafka 整合, 因此在启动的时候须要加载和 Kafka 整合的包 spark-sql-kafka-0-10

5. Sink

目标和步骤
目标
  • 可以串联两端, 理解整个流式应用, 以及其中的一些根本的原理, 好比说容错语义

  • 可以知道如何对接外部系统, 写入数据

步骤
  1. HDFS Sink

  2. Kafka Sink

  3. Foreach Sink

  4. 自定义 Sink

  5. Tiggers

  6. Sink 原理

  7. 错误恢复和容错语义

5.1. HDFS Sink

目标和步骤
目标

可以使用 Spark 将流式数据的处理结果放入 HDFS

步骤
  1. 场景和需求

  2. 代码实现

场景和需求
场景
  • Kafka 每每做为数据系统和业务系统之间的桥梁

  • 数据系统通常由批量处理和流式处理两个部分组成

  • 在 Kafka 做为整个数据平台入口的场景下, 须要使用 StructuredStreaming 接收 Kafka 的数据并放置于 HDFS 上, 后续才能够进行批量处理

20190808023517
案例需求
  • 从 Kafka 接收数据, 从给定的数据集中, 裁剪部分列, 落地于 HDFS

代码实现
步骤说明
  1. 从 Kafka 读取数据, 生成源数据集

    1. 链接 Kafka 生成 DataFrame

    2. 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型

  2. 对源数据集选择列

    1. 解析 CSV 格式的数据

    2. 生成正确类型的结果集

  3. 落地 HDFS

总体代码
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")

result.writeStream
  .format("parquet") // 也能够是 "orc", "json", "csv" 等
  .option("path", "/dataset/streaming/result/")
  .start()

5.2. Kafka Sink

目标和步骤
目标

掌握何时要将流式数据落地至 Kafka, 以及如何落地至 Kafka

步骤
  1. 场景

  2. 代码

场景
场景
  • 有不少时候, ETL 事后的数据, 须要再次放入 Kafka

  • 在 Kafka 后, 可能会有流式程序统一将数据落地到 HDFS 或者 HBase

20190809014210
案例需求
  • 从 Kafka 中获取数据, 简单处理, 再次放入 Kafka

代码
步骤
  1. 从 Kafka 读取数据, 生成源数据集

    1. 链接 Kafka 生成 DataFrame

    2. 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型

  2. 对源数据集选择列

    1. 解析 CSV 格式的数据

    2. 生成正确类型的结果集

  3. 再次落地 Kafka

代码
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")

result.writeStream
  .format("kafka")
  .outputMode(OutputMode.Append())
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("topic", "streaming-bank-result")
  .start()
  .awaitTermination()

5.3. Foreach Writer

目标和步骤
目标

掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink, 同时可以将数据落地到 MySQL

步骤
  1. 需求

  2. 代码

需求
  • 场景

    • 大数据有一个常见的应用场景

      1. 收集业务系统数据

      2. 数据处理

      3. 放入 OLTP 数据

      4. 外部经过 ECharts 获取并处理数据

    • 这个场景下, StructuredStreaming 就须要处理数据并放入 MySQL 或者 MongoDBHBase 中以供 Web 程序能够获取数据, 图表的形式展现在前端

    20190809115742
  • Foreach 模式::

    • 原由

      • 在 Structured Streaming 中, 并未提供完整的 MySQL/JDBC 整合工具

      • 不止 MySQL 和 JDBC, 可能会有其它的目标端须要写入

      • 不少时候 Structured Streaming 须要对接一些第三方的系统, 例如阿里云的云存储, 亚马逊云的云存储等, 可是 Spark 没法对全部第三方都提供支持, 有时候须要本身编写

    • 解决方案

      20190809122425
      • 既然没法知足全部的整合需求, StructuredStreaming 提供了 Foreach, 能够拿到每个批次的数据

      • 经过 Foreach 拿到数据后, 能够经过自定义写入方式, 从而将数据落地到其它的系统

  • 案例需求::

    20190809122804
    • 从 Kafka 中获取数据, 处理后放入 MySQL

代码
步骤
  1. 建立 DataFrame 表示 Kafka 数据源

  2. 在源 DataFrame 中选择三列数据

  3. 建立 ForeachWriter 接收每个批次的数据落地 MySQL

  4. Foreach 落地数据

代码
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")

class MySQLWriter extends ForeachWriter[Row] {
  val driver = "com.mysql.jdbc.Driver"
  var statement: Statement = _
  var connection: Connection  = _
  val url: String = "jdbc:mysql://node01:3306/streaming-bank-result"
  val user: String = "root"
  val pwd: String = "root"

  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    this.statement = connection.createStatement
    true
  }

  override def process(value: Row): Unit = {
    statement.executeUpdate(s"insert into bank values(" +
      s"${value.getAs[Int]("age")}, " +
      s"${value.getAs[Int]("job")}, " +
      s"${value.getAs[Int]("balance")} )")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

result.writeStream
  .foreach(new MySQLWriter)
  .start()
  .awaitTermination()

5.4. 自定义 Sink

目标和步骤
目标
  • Foreach 倾向于一次处理一条数据, 若是想拿到 DataFrame 幂等的插入外部数据源, 则须要自定义 Sink

  • 了解如何自定义 Sink

步骤
  1. Spark 加载 Sink 流程分析

  2. 自定义 Sink

Spark 加载 Sink 流程分析
  • Sink 加载流程

    1. writeStream 方法中会建立一个 DataStreamWriter 对象

      def writeStream: DataStreamWriter[T] = {
        if (!isStreaming) {
          logicalPlan.failAnalysis(
            "'writeStream' can be called only on streaming Dataset/DataFrame")
        }
        new DataStreamWriter[T](this)
      }
    2. 在 DataStreamWriter 对象上经过 format 方法指定 Sink 的短名并记录下来

      def format(source: String): DataStreamWriter[T] = {
        this.source = source
        this
      }
    3. 最终会经过 DataStreamWriter 对象上的 start 方法启动执行, 其中会经过短名建立 DataSource

      val dataSource =
          DataSource(
            df.sparkSession,
            className = source,  options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil))
        传入的 Sink 短名
    4. 在建立 DataSource 的时候, 会经过一个复杂的流程建立出对应的 Source 和 Sink

      lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
    5. 在这个复杂的建立流程中, 有一行最关键的代码, 就是经过 Java 的类加载器加载全部的 DataSourceRegister

      val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
    6. 在 DataSourceRegister 中会建立对应的 Source 或者 Sink

      trait DataSourceRegister {
      
        def shortName(): String       } trait StreamSourceProvider { def createSource(  sqlContext: SQLContext, metadataPath: String, schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source } trait StreamSinkProvider { def createSink(  sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink }
        提供短名
        建立 Source
        建立 Sink
  • 自定义 Sink 的方式

    • 根据前面的流程说明, 有两点很是重要

      • Spark 会自动加载全部 DataSourceRegister 的子类, 因此须要经过 DataSourceRegister 加载 Source 和 Sink

      • Spark 提供了 StreamSinkProvider 用以建立 Sink, 提供必要的依赖

    • 因此若是要建立自定义的 Sink, 须要作两件事

      1. 建立一个注册器, 继承 DataSourceRegister 提供注册功能, 继承 StreamSinkProvider 获取建立 Sink的必备依赖

      2. 建立一个 Sink 子类

自定义 Sink
步骤
  1. 读取 Kafka 数据

  2. 简单处理数据

  3. 建立 Sink

  4. 建立 Sink 注册器

  5. 使用自定义 Sink

代码
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
  .as[(Int, Int, Int)]
  .toDF("age", "job", "balance")

class MySQLSink(options: Map[String, String], outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val userName = options.get("userName").orNull
    val password = options.get("password").orNull
    val table = options.get("table").orNull
    val jdbcUrl = options.get("jdbcUrl").orNull

    val properties = new Properties
    properties.setProperty("user", userName)
    properties.setProperty("password", password)

    data.write.mode(outputMode.toString).jdbc(jdbcUrl, table, properties)
  }
}

class MySQLStreamSinkProvider extends StreamSinkProvider with DataSourceRegister {

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {
    new MySQLSink(parameters, outputMode)
  }

  override def shortName(): String = "mysql"
}

result.writeStream
  .format("mysql")
  .option("username", "root")
  .option("password", "root")
  .option("table", "streaming-bank-result")
  .option("jdbcUrl", "jdbc:mysql://node01:3306/test")
  .start()
  .awaitTermination()

5.5. Tigger

目标和步骤
目标

掌握如何控制 StructuredStreaming 的处理时间

步骤
  1. 微批次处理

  2. 连续流处理

微批次处理
  • 什么是微批次

    20190628144128
    • 并非真正的流, 而是缓存一个批次周期的数据, 后处理这一批次的数据

  • 通用流程

    步骤
    1. 根据 Spark 提供的调试用的数据源 Rate 建立流式 DataFrame

      • Rate 数据源会按期提供一个由两列 timestamp, value 组成的数据, value 是一个随机数

    2. 处理和聚合数据, 计算每一个个位数和十位数各有多少条数据

      • 对 value 求 log10 便可得出其位数

      • 后按照位数进行分组, 最终就能够看到每一个位数的数据有多少个

    代码
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("socket_processor")
      .getOrCreate()
    
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    spark.sparkContext.setLogLevel("ERROR")
    
    val source = spark.readStream
      .format("rate")
      .load()
    
    val result = source.select(log10('value) cast IntegerType as 'key, 'value)
        .groupBy('key)
        .agg(count('key) as 'count)
        .select('key, 'count)
        .where('key.isNotNull)
        .sort('key.asc)
  • 默认方式划分批次

    介绍

    默认状况下的 Structured Streaming 程序会运行在微批次的模式下, 当一个批次结束后, 下一个批次会当即开始处理

    步骤
    1. 指定落地到 Console 中, 不指定 Trigger

    代码
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
  • 按照固定时间间隔划分批次

    介绍

    使用微批次处理数据, 使用用户指定的时间间隔启动批次, 若是间隔指定为 0, 则尽量快的去处理, 一个批次紧接着一个批次

    • 若是前一批数据提早完成, 待到批次间隔达成的时候再启动下一个批次

    • 若是前一批数据延后完成, 下一个批次会在前面批次结束后当即启动

    • 若是没有数据可用, 则不启动处理

    步骤
    1. 经过 Trigger.ProcessingTime() 指定处理间隔

    代码
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()
      .awaitTermination()
  • 一次性划分批次

    介绍

    只划分一个批次, 处理完成之后就中止 Spark 工做, 当须要启动一下 Spark 处理遗留任务的时候, 处理完就关闭集群的状况下, 这个划分方式很是实用

    步骤
    1. 使用 Trigger.Once 一次性划分批次

    代码
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
连续流处理
  • 介绍

    • 微批次会将收到的数据按照批次划分为不一样的 DataFrame, 后执行 DataFrame, 因此其数据的处理延迟取决于每一个 DataFrame 的处理速度, 最快也只能在一个 DataFrame 结束后马上执行下一个, 最快能够达到 100ms 左右的端到端延迟

    • 而连续流处理能够作到大约 1ms 的端到端数据处理延迟

    • 连续流处理能够达到 at-least-once 的容错语义

    • 从 Spark 2.3 版本开始支持连续流处理, 咱们所采用的 2.2 版本尚未这个特性, 而且这个特性截止到 2.4 依然是实验性质, 不建议在生产环境中使用

  • 操做

    步骤
    1. 使用特殊的 Trigger 完成功能

    代码
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  • 限制

    • 只支持 Map 类的有类型操做

    • 只支持普通的的 SQL 类操做, 不支持聚合

    • Source 只支持 Kafka

    • Sink 只支持 KafkaConsoleMemory

5.6. 从 Source 到 Sink 的流程

目标和步骤
目标

理解 Source 到 Sink 的总体原理

步骤
  1. 从 Source 到 Sink 的流程

从 Source 到 Sink 的流程
20190809184239
  1. 在每一个 StreamExecution 的批次最开始, StreamExecution 会向 Source 询问当前 Source 的最新进度, 即最新的 offset

  2. StreamExecution 将 Offset 放到 WAL 里

  3. StreamExecution 从 Source 获取 start offsetend offset 区间内的数据

  4. StreamExecution 触发计算逻辑 logicalPlan 的优化与编译

  5. 计算结果写出给 Sink

    • 调用 Sink.addBatch(batchId: Long, data: DataFrame) 完成

    • 此时才会由 Sink 的写入操做开始触发实际的数据获取和计算过程

  6. 在数据完整写出到 Sink 后, StreamExecution 通知 Source 批次 id 写入到 batchCommitLog, 当前批次结束

5.7. 错误恢复和容错语义

目标和步骤
目标

理解 Structured Streaming 中提供的系统级别容错手段

步骤
  1. 端到端

  2. 三种容错语义

  3. Sink 的容错

端到端
20190809190803
  • Source 多是 KafkaHDFS

  • Sink 也多是 KafkaHDFSMySQL 等存储服务

  • 消息从 Source 取出, 通过 Structured Streaming 处理, 最后落地到 Sink 的过程, 叫作端到端

三种容错语义
  • at-most-once

    20190809192258
    • 在数据从 Source 到 Sink 的过程当中, 出错了, Sink 可能没收到数据, 可是不会收到两次, 叫作 at-most-once

    • 通常错误恢复的时候, 不重复计算, 则是 at-most-once

  • at-least-once

    20190809192258
    • 在数据从 Source 到 Sink 的过程当中, 出错了, Sink 必定会收到数据, 可是可能收到两次, 叫作 at-least-once

    • 通常错误恢复的时候, 重复计算可能完成也可能未完成的计算, 则是 at-least-once

  • exactly-once

    20190809192258
    • 在数据从 Source 到 Sink 的过程当中, 虽然出错了, Sink 必定刚好收到应该收到的数据, 一条不重复也一条都很多, 便是 exactly-once

    • 想作到 exactly-once 是很是困难的

Sink 的容错
20190809192644
  • 故障恢复通常分为 Driver 的容错和 Task 的容错

    • Driver 的容错指的是整个系统都挂掉了

    • Task 的容错指的是一个任务没运行明白, 从新运行一次

  • 由于 Spark 的 Executor 可以很是好的处理 Task 的容错, 因此咱们主要讨论 Driver 的容错, 若是出错的时候

    • 读取 WAL offsetlog 恢复出最新的 offsets

      当 StreamExecution 找到 Source 获取数据的时候, 会将数据的起始放在 WAL offsetlog 中, 当出错要恢复的时候, 就能够从中获取当前处理批次的数据起始, 例如 Kafka 的 Offset

    • 读取 batchCommitLog 决定是否须要重作最近一个批次

      当 Sink 处理完批次的数据写入时, 会将当前的批次 ID 存入 batchCommitLog, 当出错的时候就能够从中取出进行到哪个批次了, 和 WAL 对比便可得知当前批次是否处理完

    • 若是有必要的话, 当前批次数据重作

      • 若是上次执行在 (5) 结束前即失效, 那么本次执行里 Sink 应该完整写出计算结果

      • 若是上次执行在 (5) 结束后才失效, 那么本次执行里 Sink 能够从新写出计算结果 (覆盖上次结果), 也能够跳过写出计算结果(由于上次执行已经完整写出过计算结果了)

    • 这样便可保证每次执行的计算结果, 在 Sink 这个层面, 是 不重不丢 的, 即便中间发生过失效和恢复, 因此 Structured Streaming 能够作到 exactly-once

容错所须要的存储
  • 存储

    • offsetlog 和 batchCommitLog 关乎于错误恢复

    • offsetlog 和 batchCommitLog 须要存储在可靠的空间里

    • offsetlog 和 batchCommitLog 存储在 Checkpoint 中

    • WAL 其实也存在于 Checkpoint 中

  • 指定 Checkpoint

    • 只有指定了 Checkpoint 路径的时候, 对应的容错功能才能够开启

    aggDF
      .writeStream
      .outputMode("complete")
      .option("checkpointLocation", "path/to/HDFS/dir")  .format("memory") .start()
      指定 Checkpoint 的路径, 这个路径对应的目录必须是 HDFS 兼容的文件系统
须要的外部支持

若是要作到 exactly-once, 只是 Structured Streaming 能作到还不行, 还须要 Source 和 Sink 系统的支持

  • Source 须要支持数据重放

    当有必要的时候, Structured Streaming 须要根据 start 和 end offset 从 Source 系统中再次获取数据, 这叫作重放

  • Sink 须要支持幂等写入

    若是须要重作整个批次的时候, Sink 要支持给定的 ID 写入数据, 这叫幂等写入, 一个 ID 对应一条数据进行写入, 若是前面已经写入, 则替换或者丢弃, 不能重复

因此 Structured Streaming 想要作到 exactly-once, 则也须要外部系统的支持, 以下

Source

Sources

是否可重放

原生内置支持

注解

HDFS

能够

已支持

包括但不限于 TextJSONCSVParquetORC

Kafka

能够

已支持

Kafka 0.10.0+

RateStream

能够

已支持

以必定速率产生数据

RDBMS

能够

待支持

预计后续很快会支持

Socket

不能够

已支持

主要用途是在技术会议和讲座上作 Demo

Sink

Sinks

是否幂等写入

原生内置支持

注解

HDFS

能够

支持

包括但不限于 TextJSONCSVParquetORC

ForeachSink

能够

支持

可定制度很是高的 Sink, 是否能够幂等取决于具体的实现

RDBMS

能够

待支持

预计后续很快会支持

Kafka

不能够

支持

Kafka 目前不支持幂等写入, 因此可能会有重复写入

6. 有状态算子

目标和步骤
目标

了解常见的 Structured Streaming 算子, 可以完成常见的流式计算需求

步骤
  1. 常规算子

  2. 分组算子

  3. 输出模式

状态
  • 无状态算子

    20190814171907
    • 无状态

  • 有状态算子

    20190814194604
    • 有中间状态须要保存

    • 增量查询

总结
 

6.1. 常规算子

目标和步骤
目标

了解 Structured Streaming 的常规数据处理方式

步骤
  1. 案例

案例
  • 需求

    • 给定电影评分数据集 ratings.dat, 位置在 Spark/Files/Dataset/Ratings/ratings.dat

    • 筛选评分超过三分的电影

    • 以追加模式展现数据, 以流的方式来一批数据处理一批数据, 最终每一批次展现为以下效果

    +------+-------+
    |Rating|MovieID|
    +------+-------+
    |     5|   1193|
    |     4|   3408|
    +------+-------+
  • 步骤

    1. 建立 SparkSession

    2. 读取并处理数据结构

    3. 处理数据

      1. 选择要展现的列

      2. 筛选超过三分的数据

    4. 追加模式展现数据到控制台

  • 代码

    • 读取文件的时候只能读取一个文件夹, 由于是流的操做, 流的场景是源源不断有新的文件读取

    val source = spark.readStream
      .textFile("dataset/ratings")
      .map(line => {
        val columns = line.split("::")
        (columns(0).toInt, columns(1).toInt, columns(2).toInt, columns(3).toLong)
      })
      .toDF("UserID", "MovieID", "Rating", "Timestamp")
    
    val result = source.select('Rating, 'MovieID)
        .where('Rating > 3)
总结
  • 针对静态数据集的不少转换算子, 均可以应用在流式的 Dataset 上, 例如 MapFlatMapWhereSelect 等

6.2. 分组算子

目标和步骤
目标

可以使用分组完成常见需求, 并了解如何扩展行

步骤
  1. 案例

案例
  • 需求

    • 给定电影数据集 movies.dat, 其中三列 MovieIDTitleGenres

    • 统计每一个分类下的电影数量

  • 步骤

    1. 建立 SparkSession

    2. 读取数据集, 并组织结构

      注意 Genres 是 genres1|genres2 形式, 须要分解为数组

    3. 使用 explode 函数将数组形式的分类变为单值多条形式

    4. 分组聚合 Genres

    5. 输出结果

  • 代码

    val source = spark.readStream
      .textFile("dataset/movies")
      .map(line => {
        val columns = line.split("::")
        (columns(0).toInt, columns(1).toString, columns(2).toString.split("\\|"))
      })
      .toDF("MovieID", "Title", "Genres")
    
    val result = source.select(explode('Genres) as 'Genres)
        .groupBy('Genres)
        .agg(count('Genres) as 'Count)
    
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .queryName("genres_count")
      .start()
      .awaitTermination()
总结
  • Structured Streaming 不只支持 groupBy, 还支持 groupByKey

相关文章
相关标签/搜索