rxjs在mpvue中的应用

前言

rxjs是一个响应式编程的库,它使异步编程和回调代码变得更为简单。 mpvue是一个小程序的框架。它使用vue的语法,使小程序的开发更加快捷简单,让前端开发人员的一套代码同时用于web端和小程序端变成了现实。php

先来聊聊rxjs

响应式编程是rxjs的核心概念之一。在主流的三大框架之中都获得了应用:vue的底层就采用了reactive programming.angular2+ 也全面引用了rxjs,,不论是在 http 仍是 animation 都用了 RxJS 的 Observable.Redux 从3.5版本开始,也加入了对Observable 操做的支持.甚至于主流的编程语言都有rx的Library,,好比RxRuby, RxPy, RxJava...等 RxJS 提供了一套完整的非同步解决方案,让咱们在面对各类非同步行为,不论是 Event, AJAX, 仍是 Animation 等,咱们均可以使用相同的 API 作开发。html

1.网页的世界是异步的

在前端开发过程当中,咱们会用到各类的js异步,如callback 或是 Promise 物件甚至是async/await,单随着应用愈来愈复杂,编写异步代码愈来愈困难和繁琐。异步常见的问题有: * 竞态条件 (Race Condition) * 内存泄漏 (Memory Leak) * 复杂的状态 (Complex State) * 异常处理 (Exception Handling)前端

  • 竞态条件: 发送了第一个请求以后又发送了第二条请求,这两条请求的顺序会影响到最终接收的不一样结果.
  • 内存泄露: 在单页面应用中,若是有对dom注册监听事件,而没有在适当的时机点把监听的事件移除,就会形成Memory Leak.
  • 复杂的状态: 好比说咱们有一支付费用户才能播放的影片,首先可能要先抓取这部影片的资讯,接着咱们要在播放时去验证使用者是否有权限播放,而使用者也有可能再按下播放后又当即按了取消,而这些都是非同步执行,这时就会各类复杂的状态须要处理.
  • 异常处理: JavaScript 的try/catch能够捕捉同步的例外,但非同步的程式就没这么容易,尤为当咱们的非同步行为很复杂时,这个问题就越发明显。

若是咱们用rxjs来处理,彷佛就变得方便了许多:vue

var handler = (e) => {
  console.log(e);
  document.body.removeEventListener('click', handler); 
 } 
 document.body.addEventListener('click', handler);
`
能够写成:
 `
Rx.Observable
  .fromEvent(document.body, 'click') // 注册监听
  .take(1) // 只取一次
  .subscribe(console.log);
复制代码

总之,RxJS 是一套藉由 Observable sequences 来组合非同步行为和事件基础程序的 Library,能够把 RxJS 想成处理 非同步行为 的 Lodash.是Functional Programming 及 Reactive Programming 两个编程思想的结合.react

2.聊聊Observable

RxJS 的基础就是 Observable,只要弄懂 Observable 就等于学会一半的 RxJS.Observable 就是观察者模式(Observer) 和 迭代器模式(Iterator) 两种思想的结合。web

  • Observer Patternajax

    观察者模式在api设计上获得普遍应用,常见的一个例子是编程

    function clickHandler(event) {
        console.log('user click!');
    }
    
    document.body.addEventListener('click', clickHandler)
    复制代码

    让咱们本身来实现一个:json

    //定义
    class Producer {
    	constructor() {
    		this.listeners = [];
    	}
    	addListener(listener) {
    		if(typeof listener === 'function') {
    			this.listeners.push(listener)
    		} else {
    			throw new Error('listener 必須是 function')
    		}
    	}
    	removeListener(listener) {
    		this.listeners.splice(this.listeners.indexOf(listener), 1)
    	}
    	notify(message) {
    		this.listeners.forEach(listener => {
    			listener(message);
    		})
    	}
    }
    
    //应用
    var egghead = new Producer(); 
    
    function listener1(message) {
    	console.log(message + 'from listener1');
    }
    
    function listener2(message) {
    	console.log(message + 'from listener2');
    }
    
    egghead.addListener(listener1); // 注册监听
    egghead.addListener(listener2);
    egghead.notify('A new course!!') // 执行
    
    //输出结果:
    a new course!! from listener1
    a new course!! from listener2
    复制代码

    这个例子很好的说明了 Observer Pattern 如何在event 跟 listeners的应用中作到解耦小程序

  • Iterator Pattern

    下面是一个使用Iterator的例子

    var arr = [1, 2, 3];
    
    var iterator = arr[Symbol.iterator]();
    
    iterator.next();
    // { value: 1, done: false }
    iterator.next();
    // { value: 2, done: false }
    iterator.next();
    // { value: 3, done: false }
    iterator.next();
    // { value: undefined, done: true }
    复制代码

    让咱们来动手制做一个:

    //定义
    class IteratorFromArray {
    	constructor(arr) {
    		this._array = arr;
    		this._cursor = 0;
    	}
      
    	next() {
    		return this._cursor < this._array.length ?
    		{ value: this._array[this._cursor++], done: false } :
    		{ done: true };
    	}
    	
    	map(callback) {
    		const iterator = new IteratorFromArray(this._array);
    		return {
    			next: () => {
    				const { done, value } = iterator.next();
    				return {
    					done: done,
    					value: done ? undefined : callback(value)
    				}
    			}
    		}
    	}
    }
    
    //使用
    var iterator = new IteratorFromArray([1,2,3]);
    var newIterator = iterator.map(value => value + 3);
    
    newIterator.next();
    // { value: 4, done: false }
    newIterator.next();
    // { value: 5, done: false }
    newIterator.next();
    // { value: 6, done: false }
    复制代码

    相似的还有generator的例子:

    function* getNumbers(words) {
    	for (let word of words) {
    		if (/^[0-9]+$/.test(word)) {
    		    yield parseInt(word, 10);
    		}
    	}
    }
    
    const iterator = getNumbers('12咱们3学习4');
    
    iterator.next();
    // { value: 1, done: false }
    iterator.next();
    // { value: 2, done: false }
    iterator.next();
    // { value: 3, done: false }
    iterator.next();
    // { value: 4, done: false }
    iterator.next();
    // { value: undefined, done: true }
    复制代码

总之,Observer 与 Iterator 都有共同的特性,就是渐进式的获取数据信息,差异在于Observer 是生产者push数据,Iterator 是消费者pull数据。而Observable 具有生产者推送数据的特性,同时能像序列,拥有序列处理数据的方法

3.建立Observable

RxJS 有一个核心和三个重点。核心就是Observable(map, filter...),三个重点分别是:Observer,Subject,Schedulers, 先来说讲Observable的用法。

创建 Observable: create

