Android RxJava:一文带你全面了解 背压策略

前言

  • Rxjava,因为其基于事件流的链式调用、逻辑简洁 & 使用简单的特色,深受各大 Android开发者的欢迎。

Github截图

 

  • 本文主要讲解的是RxJava中的 背压控制策略,但愿大家会喜欢。
  1.  
  2.  

示意图

 

目录

示意图

1. 引言

1.1 背景

  • 观察者 & 被观察者 之间存在2种订阅关系:同步 & 异步。具体以下:

示意图

  • 对于异步订阅关系,存在 被观察者发送事件速度 与观察者接收事件速度 不匹配的状况 
    1.  
    2.  

 

1.2 问题

  • 被观察者 发送事件速度太快,而观察者 来不及接收全部事件,从而致使观察者没法及时响应 / 处理全部发送过来事件的问题,最终致使缓存区溢出、事件丢失 & OOM 
    1.  
    2.  


下面再举个例子: java

  • 被观察者的发送事件速度 = 10ms / 个 
  • 观察者的接收事件速度 = 5s / 个 

即出现发送 & 接收事件严重不匹配的问题git

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 结果 
    因为被观察者发送事件速度 > 观察者接收事件速度,因此出现流速不匹配问题,从而致使OOM 
    示意图

1.3 解决方案

采用 背压策略。github

下面,我将开始介绍背压策略。缓存

2. 背压策略简介

2.1 定义

一种 控制事件流速 的策略网络

2.2 做用

在 异步订阅关系 中,控制事件发送 & 接收的速度异步

 

2.3 解决的问题

解决了 因被观察者发送事件速度 与 观察者接收事件速度 不匹配(通常是前者 快于 后者),从而致使观察者没法及时响应 / 处理全部 被观察者发送事件 的问题测试

2.4 应用场景

  • 被观察者发送事件速度 与 观察者接收事件速度 不匹配的场景
  • 具体场景就取决于 该事件的类型,如:网络请求,那么具体场景:有不少网络请求须要执行,但执行者的执行速度没那么快,此时就须要使用背压策略来进行控制。

3. 背压策略的原理

  • 那么,RxJava实现背压策略(Backpressure)的原理是什么呢?
  • 解决方案 & 思想主要以下:

示意图

  • 示意图以下

示意图

  • 与 RxJava1.0 中被观察者的旧实现 Observable 对比

示意图

  • 好了,那么上图中在RxJava 2.0观察者模型中,Flowable究竟是什么呢?它实际上是RxJava 2.0中被观察者的一种新实现,同时也是背压策略实现的承载者
  • 请继续看下一节的介绍:背压策略的具体实现 - Flowable

4. 背压策略的具体实现:Flowable

在 RxJava2.0中,采用 Flowable 实现 背压策略spa

 

4.1 Flowable 介绍

  • 定义:在 RxJava2.0中,被观察者(Observable)的一种新实现 
     
  • 做用:实现 非阻塞式背压 策略

4.2 Flowable 特色

  • Flowable的特色 具体以下

示意图

  • 下面再贴出一张RxJava2.0 与RxJava1.0的观察者模型的对比图 

     


示意图

4.3 与 RxJava1.0 中被观察者的旧实现 Observable 的关系

  • 具体以下图

示意图

  • 那么,为何要采用新实现Flowable实现背压,而不采用旧的Observable呢?
  • 主要缘由:旧实现Observable没法很好解决背压问题。

示意图

4.4 Flowable的基础使用

  • Flowable的基础使用很是相似于 Observable
  • 具体以下
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

示意图

  • 更加优雅的链式调用
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 至此,Flowable的基础使用讲解完
  • 关于更深层次的使用会结合 背压策略的实现 来说解

5. 背压策略的使用

  • 在本节中,我将结合 背压策略的原理 & Flowable的使用,为你们介绍在RxJava 2.0 中该如何使用Flowable来实现背压策略功能,即背压策略的使用
  • FlowableObservable在功能上的区别主要是 多了背压的功能
  • 下面,我将顺着第3节中讲解背压策略实现原理 & 解决方案(以下图),来说解Flowable在背压策略功能上的使用