var observable = Rx.Observable
  .create(function(observer) {
  	observer.next('Jerry'); 
  	observer.next('Anna');
  })
复制代码

让咱们订阅observable,来接收数据

observable.subscribe(function(value) {
  console.log(value);
})
复制代码

须要注意的是,Observable不只能够处理异步状况,同步行为也是能够的。此外,观察者(Observer) 有三个方法:

  • next:每当 Observable 发送出新的值,next 方法就会被调用。
  • complete:在 Observable 没有其余的数据能够取得时,complete 方法就会被调用,在 complete 被调用以后,next 方法就不会再起做用。
  • error:每当 Observable 内发生错误时,error 方法就会被调用。
var observable = Rx.Observable
 	.create(function(observer) {
 			observer.next('Jerry');
 			observer.next('Anna');
 			observer.complete();
 			observer.next('not work');
 	})
 	
 var observer = {
 	next: function(value) {
 		console.log(value);
 	},
 	error: function(error) {
 		console.log(error)
 	},
 	complete: function() {
 		console.log('complete')
 	}
 }
 
 observable.subscribe(observer)
 
 //输出
 Jerry
 Anna
 complete
复制代码

4.Observable的经常使用方法

Observable 的经常使用方法包括create,of,from,fromEvent,fromPromise,never,empty,throw,interval,timer等等

of的用法,能够和上面的create方法作一个对比

var observable  = Rx.Observable.of('Jerry', 'Anna');
  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });
  
  // Jerry
  // Anna
  // complete!
复制代码

from的用法,参数是数组,对比of传入的是一个个参数

var arr = ['Jerry', 'Anna', 2016, 2017, '30 days'] 
  var observable = Rx.Observable.from(arr);
  
  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });
  
  // Jerry
  // Anna
  // 2016
  // 2017
  // 30 days
  // complete!
复制代码

此外,from的参数还能够是字符串或者promise

var observable = Rx.Observable
    .from(new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve('Hello RxJS!');
      },3000)
    }))
    
  observable.subscribe({
      next: function(value) {
      	console.log(value)
      },
      complete: function() {
      	console.log('complete!');
      },
      error: function(error) {
      console.log(error)
      }
  });
  
  // Hello RxJS!
  // complete!
复制代码

fromEvent的用法,第一个参数是dom元素,第二个参数是要监听的事件名,例如对body作点击事件监听:

var observable = Rx.Observable.fromEvent(document.body, 'click');

  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });

复制代码

fromEventPattern,这个方法是给类事件使用。所谓的类事件就是指其行为跟事件相像,同时具备注册监听及移除监听两种行为,就像 DOM Event 有 addEventListener 及 removeEventListener

class Producer {
  	constructor() {
  		this.listeners = [];
  	}
  	addListener(listener) {
  		if(typeof listener === 'function') {
  			this.listeners.push(listener)
  		} else {
  			throw new Error('listener 必須是 function')
  		}
  	}
  	removeListener(listener) {
  		this.listeners.splice(this.listeners.indexOf(listener), 1)
  	}
  	notify(message) {
  		this.listeners.forEach(listener => {
  			listener(message);
  		})
  	}
  }
  
  var egghead = new Producer(); 
  // egghead 同時有 addEventListener 及 removeEventListener方法
  
  var observable = Rx.Observable
      .fromEventPattern(
          (handler) => egghead.addListener(handler), 
          (handler) => egghead.removeListener(handler)
      );
    
  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  })
  
  egghead.notify('Hello! Can you hear me?');
  // Hello! Can you hear me?

复制代码

empty方法,会返回一个空的observable,当即执行complete

var observable = Rx.Observable.empty();

  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });
  // complete!

复制代码

never方法,会返回一个无穷的observable,就是一个一直存在,但什么都不作的observable

var observable = Rx.Observable.never();

  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });

复制代码

throw方法的做用就是抛出错误

var observable = Rx.Observable.throw('Oop!');

  observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
          console.log('Throw Error: ' + error)
  	}
  });
  // Throw Error: Oop!

复制代码

interval方法,会发送一个从零开始依次递增的整数,它的参数是间隔时间,单位是毫秒

var observable = Rx.Observable.interval(1000);

  observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
          console.log('Throw Error: ' + error)
  	}
  });
  // 0
  // 1
  // 2
  // ...

复制代码

timer方法与 interval有所不一样,它有两个参数。第一个参数表明发出第一个值的等待时间,第二个参数表明每次发出值的间隔时间

var observable = Rx.Observable.timer(1000, 5000);

  observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
          console.log('Throw Error: ' + error)
  	}
  });
  //先等一秒
  // 0  
  // 1
  // 2 ...

复制代码

unsubscribe方法: 在订阅observable后,会返回一个subscription,它有一个能够释放资源的unsubscribe方法

var observable = Rx.Observable.timer(1000, 1000);

  // 取得 subscription
  var subscription = observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
      console.log('Throw Error: ' + error)
  	}
  });
  
  setTimeout(() => {
      subscription.unsubscribe() // 中止订阅
  }, 5000);
  // 0
  // 1
  // 2
  // 3
  // 4

复制代码

map方法:参数是一个回调函数,对数据进行操做以后,再返回新的observable

var observable = Rx.Observable.interval(1000);
  var newest = observable.map(x => x + 2); 
  
  newest.subscribe(console.log);
  // 2
  // 3
  // 4
  // 5..
复制代码

让咱们本身动手来实现一下:

function map(callback) {
      return Rx.Observable.create((observer) => {
          return this.subscribe(
              (value) => { 
                  try{
                      observer.next(callback(value));
                  } catch(e) {
                      observer.error(e);
                  }
              },
              (err) => { observer.error(err); },
              () => { observer.complete() }
          )
      })
  }
  Rx.Observable.prototype.map = map;
  var people = Rx.Observable.of('Jerry', 'Anna');
  var helloPeople = people.map((item) => item + ' Hello~');
  
  helloPeople.subscribe(console.log);
  // Jerry Hello~
  // Anna Hello~
复制代码

mapTo方法:把原来的值都改为一个固定值

var observable = Rx.Observable.interval(1000);
  var newest = observable.mapTo(2); 
  
  newest.subscribe(console.log);
  // 2
  // 2
  // 2
  // 2..
复制代码

filter方法:类型Array的filter方法,过滤出一些值.

var observable = Rx.Observable.interval(1000);
  var newest = observable.filter(x => x % 2 === 0); 
  
  newest.subscribe(console.log);
  // 0
  // 2
  // 4
  // 6..
复制代码

take方法:取前多少个元素就结束.

var observable = Rx.Observable.interval(1000);
  var example = observable.take(3);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // complete