示意图


 

5.1 控制 观察者接收事件 的速度

5.1.1 异步订阅状况.net

  • 简介

示意图

  • 具体原理图

示意图

  • 具体使用
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 效果图

示意图

  • 有2个结论是须要你们注意的

示意图

下图 = 当缓存区存满时(128个事件)溢出报错的原理图线程

示意图

  • 代码演示1:观察者不接收事件的状况下,被观察者继续发送事件 & 存放到缓存区;再按需取出
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

示意图

  • 代码演示2:观察者不接收事件的状况下,被观察者继续发送事件至超出缓存区大小(128)
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

示意图

5.1.2 同步订阅状况

同步订阅 & 异步订阅 的区别在于: 
- 同步订阅中,被观察者 & 观察者工做于同1线程 
- 同步订阅关系中没有缓存区

示意图

  • 被观察者在发送1个事件后,必须等待观察者接收后,才能继续发下1个事件
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

示意图

因此,实际上并不会出现被观察者发送事件速度 > 观察者接收事件速度的状况。但是,却会出现被观察者发送事件数量 > 观察者接收事件数量的问题。

  • 如:观察者只能接受3个事件,但被观察者却发送了4个事件,因此出现了不匹配状况
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

示意图

因此,对于没有缓存区概念的同步订阅关系来讲,单纯采用控制观察者的接收事件数量(响应式拉取)实际上就等于 “单相思”,虽然观察者控制了要接收3个事件,但假设被观察者须要发送4个事件,仍是会出现问题。

 

  • 有1个特殊状况须要注意

示意图

  • 代码演示
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

在被观察者发送第1个事件后, 就抛出MissingBackpressureException异常 & 观察者没有收到任何事件

示意图

5.2 控制 被观察者发送事件 的速度

  • 简介

示意图

  • FlowableEmitter类的requested()介绍
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 每一个线程中的requested()的返回值 = 该线程中的request(a)的a值

  • 对应于同步 & 异步订阅状况 的原理图

示意图

为了方便你们理解该策略中的requested()使用,该节会先讲解同步订阅状况,再讲解异步订阅状况

5.2.1 同步订阅状况

  • 原理说明

示意图

即在同步订阅状况中,被观察者 经过 FlowableEmitter.requested()得到了观察者自身接收事件能力,从而根据该信息控制事件发送速度,从而达到了观察者反向控制被观察者的效果

  • 具体使用 
    下面的例子 = 被观察者根据观察者自身接收事件能力(10个事件),从而仅发送10个事件
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

示意图

  • 特别注意 
    在同步订阅状况中使用FlowableEmitter.requested()时,有如下几种使用特性须要注意的:

示意图

状况1:可叠加性

  • 即:观察者可连续要求接收事件,被观察者会进行叠加并一块儿发送
 
  • 1
  • 2
  • 3
  • 4
  • 代码演示
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

示意图

状况2:实时更新性

  • 即,每次发送事件后,emitter.requested()会实时更新观察者能接受的事件 
    1.  
    2.  

 

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 代码演示
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

示意图

状况3:异常

  • FlowableEmitter.requested()减到0时,则表明观察者已经不可接收事件
  • 此时被观察者若继续发送事件,则会抛出MissingBackpressureException异常 
     
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