复制代码

first方法:至关于take(1),取出第一个元素后就结束

var observable = Rx.Observable.interval(1000);
  var example = observable.first();
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  
  // 0
  // complete
复制代码

takeUntil方法:在某个事件发生时,结束.

var observable = Rx.Observable.interval(1000);
  var click = Rx.Observable.fromEvent(document.body, 'click');
  var example = observable.takeUntil(click);     
     
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // 3
  // complete //点击body元素时
复制代码

concatAll方法:当Observable传递的元素仍是observable时,相似于二维数组,咱们经过这个方法把它扁平化为一维数组(concatAll 会一个一个处理,必定是等前一个 observable 完成(complete)才会处理下一个 observable)

var observable = Rx.Observable.fromEvent(document.body, 'click');
  var source = observable.map(e => Rx.Observable.of(1,2,3));
  
  var example = source.concatAll();
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

switch方法:同concatAll相似,扁平化observable。(它会在新的 observable 送出后直接处理新的 observable 无论前一个 observable 是否完成,每当有新的 observable 送出就会直接把旧的 observable 退订(unsubscribe),永远只处理最新的 observable!)

var click = Rx.Observable.fromEvent(document.body, 'click');
  var source = click.map(e => Rx.Observable.interval(1000));
  
  var example = source.switch();
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

mergeAll:同concatAll,switch相似,扁平化observable。mergeAll 能够传入一个数值,这个数值表明他能够同时处理的 observable 数量(不会像 switch 同样退订(unsubscribe)原先的 observable 而是并行处理多个 observable)

var click = Rx.Observable.fromEvent(document.body, 'click');
  var source = click.map(e => Rx.Observable.interval(1000));
  
  var example = source.mergeAll();
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  //----------------00---11---22---33---(04)4--...
复制代码

下面来实现一个拖拉功能:页面上有个元素(#drag),在该元素上按下左键(mousedown)时,开始监听鼠标滑动的位置。当鼠标释放时(mouseup),结束监听。当鼠标移动时(mousemove),改变元素的位置

const dragDOM = document.getElementById('drag');
  const body = document.body;
  
  const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
  const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
  const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');
  
  mouseDown
    .map(event => mouseMove.takeUntil(mouseUp))
    .concatAll()
    .map(event => ({ x: event.clientX, y: event.clientY }))
    .subscribe(pos => {
    	dragDOM.style.left = pos.x + 'px';
      dragDOM.style.top = pos.y + 'px';
    })
复制代码

skip方法:跳过前几个元素,从后面继续取值

var observable = Rx.Observable.interval(1000);
  var example = observable.skip(3);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 3
  // 4
  // 5...
复制代码

takeLast方法:从后面取值

var observable = Rx.Observable.interval(1000).take(6);
  var example = observable.takeLast(2);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 4
  // 5
  // complete
复制代码

last方法:用来取到最后的元素

var observable = Rx.Observable.interval(1000).take(6);
  var example = observable.last();
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 5
  // complete
复制代码

concat方法:把多个observable合并成一个

var source = Rx.Observable.interval(1000).take(3);
  var source2 = Rx.Observable.of(3)
  var source3 = Rx.Observable.of(4,5,6)
  var example = source.concat(source2, source3);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // 3
  // 4
  // 5
  // 6
  // complete
复制代码

startWith方法:在observable最开始插入元素

var observable = Rx.Observable.interval(1000);
  var example = observable.startWith(0);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 0
  // 1
  // 2
  // 3...
复制代码

merge方法:合并observable,和concat不一样的是:merge把多个observable同时处理,而concat处理完一个以后才会处理接下来的observable

var source = Rx.Observable.interval(500).take(3);
  var source2 = Rx.Observable.interval(300).take(6);
  var example = source.merge(source2);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 0
  // 1
  // 2
  // 1
  // 3
  // 2
  // 4
  // 5
  // complete
复制代码

combineLatest方法:它会取到各个observable的值,通过处理后再输出

var observale = Rx.Observable.interval(500).take(3);
  var newest = Rx.Observable.interval(300).take(6);
  
  var example = observale.combineLatest(newest, (x, y) => x + y);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // 3
  // 4
  // 5
  // 6
  // 7
  // complete
复制代码

分析:combineLatest会等两个observable都有传送值的时候才会执行callback

zip方法:会取每一个observable相同位置的元素传入callback

var observale = Rx.Observable.interval(500).take(3);
  var newest = Rx.Observable.interval(300).take(6);
  
  var example = observale.zip(newest, (x, y) => x + y);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 2
  // 4
  // complete
复制代码

分析:zip 会等到 observale 跟 newest 都送出了第一个元素,再传入 callback,下次则等到 observale 跟 newest 都送出了第二个元素再一块儿传入 callback

withLatestFrom方法:在主observable送出新值时,才会执行callback

var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
  var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);
  
  var example = main.withLatestFrom(some, (x, y) => {
      return y === 1 ? x.toUpperCase() : x;
  });
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // h
  // e
  // l
  // L
  // O
  // complete
复制代码

分析:withLatestFrom 会在 main 送出值的时候执行 callback,但请注意若是 main 送出值时 some 以前没有送出过任何值 callback 仍然不会执行

用学到的方法实现一个较为复杂的拖拉功能

const video = document.getElementById('video');
   const anchor = document.getElementById('anchor');
   const scroll = Rx.Observable.fromEvent(document, 'scroll');
   scroll.map(e => anchor.getBoundingClientRect().bottom < 0)
   .subscribe(bool => {
       if(bool) {
           video.classList.add('video-fixed');
       } else {
           video.classList.remove('video-fixed');
       }
   })
   
   const mouseDown = Rx.Observable.fromEvent(video, 'mousedown')
   const mouseUp = Rx.Observable.fromEvent(document, 'mouseup')
   const mouseMove = Rx.Observable.fromEvent(document, 'mousemove')
   const validValue = (value, max, min) => {
       return Math.min(Math.max(value, min), max)
   }
   mouseDown
       .filter(e => video.classList.contains('video-fixed'))
       .map(e => mouseMove.takeUntil(mouseUp))
       .concatAll()
       .withLatestFrom(mouseDown, (move, down) => {
           return {
               x: validValue(move.clientX - down.offsetX, window.innerWidth - 320, 0),
               y: validValue(move.clientY - down.offsetY, window.innerHeight - 180, 0)
           }
       })
       .subscribe(pos => {
           video.style.top = pos.y + 'px';
           video.style.left = pos.x + 'px';
       })

复制代码

scan方法:相似Array的reduce方法,第一个参数传入callback,第二个参数传入初始值(能够没有)。返回一个observable 实例

var source = Rx.Observable.from('hello')
           .zip(Rx.Observable.interval(600), (x, y) => x);

  var observable = source.scan((origin, next) => origin + next, '');
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // h
  // he
  // hel
  // hell
  // hello
  // complete
复制代码

buffer方法:它会把本来的 observable (source)送出的元素缓存在数组中,等到传入的 observable(source2) 送出元素时,就会触发把缓存的元素送出。

var source = Rx.Observable.interval(300);
  var source2 = Rx.Observable.interval(1000);
  var observable = source.buffer(source2);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // [0,1,2]
  // [3,4,5]
  // [6,7,8]...
复制代码

bufferCount方法:逢n个数缓存在数组中,并输出

var source = Rx.Observable.interval(300);
  var observable = source.bufferCount(3);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // [0,1,2]
  // [3,4,5]
  // [6,7,8]...
复制代码

bufferTime方法:以下例子,鼠标在500ms内连续点两下才输出

const button = document.getElementById('demo');
  const click = Rx.Observable.fromEvent(button, 'click')
  const observable = click
                  .bufferTime(500)
                  .filter(arr => arr.length >= 2);
  
  observable.subscribe({
      next: (value) => { console.log('success'); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

delay方法:延迟多久以后再输出元素,参数能够是数字(ms),也能够是日期格式

var source = Rx.Observable.interval(300).take(5);
  var observable = source.delay(500);
  // observable = source.delay(new Date(new Date().getTime() + 1000));
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  //延迟500ms以后
  // 0
  // 1
  // 2
  // 3
  // 4
复制代码

delayWhen方法:delayWhen 能够影响每一个元素,并且须要传一个 callback 并回传一个 observable

var source = Rx.Observable.interval(300).take(5);
  var observable = source
                .delayWhen(
                    x => Rx.Observable.empty().delay(100 * x * x)
                );
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

用学到的方法实现一个小功能:许多图片跟着鼠标跑,可是不能跑的同样快

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width">
    <title>JS Bin</title>
    <style>
        * {
            margin: 0;
            padding: 0;
            cursor: pointer;
        }
        img {
          width: 50px;
          position: absolute;
          border-radius: 50%;
          border: 3px white solid;
          transform: translate3d(0,0,0);        
        }
    </style>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.js"></script>
  </head>
  <body>
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover6.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover5.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover4.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover3.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover2.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover1.jpg" alt="">
    <script>
          var imgList = document.getElementsByTagName('img');
          var movePos = Rx.Observable.fromEvent(document, 'mousemove')
          .map(e => ({ x: e.clientX, y: e.clientY }))
          
          function followMouse(DOMArr) {
            const delayTime = 600;
            DOMArr.forEach((item, index) => {
              movePos
                .delay(delayTime * (Math.pow(0.65, index) + Math.cos(index / 4)) / 2) //时间规则能够换成其余的
                .subscribe(function (pos){
                  item.style.transform = 'translate3d(' + (pos.x-25) + 'px, ' + (pos.y-25) + 'px, 0)';
                });
            });
          }
          followMouse(Array.from(imgList))
      </script>
  </body>
</html>
复制代码

debounce 方法:和防抖函数功能一致,debounce 跟 debounceTime 一个是传入 observable 另外一个则是传入毫秒,比较经常使用到的是 debounceTime:会先把元素cache 住并等待一段时间,若是这段时间内已经没有收到任何元素,则把元素送出;若是这段时间内又收到新的元素,则会把本来cache 住的元素释放 掉并从新计时,不断反复

const searchInput = document.getElementById('searchInput');
  const theRequestValue = document.getElementById('theRequestValue');
  
  Rx.Observable.fromEvent(searchInput, 'input')
    .debounceTime(300)
    .map(e => e.target.value)
    .subscribe((value) => {
      theRequestValue.textContent = value;
      // 在这请求接口
    })
复制代码

throttle 方法:和节流函数功能一致,throttle 跟 throttleTime 一个是传入 observable 另外一个则是传入毫秒,比较经常使用到的是throttleTime:会先开放送出元素,等到有元素被送出就会沉默一段时间,等到时间过了又会开放发送元素,throttle 是在控制行为的最高频率,更适合用在连续性行为

var source = Rx.Observable.interval(300).take(5);
  var observable = source.throttleTime(1000);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 4
  // complete
复制代码

distinct方法:相同的值只留一个

var source = Rx.Observable.from([{ value: 'a'}, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }])
              .zip(Rx.Observable.interval(300), (x, y) => x);
  var observable = source.distinct((x) => {
      return x.value
  });
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // {value: "a"}
  // {value: "b"}
  // {value: "c"}
  // complete
复制代码

distinctUntilChanged方法:跟 distinct 同样会把相同的元素过滤掉,但 distinctUntilChanged 只会跟最后一次送出的元素比较,不会每一个都比

var source = Rx.Observable.from(['a', 'b', 'c', 'c', 'b'])
          .zip(Rx.Observable.interval(300), (x, y) => x);
  var observable = source.distinctUntilChanged()
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // a
  // b
  // c
  // b
  // complete
复制代码

catch方法:用来处理错误

var source = Rx.Observable.from(['a','b','c',2])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source
                  .map(x => x.toUpperCase())
                  .catch(error => Rx.Observable.of('h'));
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });    
  // A
  // B
  // C
  // h
  // complete
复制代码

retry方法:当某个Observable发生错误时,从头尝试循环Observable.参数为循环的次数

var source = Rx.Observable.from(['a','b',2])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source
                  .map(x => x.toUpperCase())
                  .retry(1);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  }); 
  // A
  // B
  // A
  // B
  // Error: TypeError: x.toUpperCase is not a function
复制代码

retryWhen方法:当某个Observable发生错误时,去作某些处理以后再去循环尝试

var source = Rx.Observable.from(['a','b','c','d',2])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source
                  .map(x => x.toUpperCase())
                  .retryWhen(
                  errorObs => errorObs.map(err => fetch('...')));
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  }); 
复制代码

repeat方法:和retry相似,是一直重复订阅的效果,但没有错误发生。参数是循环次数,没有参数就默认无限循环

var source = Rx.Observable.from(['a','b','c'])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source.repeat(1);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

concatMap方法:等同于 map 加上 concatAll,在前一个observable执行完成后再执行下一个observable

var source = Rx.Observable.fromEvent(document.body, 'click');
  var observable = source
                  .concatMap(
                      e => Rx.Observable.interval(100).take(3)
                  );
                  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

此外,concatMap 还有第二个参数是一个 selector callback,这个 callback 会传入四个参数,分别是 外部 observable 送出的元素、内部 observable 送出的元素、外部 observable 送出元素的 index、 内部 observable 送出元素的 index