![示意图](http://upload-images.jianshu.io/upload_images/944365-acbf17d1cf200c0c.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

额外

  • 若观察者没有设置可接收事件数量,即无调用Subscription.request()
  • 那么被观察者默认观察者可接收事件数量 = 0,即FlowableEmitter.requested()的返回值 = 0

5.2.2 异步订阅状况

  • 原理说明

示意图

从上面能够看出,因为两者处于不一样线程,因此被观察者 没法经过 FlowableEmitter.requested()知道观察者自身接收事件能力,即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度。具体请看下面例子

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

示意图

而在异步订阅关系中,反向控制的原理是:经过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度

那么该何时调用被观察者线程中的request(n) & n 的值该是多少呢?请继续往下看。

  • 具体使用

关于RxJava内部调用request(n)(n = 12八、9六、0)的逻辑以下:

示意图

 

  • 代码演示

下面我将用一个例子来演示该原理的逻辑

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

整个流程 & 测试结果 请看下图

示意图

5.3 采用背压策略模式:BackpressureStrategy

5.3.1 背压模式介绍

在Flowable的使用中,会被要求传入背压模式参数

示意图

  • 面向对象:针对缓存区
  • 做用:当缓存区大小存满、被观察者仍然继续发送下1个事件时,该如何处理的策略方式 
     

5.3.2 背压模式类型

![示意图](http://upload-images.jianshu.io/upload_images/944365-47b55edec299faea.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 下面我将对每种模式逐一说明。 **模式1:BackpressureStrategy.ERROR** - 问题:发送事件速度 > 接收事件 速度,即流速不匹配

 

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

![示意图](http://upload-images.jianshu.io/upload_images/944365-0c56eb0868106c41.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) **模式2:BackpressureStrategy.MISSING** - 问题:发送事件速度 > 接收事件 速度,即流速不匹配

 

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

![示意图](http://upload-images.jianshu.io/upload_images/944365-8f27f7fe6258bea6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) **模式3:BackpressureStrategy.BUFFER**

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配 
     
  • 处理方式:将缓存区大小设置成无限大 
    1. 即 被观察者可无限发送事件 观察者,但其实是存放在缓存区 
    2. 但要注意内存状况,防止出现OOM
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

能够接收超过原先缓存区大小(128)的事件数量了 ![示意图](http://upload-images.jianshu.io/upload_images/944365-f1fffce5c7925567.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) **模式4: BackpressureStrategy.DROP**

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配 
     
  • 处理方式:超过缓存区大小(128)的事件丢弃 
    如发送了150个事件,仅保存第1 - 第128个事件,第129 -第150事件将被丢弃
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

被观察者一会儿发送了150个事件,点击按钮接收时观察者接收了128个事件;再次点击接收时却没法接受事件,这说明超过缓存区大小的事件被丢弃了。 ![示意图](http://upload-images.jianshu.io/upload_images/944365-6b601cfdcaa0eb29.gif?imageMogr2/auto-orient/strip) **模式5:BackpressureStrategy.LATEST**

  • 问题:发送事件速度 > 接收事件 速度,即流速不匹配 
     
  • 处理方式:只保存最新(最后)事件,超过缓存区大小(128)的事件丢弃 
    即若是发送了150个事件,缓存区里会保存129个事件(第1-第128 + 第150事件)
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 被观察者一会儿发送了150个事件,点击按钮接收时观察者接收了128个事件;
  • 再次点击接收时却接收到1个事件(第150个事件),这说明超过缓存区大小的事件仅保留最后的事件(第150个事件)

示意图

5.3.3 特别注意

在使用背压策略模式的时候,有1种状况是须要注意的:

a. 背景 
FLowable 可经过本身建立(如上面例子),或经过其余方式自动建立,如interval操做符



 

b. 冲突 
- 对于自身手动建立FLowable的状况,可经过传入背压模式参数选择背压策略 
(即上面描述的)

  • 但是对于自动建立FLowable,却没法手动传入传入背压模式参数,那么出现流速不匹配的状况下,该如何选择 背压模式呢?
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

示意图

c. 解决方案 
RxJava 2.0内部提供 封装了背压策略模式的方法 
onBackpressureBuffer() 
onBackpressureDrop() 
onBackpressureLatest()

 

具体使用以下:

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

从而很好地解决了发送事件 & 接收事件 速度不匹配的问题。

封装方法的示意图.gif

其他方法的做用相似于上面的说背压模式参数,此处不做过多描述。

背压策略模式小结

示意图

6. 总结

  • 本文主要对 Rxjava 的背压模式知识进行讲解

  • 接下来的时间,我将持续推出 Android中 Rxjava 2.0 的一系列文章,包括原理、操做符、应用场景、背压等等 ,有兴趣能够继续关注Carson_Ho的安卓开发笔记!!

示意图

相关文章
相关标签/搜索