function getPostData() {
      return fetch('https://jsonplaceholder.typicode.com/posts/1')
      .then(res => res.json())
  }
  var source = Rx.Observable.fromEvent(document.body, 'click');
  
  var observable = source.concatMap(
                  e => Rx.Observable.from(getPostData()), 
                  (e, res, eIndex, resIndex) => res.title); //res就是Rx.Observable.from(getPostData())的observable
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

switchMap方法:等同于 map 加上 switch,会在下一个 observable 被送出后直接退订前一个未处理完的 observable

function getPostData() {
      return fetch('https://jsonplaceholder.typicode.com/posts/1')
      .then(res => res.json())
  }
  var source = Rx.Observable.fromEvent(document.body, 'click');
  
  var observable = source.switchMap(
                      e => Rx.Observable.from(getPostData()));
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

mergeMap方法:等同于 map 加上 mergeAll ,能够并行处理多个 observable(也能传入第二个参数 selector callback,这个 selector callback 跟 concatMap 第二个参数也是彻底同样的,但 mergeMap 的重点是咱们能够传入第三个参数,来限制并行处理的数量)

var source = Rx.Observable.fromEvent(document.body, 'click');

  var observable = source
                  .mergeMap(
                      e => Rx.Observable.interval(100).take(3)
                  );
                  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
复制代码

总之

*  concatMap 用在能够肯定内部的 observable 结束时间比外部 observable 发送时间来快的情境,而且不但愿有任何并行处理行为,适合少数要一次一次完成到底的的 UI 动画或特别的 HTTP request 行为。
*  switchMap 用在只要最后一次行为的结果,适合绝大多数的使用情境。
*  mergeMap 用在并行处理多个 observable,适合须要并行处理的行为,像是多个 I/O 的并行处理。
复制代码

小范例(制做一个简易的autocomplete)

<!DOCTYPE html>
  <html>
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width">
    <title>JS Bin</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.js"></script>
    <style>
          html, body {
              height: 100%;
              background-color: white;
              padding: 0;
              margin: 0;
          }
          .autocomplete {
              position: relative;
              display: inline-block;
              margin: 20px;
          }
          .input {
              width: 200px;
              border: none;
              border-bottom: 1px solid black;
              padding: 0;
              line-height: 24px;
              font-size: 16px;
          }
          .input:focus {
              outline: none;
              border-bottom-color: blue;
          }
          .suggest {
              width: 200px;
              list-style: none;
              padding: 0;
              margin: 0;
              -webkit-box-shadow: 0 2px 4px rgba(0,0,0,0.2);
              
          }
          .suggest li {
                  cursor: pointer;
                  padding: 5px;
          }
          .suggest li:hover {
              background-color: lightblue;
          }
    </style>
  </head>
  <body>
    <div class="autocomplete">
      <input class="input" type="search" id="search" autocomplete="off">
      <ul id="suggest-list" class="suggest">
      </ul>
    </div>
    <script>
          const url = 'https://zh.wikipedia.org/w/api.php?action=opensearch&format=json&limit=5&origin=*';
  
          const getSuggestList = (keyword) => fetch(url + '&search=' + keyword, { method: 'GET', mode: 'cors' })
                                              .then(res => res.json())
          const searchInput = document.getElementById('search');
          const suggestList = document.getElementById('suggest-list');
  
          const keyword = Rx.Observable.fromEvent(searchInput, 'input');
          const selectItem = Rx.Observable.fromEvent(suggestList, 'click');
          const render = (suggestArr = []) => suggestList.innerHTML = suggestArr.map(item => '<li>'+ item +'</li>').join('')
  
          keyword
          // .filter(e => e.target.value.length > 2) 使用者打了 2 个字以上在发送 request
          .debounceTime(100)
          .switchMap(
              e => getSuggestList(e.target.value),//.retry(3) 在 API 失败的时候从新尝试 3 次
              (e, res) => res[1]
          )
          .subscribe(list => render(list))
  
          selectItem
          .filter(e => e.target.matches('li'))
          .map(e => e.target.innerText)
          .subscribe(text => { 
              searchInput.value = text;
              render();
          })
    </script>
  </body>
  </html>
复制代码

window方法:把拆出来的元素放入observable并送出observable,相似buffer拆分出來的元素放到数组并送出数组

var click = Rx.Observable.fromEvent(document, 'click');
  var source = Rx.Observable.interval(1000);
  var observable = source.window(click);
  
  observable
    .switch()
    .subscribe(console.log);
  // 0
  // 1
  // 2
  // 3
  // 4
  // 5 ...
复制代码

windowToggle方法,传入两个参数:第一个是开始的observable,第二个是一个回调函数回传一个结束的observable

var source = Rx.Observable.interval(1000);
  var mouseDown = Rx.Observable.fromEvent(document, 'mousedown');
  var mouseUp = Rx.Observable.fromEvent(document, 'mouseup');
  
  var observable = source
    .windowToggle(mouseDown, () => mouseUp)
    .switch();
    
  observable.subscribe(console.log);
复制代码

groupBy方法:把相同条件元素拆分红一个observable

var people = [
  {name: 'Anna', score: 100, subject: 'English'},
  {name: 'Anna', score: 90, subject: 'Math'},
  {name: 'Anna', score: 96, subject: 'Chinese' }, 
  {name: 'Jerry', score: 80, subject: 'English'},
  {name: 'Jerry', score: 100, subject: 'Math'},
  {name: 'Jerry', score: 90, subject: 'Chinese' }, 
  ];
  var source = Rx.Observable.from(people)
      .zip(
       Rx.Observable.interval(300), 
       (x, y) => x);
  
  var observable = source
    .groupBy(person => person.name)
    .map(group => group.reduce((acc, curr) => ({ 
  	    name: curr.name,
  	    score: curr.score + acc.score 
  	})))
  	.mergeAll();
  	
  observable.subscribe(console.log);
  // { name: "Anna", score: 286 }
  // { name: 'Jerry', score: 270 }
复制代码

5.Observable的特性

Observable 与数组相比,有两大不一样:1.延迟运算 2.渐进式取值

1.延迟运算:observable会等到订阅后才开始对元素作运算,若是没有订阅就不会有运算的行为

var source = Rx.Observable.from([1,2,3,4,5]);
  var example = source.map(x => x + 1);
  //上面的代码就不会去作运算
  
  var source = [1,2,3,4,5];
  var example = source.map(x => x + 1);
  //数组执行完以后,已经作了运算
复制代码

2.渐进式取值:observable的每次运算会一算到底,并非数组的运算彻底部的元素以后再返回

var source = Rx.Observable.from([1,2,3]);
  var example = source
                .filter(x => x % 2 === 0)
                .map(x => x + 1)
  
  example.subscribe(console.log);
  //1到filter被过滤掉;2到filter再到mapb变成3返回并打印;3到filter被过滤掉
  
  var source = [1,2,3];
  var example = source
                .filter(x => x % 2 === 0) 
                .map(x => x + 1)
  //数组执行到filter会返回完整的数组[2],再到map返回完整的数组[3]
复制代码

6.Subject是什么

Subject 同时是 Observable 又是 Observer,Subject 会对内部的 observers 清单进行组播(multicast)。是 Observer Pattern 的实例而且继承自 Observable。

动手实现一个Subject

var source = Rx.Observable.interval(1000).take(3);

  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  var subject = {
      observers: [],
      subscribe: function(observer) {
          this.observers.push(observer)
      },
      next: function(value) {
          this.observers.forEach(o => o.next(value))    
      },
      error: function(error){
          this.observers.forEach(o => o.error(error))
      },
      complete: function() {
          this.observers.forEach(o => o.complete())
      }
  }
  
  subject.subscribe(observerA)
  
  source.subscribe(subject);
  
  setTimeout(() => {
      subject.subscribe(observerB);
  }, 1000);
  
  // "A next: 0"
  // "A next: 1"
  // "B next: 1"
  // "A next: 2"
  // "B next: 2"
  // "A complete!"
  // "B complete!"
复制代码

对比一下真正的subject:能够看出,Subject 能够拿去订阅 Observable(source) 表明他是一个 Observer,同时 Subject 又能够被 Observer(observerA, observerB) 订阅,表明他是一个 Observable。

var source = Rx.Observable.interval(1000).take(3);

  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  var subject = new Rx.Subject()
  
  subject.subscribe(observerA)
  
  source.subscribe(subject);
  
  setTimeout(() => {
      subject.subscribe(observerB);
  }, 1000);
  
  // "A next: 0"
  // "A next: 1"
  // "B next: 1"
  // "A next: 2"
  // "B next: 2"
  // "A complete!"
  // "B complete!"
复制代码

在某些没法直接使用Observable的前端框架中,咱们能够用subject

class MyButton extends React.Component {
      constructor(props) {
          super(props);
          this.state = { count: 0 };
          this.subject = new Rx.Subject();
          
          this.subject
              .mapTo(1)
              .scan((origin, next) => origin + next)
              .subscribe(x => {
                  this.setState({ count: x })
              })
      }
      render() {
          return <button onClick={event => this.subject.next(event)}>{this.state.count}</button>
      }
  }
复制代码

BehaviorSubject: 但愿 Subject 能表明当下的状态,而不是简单的事件发送

var subject = new Rx.BehaviorSubject(0); // 0 为起始值
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  subject.subscribe(observerA);
  // "A next: 0"  若是是普通的subject则不会输出此行
  subject.next(1);
  // "A next: 1"
  subject.next(2);
  // "A next: 2"
  subject.next(3);
  // "A next: 3"
  
  setTimeout(() => {
      subject.subscribe(observerB); 
      // "B next: 3"   若是是普通的subject则不会输出此行
  },3000)
复制代码

ReplaySubject:在某些时候咱们会但愿 Subject 表明事件,但又能在新订阅时从新发送最后的几个元素

var subject = new Rx.ReplaySubject(2); // 重复发送最后两个元素
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  subject.subscribe(observerA);
  subject.next(1);
  // "A next: 1"
  subject.next(2);
  // "A next: 2"
  subject.next(3);
  // "A next: 3"
  
  setTimeout(() => {
      subject.subscribe(observerB);
      // "B next: 2"
      // "B next: 3"
  },3000) //ReplaySubject 只是事件的重放而已。
复制代码

AsyncSubject: 会在subject结束后送出最后一个值,行为和promise很像

var subject = new Rx.AsyncSubject();
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  subject.subscribe(observerA);
  subject.next(1);
  subject.next(2);
  subject.next(3);
  subject.complete();
  // "A next: 3"
  // "A complete!"
  
  setTimeout(() => {
      subject.subscribe(observerB);
      // "B next: 3"
      // "B complete!"
  },3000)
复制代码

multicast:能够用来挂载 subject 并回传一个可连结(connectable)的 observable

var source = Rx.Observable.interval(1000)
           .take(3)
           .multicast(new Rx.Subject());

  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  source.subscribe(observerA); // subject.subscribe(observerA)
  
  source.connect(); // source.subscribe(subject)
  
  setTimeout(() => {
      source.subscribe(observerB); // subject.subscribe(observerB)
  }, 1000);
复制代码

refCount:需和 multicast一块儿使用,它能够创建一个只须要订阅就自动connect的observable

var source = Rx.Observable.interval(1000)
               .do(x => console.log('send: ' + x))
               .multicast(new Rx.Subject())
               .refCount();
  
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  var subscriptionA = source.subscribe(observerA);
  // 订阅数 0 => 1
  
  var subscriptionB;
  setTimeout(() => {
      subscriptionB = source.subscribe(observerB);
      // 订阅数 0 => 2
  }, 1000);
复制代码

publish:是multicast(new Rx.Subject())的简化写法

var source = Rx.Observable.interval(1000)
           .publish() 
           .refCount();
           
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.Subject()) 
  //             .refCount();
  
  还有另外三种等价关系:
  
  publishReplay:
  var source = Rx.Observable.interval(1000)
               .publishReplay(1) 
               .refCount();
               
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.ReplaySubject(1)) 
  //             .refCount();
  
  publishBehavior:
  var source = Rx.Observable.interval(1000)
               .publishBehavior(0) 
               .refCount();
               
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.BehaviorSubject(0)) 
  //             .refCount();
  
  publishLast:
  var source = Rx.Observable.interval(1000)
               .publishLast() 
               .refCount();
               
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.AsyncSubject(1)) 
  //             .refCount();
复制代码

share:是publish + refCount的简化

var source = Rx.Observable.interval(1000)
               .share();
               
  // var source = Rx.Observable.interval(1000)
  //             .publish() 
  //             .refCount();
  
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.Subject()) 
  //             .refCount();
复制代码

7.Subject与Observable的区别

Subject 能够简单理解为为了在多个订阅中共用执行结果而存在的

Subject 可让咱们用命令的方式发送值到一个observable串流中。若是由于框架限制,咱们没法直接建立observable,如React的Event中就可使用Subject

Subject是 Observer Design Pattern的实例。当observer订阅subject时,subject会把订阅者push进一个订阅者清单中,在元素发送时就去遍历这份清单把元素一一送出。这跟observable像一个function执行是彻底不一样的。

Subject继承了Observable,因此才会有Observable的全部方法。主要的只有next、error、 complete、subscribe 及 unsubscribe 五个方法

Subject同时具备Observer和Observable的特性,跟 Observable 最大的区别就是它是有状态的,也就是存储的那份清单。

当一个observable操做过程当中发生了 side-effect,而咱们不但愿由于多个subscribe而被触发屡次,就须要用到Subject

//不用subject,A和B打印的值不一致
 var result = Rx.Observable.interval(1000).take(6)
               .map(x => Math.random());// side-effect
  var subA = result.subscribe(x => console.log('A: ' + x));
  var subB = result.subscribe(x => console.log('B: ' + x));
  
  //subject,A和B打印的值一致
  var result = Rx.Observable.interval(1000).take(6)
               .map(x => Math.random()) // side-effect
               .multicast(new Rx.Subject())
               .refCount();
  
  var subA = result.subscribe(x => console.log('A: ' + x));
  var subB = result.subscribe(x => console.log('B: ' + x));
复制代码

8.简易版 Observable 的实现

// 空的 observer 
  const emptyObserver = {
    next: () => {},
    error: (err) => { throw err; },
    complete: () => {}
  }
  
  class Observer {
    constructor(destinationOrNext, error, complete) {
      switch (arguments.length) {
        case 0:
          // 空的 observer
          this.destination = this.safeObserver(emptyObserver);
          break;
        case 1:
          if (!destinationOrNext) {
            // 空的 observer
            this.destination = this.safeObserver(emptyObserver);
            break;
          }
          if (typeof destinationOrNext === 'object') {
            // 传入 observer 对象
            this.destination = this.safeObserver(destinationOrNext);
            break;
          }
        default:
          // 若是上面都不是,表示传入了一到三个 function
          this.destination = this.safeObserver(destinationOrNext, error, complete);
          break;
      }
    }
    safeObserver(observerOrNext, error, complete) {
      let next;
  
      if (typeof (observerOrNext) === 'function') {
        // observerOrNext 是 next function
        next = observerOrNext;
      } else if (observerOrNext) {
        // observerOrNext 是 observer 
        next = observerOrNext.next || () => {};
        error = observerOrNext.error || function(err) { 
          throw err 
        };
        complete = observerOrNext.complete || () => {};
      }
      // 返回预期的 observer 对象
      return {
        next: next,
        error: error,
        complete: complete
      };
    }
    
    next(value) {
      if (!this.isStopped && this.next) {
        // 判断是否中止过
        try {
          this.destination.next(value); // 传送值
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
      }
    }
    
    error(err) {
      if (!this.isStopped && this.error) {
        // 判断是否中止过
        try {
          this.destination.error(err); // 传送错误
        } catch (anotherError) {
          this.unsubscribe();
          throw anotherError;
        }
        this.unsubscribe();
      }
    }
  
    complete() {
      if (!this.isStopped && this.complete) {
        // 判断是否中止过
        try {
          this.destination.complete(); // 完成
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
        this.unsubscribe(); // 退订
      }
    }
    
    unsubscribe() {
      this.isStopped = true;
    }
  }
  
  function create(subscriber) {
      const observable = {
          subscribe: function(observerOrNext, error, complete) {
              const realObserver = new Observer(observerOrNext, error, complete)
              subscriber(realObserver);
              return realObserver;
          }       
      };
      return observable;
  }
  
  var observable = create(function(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
    observer.next('not work');
  })
  
  var observer = {
    next: function(value) {
      console.log(value)
    },
    complete: function() {
        console.log('complete!')
    }
  }
  
  observable.subscribe(observer);
复制代码

9.复杂版 Observable 的实现

// 定义空的 observer 
  const emptyObserver = {
    next: () => {},
    error: (err) => { throw err; },
    complete: () => {}
  }
  
  class Observer {
    constructor(destinationOrNext, error, complete) {
      switch (arguments.length) {
        case 0:
          // 空的 observer
          this.destination = this.safeObserver(emptyObserver);
          break;
        case 1:
          if (!destinationOrNext) {
            // 空的 observer
            this.destination = this.safeObserver(emptyObserver);
            break;
          }
          // 判断传入的 destinationOrNext 是不是 Observer 的实例,若是是就不用执行 `this.safeObserver`
          if(destinationOrNext instanceof Observer){
            this.destination = destinationOrNext;
            break;
          }
          if (typeof destinationOrNext === 'object') {
            // 传入 observer 
            this.destination = this.safeObserver(destinationOrNext);
            break;
          }
        default:
          // 若是上面都不是,说明传入了一到三个 function
          this.destination = this.safeObserver(destinationOrNext, error, complete);
          break;
      }
    }
    safeObserver(observerOrNext, error, complete) {
      let next;
  
      if (typeof (observerOrNext) === 'function') {
        // observerOrNext 是 next function
        next = observerOrNext;
      } else if (observerOrNext) {
        // observerOrNext 是 observer 
        next = observerOrNext.next || () => {};
        error = observerOrNext.error || function(err) { 
          throw err 
        };
        complete = observerOrNext.complete || () => {};
      }
      // 返回预期的observer
      return {
        next: next,
        error: error,
        complete: complete
      };
    }
    
    next(value) {
      if (!this.isStopped && this.next) {
        // 是否中止
        try {
          this.destination.next(value); // 发送值
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
      }
    }
    
    error(err) {
      if (!this.isStopped && this.error) {
        // 是否中止
        try {
          this.destination.error(err); // 发送值
        } catch (anotherError) {
          this.unsubscribe();
          throw anotherError;
        }
        this.unsubscribe();
      }
    }
  
    complete() {
      if (!this.isStopped && this.complete) {
        // 是否中止
        try {
          this.destination.complete(); // 完成
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
        this.unsubscribe(); // 中止订阅
      }
    }
    
    unsubscribe() {
      this.isStopped = true;
    }
  }
  
  class MapObserver extends Observer {
    constructor(observer, callback) {
      // 传入原来的 observer 和 map 的 callback
      super(observer); // 继承Observer
      this.callback = callback; // 保存 callback
      this.next = this.next.bind(this); // 确保 next 的 this
    }
    next(value) {
      try {
        this.destination.next(this.callback(value)); 
        // this.destination 是父类 Observer 保存的 observer 
        // 这里 this.callback(value) 就是 map 的操做
      } catch (err) {
        this.destination.error(err);
        return;
      }
    }
  }
  
  class Observable {
    constructor(subscribe) {
      this._subscribe = subscribe; // 把 subscribe 存到属性中
    }
    subscribe(observerOrNext, error, complete) {
      const observer = new Observer(observerOrNext, error, complete);
      // 先用 this.operator 判断当前的 observable 是否有 operator 
      if(this.operator) {
        this.operator.call(observer, this.source)
      } else {
        // 若是没有 operator 再直接把 observer 传入 _subscribe
        this._subscribe(observer);
      }
      return observer;
    }
    map(callback) {
      const observable = new Observable(); // 创建新的 observable
      
      observable.source = this; // 保存当前的 observable
      
      observable.operator = {
          call: (observer, source) => { 
              // 执行 operator 
              const newObserver = new MapObserver(observer, callback);
              // 创建包裹后的 observer
              // 订阅并回传
              return source.subscribe(newObserver);
          }
      }; // 存储当前 operator ,并做为是否有 operator 的依据,
      
      return observable; // 返回新的 observable
    }
  }
  
  Observable.create = function(subscribe) {
      return new Observable(subscribe);
  }
  
  Observable.fromArray = function(array) {
      if(!Array.isArray(array)) {
          throw new Error('params need to be an array');
      }
      return new Observable(function(observer) {
          try{
              array.forEach(value => observer.next(value))
              observer.complete()
          } catch(err) {
              observer.error(err)
          }
      });
  }
  
  var observable = Observable.fromArray([1,2,3,4,5])
                    .map(x => x + 3)
                    .map(x => x + 1)
  
  var observer = {
    next: function(value) {
      console.log(value)
    },
    complete: function() {
        console.log('complete!')
    }
  }
  
  observable.subscribe(observer);
复制代码

10.Scheduler的基本概念

Scheduler是一个数据结构。它知道如何根据优先级或其余标准来储存并队列任务

Scheduler是一个执行环境。它意味着任务什么时候何地被执行,好比像是当即执行、在回调(callback)中执行、setTimeout中执行、animation frame中执行

Scheduler是一个虚拟时钟。它透过now()这个方法提供了时间的概念,咱们可让任务在特定的时间点被执行。

它有四个scheduler: queue(预设的当即执行,适合用在返回的operator 且有大量资料时使用); asap(非同步的执行,相似setTimeout 0); async(异步,相似setInterval,用在跟时间相关的操做); animationFrame(相似Window.requestAnimationFrame,用在复杂运算且高频率触发UI动画时)

var observable = Rx.Observable.create(function (observer) {
      observer.next(1);
      observer.next(2);
      observer.next(3);
      observer.complete();
  });
  
  console.log('before subscribe');
  observable.observeOn(Rx.Scheduler.async) // 设为 async 异步
  .subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  console.log('after subscribe');
  
  // "before subscribe"
  // "after subscribe"
  // 1
  // 2
  // 3
  // "complete"
复制代码

11.rxjs收尾

如何对rxjs进行debug

  • 用do这个operator.它不会对元素产生任何影响。
const source = Rx.Observable.interval(1000).take(3);
    
    const example = source
                    .do(x => console.log('do log: ' + x))
                    .map(x => x + 1);
    
    example.subscribe((x) => {
        console.log('subscription log: ' + x)
    })
    
    // do log: 0
    // subscription log: 1
    // do log: 1
    // subscription log: 2
    // do log: 2
    // subscription log: 3
复制代码
  • 画Marble Diagram(大理石)图
  • 用RxJS Devtools这个谷歌开发工具
Observable.prototype.debug = window.rxDevTool(Observable);
    
    Observable.interval(1000).take(5)
    .debug('source1')
    .map(x => x + 1)
    .debug('source2')
    .subscribe(function() {
        //...
    })
复制代码

Cold & Hot Observable: 区分不一样行为的Observable。Cold Observable 就是指每次订阅都独立的执行,而Hot Observable则是共同的订阅。而这一切的差别来自因而在Observale的内部创建仍是外部创建。

Cold Observable:下面的代码每次订阅都是独立的,它们之间不会互相影响

const source = Rx.Observable.interval(1000).take(5);
    
    source.subscribe(value => console.log('sub1: ' + value))
    
    setTimeout(() => {
        source.subscribe(value => console.log('sub2: ' + value))    
    }, 3500);
    
    // sub1: 0
    // sub1: 1
    // sub1: 2
    // sub1: 3
    // sub2: 0
    // sub1: 4
    // sub2: 1
    // sub2: 2
    // sub2: 3
    // sub2: 4
复制代码

Hot Observable:每一个订阅都是共用的,具体是指一个Observable在屡次订阅时,不会每次都重新开始发送元素

var source = Rx.Observable.interval(1000)
                .take(5)
                .share(); // 共用
    
    source.subscribe(value => console.log('sub1: ' + value))
    
    setTimeout(() => {
        source.subscribe(value => console.log('sub2: ' + value))    
    }, 3500);
    
    // sub1: 0
    // sub1: 1
    // sub1: 2
    // sub1: 3
    // sub2: 3
    // sub1: 4
    // sub2: 4
复制代码

mpvue中使用rxjs

mpvue其实就是vue的翻版。前面讲到,在某些没法直接使用Observable的前端框架中,咱们能够用subject,因此咱们就用subject在mpvue中,实现一个用户频繁点击支付按钮的功能。

定义一个支付class

import rxwx, { Rx } from 'rxjs-wx/RxWX'

export default class Payment {
  constructor () {
    this.rxSubject = new Rx.Subject()
    this.count = 0
  }
  get () {
    return this.rxSubject
  }
  clear () {
    this.count = 0
  }
  pay () {
    return Rx.Observable.of().multicast(this.rxSubject)
      .do(() => {
        this.count++
        if (this.count > 1) {
          wx.showToast({
            title: '别着急,正在完成支付..',
            icon: 'none'
          })
        }
      })
      .debounceTime(1000)
      .switchMap((payParams) => {
        console.log(payParams)
        return rxwx.requestPayment({
            timeStamp: payParams.payParams,
            nonceStr: payParams.nonceStr,
            package: payParams.package,
            signType: payParams.signType || 'MD5',
            paySign: payParams.paySign
            
        })
         .debounceTime(1000)
      })
  }
}
复制代码

页面伪代码

<template>
    <span @click="payHandle">确认支付</span>
</template>
export default {
    methods: {
        payHandle () {
          this.payment.get().next('xxx') // 调用支付功能
        },
        initPay () {
          this.payment = new Payment()
          this.payment.pay()
            .catch((e) => {
              wx.showToast({
                title: '支付失败',
                icon: 'none'
              })
            })
            .subscribe((resp) => {
              wx.showToast({
                title: '支付成功',
                icon: 'none'
              })
              this.payment.clear() // 重置count
              // 支付成功后balabala...
            })
        }
    },
    mounted () {
        // 初始化
        this.initPay()
      },
}
复制代码
相关文章
相关标签/搜索