rxjs-流式编程

前言

第一次接触rxjs也是由于angular2应用,内置了rxjs的依赖,了解以后发现它的强大,是一个能够代替promise的框架,可是只处理promise的东西有点拿尚方宝剑砍蚊子的意思。javascript

若是咱们的应用是彻底rxjs的应用,会显得代码比较清晰,代码写的爽。css

angular团队和微软合做,采用的typescript和rxjs,互相宣传。。html

rxjs

rxjs是一个比较简单的库,它只有Observable,Observer,subscription,subject,Operators,Scheduler6个对象概念。比较相似于观察者模式,若是再了解一些函数式编程和node的stream就更好了。java

中文文档node

observable APIgit

observable 可观察对象

observable是一个可观察对象,也相似观察者模式中的可观察对象,后面的Subscription就至关于观察者模式中的订阅者。github

给一个例子:ajax

var observable = Rx.Observable.create(function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
    observer.next(4);
        observer.complete();
    }, 1000);
});

建立了一个Obervable对象,这里用到了create操做符。typescript

create操做符:建立一个新的 Observable ,当观察者( Observer )订阅该 Observable 时,它会执行指定的函数。编程

observer 观察者

如上例子中的observer,给一个典型的observer例子:

var observer={
    next:x=>console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification')
}

有点相似promise的返回,每来一个“流”就会执行一个next,出错会执行一个observer的error,完成后或者调用complete便再也不监听observable,执行complete函数。这些函数的集合也就是observer。

要使用观察者,须要订阅可观察对象:

observable.subscribe(observer)

Subscription订阅

订阅是一个表示一次性资源的对象,一般是一个可观察对象的执行。

它有一个重要的方法:unsubscribe,顾名思义。。。

好比observable的例子:

var observable = Rx.Observable.create(function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
    }, 1000);
});
var observer={
    next:x=>console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification')
};
observable.subscribe(observer);
//返回
Observer got a next value: 1
Observer got a next value: 2
Observer got a next value: 3
Observer got a next value: 4 //after 1s return
Observer got a complete notification

若是在最后调用subscription.unsubscribe();那么4就不会执行,complete也不会执行,就会取消掉这个观察。

Subject

Subject是容许值被多播到多个观察者的一种特殊的Observable。然而纯粹的可观察对象是单播的(每个订阅的观察者拥有单独的可观察对象的执行)。

subject是Observable对象,而且自带next,error,complete函数,因此咱们不用在定义observer:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);
//返回
observerA: 1
observerB: 1
observerA: 2
observerB: 2

因为subject自带next等等的函数,因此它也是个observer,也能够这样用:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

Operators操做符

rx由于operators强大,咱们能够流式的处理主要由于有operators在。

操做符是可观察对象上定义的方法,例如.map(...),.filter(...),.merge(...),等等。他们相似fp,返回新的observable而subscription对象也会继承。

好比

Rx.Observable.interval(500).filter(x => x%2==1).subscribe( res => console.log(res) );
// 一秒输出一个数,返回单数。

这里的filter就是操做符,咱们经过操做符来完成一系列的神奇操做。

Scheduler调度者

什么是调度者?调度者控制着什么时候启动一个订阅和什么时候通知被发送。

名称 类型 属性 描述
queue Scheduler 在当前事件帧中调度队列(trampoline 调度器)。迭代操做符使用此调度器。
asap Scheduler 微任务队列上的调度, 使用尽量快的转化机制, 或者是 Node.js 的 process.nextTick(),或者是 Web Worker 的消息通道,或者 setTimeout , 或者其余。异步转化使用此调度器.
async Scheduler 使用 setInterval 调度工做。基于时间的操做符使用此调度器。
animationFrame Scheduler 使用 requestAnimationFrame 调度工做。与平台的重绘同步使用此调度器。
var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})
.observeOn(Rx.Scheduler.async);

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

//返回
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

这是由于observeOn(Rx.Scheduler.async)在Observable.create和最终的Observer之间引入了一个代理Observer。

var proxyObserver = {
  next: (val) => {
    Rx.Scheduler.async.schedule(
      (x) => finalObserver.next(x),
      0 /* delay */,
      val /* will be the x for the function above */
    );
  },

  // ...
}

使用rxjs

搜索功能

<input id="text"></input>
<script>
    var text = document.querySelector('#text');
    text.addEventListener('keyup', (e) =>{
        var searchText = e.target.value;
        // 发送输入内容到后台
        $.ajax({
            url: `xx.com/${searchText}`,
            success: data => {
              // 拿到后台返回数据,并展现搜索结果
              render(data);
            }
        });
    });
</script>

以前实现一个搜索效果,其实须要这样的代码,应用到函数节流还须要写为

clearTimeout(timer);
  // 定时器,在 250 毫秒后触发
   timer = setTimeout(() => {
        console.log('发起请求..');
    },250)

还要考虑一种状况,若是咱们搜索了a,而后立刻改成了b,会返回a的结果,这样咱们就须要判断一下:

clearTimeout(timer)
    timer = setTimeout(() => {
        // 声明一个当前所搜的状态变量
        currentSearch = '书'; 

        var searchText = e.target.value;
        $.ajax({
            url: `xx.com/${searchText}`,
            success: data => {
                // 判断后台返回的标志与咱们存的当前搜索变量是否一致
                if (data.search === currentSearch) {
                    // 渲染展现
                    render(data);
                } else {
                    // ..
                }
            }           
    });

这种代码其实就很杂乱了。

若是用rxjs,咱们的代码能简单而且清楚不少:

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup')
                    .debounceTime(250)
                    .pluck('target', 'value')
                    .switchMap(url => Http.get(url))
                    .subscribe(data => render(data));

rxjs几个操做符

forkJoin

rxjs版的promise.all

const getPostOne$ = Rx.Observable.timer(1000).mapTo({id: 1});
const getPostTwo$ = Rx.Observable.timer(2000).mapTo({id: 2});

Rx.Observable.forkJoin(getPostOne$, getPostTwo$).subscribe(res => console.log(res)) 
//返回
[ { id: 1 }, { id: 2 } ]

pairwise

能够保存上一个值

Rx.Observable
  .fromEvent(document, 'scroll')
  .map(e => window.pageYOffset)
  .pairwise()
  .subscribe(pair => console.log(pair)); // pair[1] - pair[0]

switchMap

合并两个流的值,并只发出最新的值

const clicks$ = Rx.Observable.fromEvent(document, 'click');
const innerObservable$ = Rx.Observable.interval(1000);

clicks$.switchMap(event => innerObservable$)
                    .subscribe(val => console.log(val));

每次点击触发才发送interval值,而且点击以后interval从新发送,取消掉以前的值。若是是mergeMap,则不取消以前的值。

toPromise

返回promise

let source = Rx.Observable
  .of(42)
  .toPromise();

source.then((value) => console.log('Value: %s', value));
// => Value: 42

fromPromise

将 Promise 转化为 Observable。

var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));

有了和promise相互转化的api,就很方便的用rx,ng2中内置rx,用着不爽就职意改为promise来写。

takeUntil

public takeUntil(notifier: Observable): Observable
发出源 Observable 发出的值,直到notifier:Observable 发出值。

rx.Observable.interval(1000).takeUntil(rx.Observable.fromEvent(document,'click'))

触发interval,而后每次点击中止触发。

因此它还有一个用法就是创建一个stop流,来避免手动调用unsubscribe。

const data$ = this.getData();
   const cancelBtn = this.element.querySelector('.cancel-button');
   const rangeSelector = this.element.querySelector('.rangeSelector');

   const cancel$ = Observable.fromEvent(cancelBtn, 'click');
   const range$ = Observable.fromEvent(rangeSelector, 'change').map(e => e.target.value);
   
   const stop$ = Observable.merge(cancel$, range$.filter(x => x > 500))
   this.subscription = data$.takeUntil(stop$).subscribe(data => this.updateData(data));

rxjs在ng2

先提BehaviorSubject

BehaviorSubject继承自Observable类,它储存着要发射给消费者的最新的值。
不管什么时候一个新的观察者订阅它,都会当即接受到这个来自BehaviorSubject的"当前值"。

好比

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

//返回
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

每次next就传一个值,在observer里面写函数处理。

例子

咱们有一个material table的例子来看。

代码看文最后

咱们作的是一个table中的filter功能,相似find item by name。

通常的思路就是获取这个input的值,函数节流,在咱们的table数据中filter这个name,而后给原来绑定的data赋值。

对于rx的写法就很清楚了。

Observable.fromEvent(this.filter.nativeElement, 'keyup')
        .debounceTime(150)
        .distinctUntilChanged()
        .subscribe(() => {
          if (!this.dataSource) { return; }
          this.dataSource.filter = this.filter.nativeElement.value;
        });

咱们获取输入的值,节流,去重,赋值给this.dataSource,this.dataSource实际上是ExampleDataSource的实例。

ExampleDatabase类是生成数据的类,能够忽略,ExampleDataSource是咱们作处理的一个类,material暴露了一个connect方法,返回的observable直接绑定table的data。

主要的处理在ExampleDataSource里:

export class ExampleDataSource extends DataSource<any> {
  _filterChange = new BehaviorSubject('');
  get filter(): string { return this._filterChange.value; }
  set filter(filter: string) { this._filterChange.next(filter); }

  constructor(private _exampleDatabase: ExampleDatabase) {
    super();
  }

  /** Connect function called by the table to retrieve one stream containing the data to render. */
  connect(): Observable<UserData[]> {
    const displayDataChanges = [
      this._exampleDatabase.dataChange,
      this._filterChange,
    ];

    return Observable.merge(...displayDataChanges).map(() => {
      return this._exampleDatabase.data.slice().filter((item: UserData) => {
        let searchStr = (item.name + item.color).toLowerCase();
        return searchStr.indexOf(this.filter.toLowerCase()) != -1;
      });
    });
  }

咱们设置了filter这个属性的get和set,每次咱们按下按键,给this.dataSource.filter赋值的时候,实际上,咱们调用了BehaviorSubject的next方法,

发了一个事件。咱们还须要merge一下_exampleDatabase.dataChange事件,为了当table数据改变的时候,咱们能作出相应的处理。

而后就用map操做符,filter一下咱们的data数据。给table数据绑定material已经帮咱们作了。

附文:

import {Component, ElementRef, ViewChild} from '@angular/core';
import {DataSource} from '@angular/cdk';
import {BehaviorSubject} from 'rxjs/BehaviorSubject';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/operator/startWith';
import 'rxjs/add/observable/merge';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/observable/fromEvent';

@Component({
  selector: 'table-filtering-example',
  styleUrls: ['table-filtering-example.css'],
  templateUrl: 'table-filtering-example.html',
})
export class TableFilteringExample {
  displayedColumns = ['userId', 'userName', 'progress', 'color'];
  exampleDatabase = new ExampleDatabase();
  dataSource: ExampleDataSource | null;

  @ViewChild('filter') filter: ElementRef;

  ngOnInit() {
    this.dataSource = new ExampleDataSource(this.exampleDatabase);
    Observable.fromEvent(this.filter.nativeElement, 'keyup')
        .debounceTime(150)
        .distinctUntilChanged()
        .subscribe(() => {
          if (!this.dataSource) { return; }
          this.dataSource.filter = this.filter.nativeElement.value;
        });
  }
}

/** Constants used to fill up our data base. */
const COLORS = ['maroon', 'red', 'orange', 'yellow', 'olive', 'green', 'purple',
  'fuchsia', 'lime', 'teal', 'aqua', 'blue', 'navy', 'black', 'gray'];
const NAMES = ['Maia', 'Asher', 'Olivia', 'Atticus', 'Amelia', 'Jack',
  'Charlotte', 'Theodore', 'Isla', 'Oliver', 'Isabella', 'Jasper',
  'Cora', 'Levi', 'Violet', 'Arthur', 'Mia', 'Thomas', 'Elizabeth'];

export interface UserData {
  id: string;
  name: string;
  progress: string;
  color: string;
}

/** An example database that the data source uses to retrieve data for the table. */
export class ExampleDatabase {
  /** Stream that emits whenever the data has been modified. */
  dataChange: BehaviorSubject<UserData[]> = new BehaviorSubject<UserData[]>([]);
  get data(): UserData[] { return this.dataChange.value; }

  constructor() {
    // Fill up the database with 100 users.
    for (let i = 0; i < 100; i++) { this.addUser(); }
  }

  /** Adds a new user to the database. */
  addUser() {
    const copiedData = this.data.slice();
    copiedData.push(this.createNewUser());
    this.dataChange.next(copiedData);
  }

  /** Builds and returns a new User. */
  private createNewUser() {
    const name =
        NAMES[Math.round(Math.random() * (NAMES.length - 1))] + ' ' +
        NAMES[Math.round(Math.random() * (NAMES.length - 1))].charAt(0) + '.';

    return {
      id: (this.data.length + 1).toString(),
      name: name,
      progress: Math.round(Math.random() * 100).toString(),
      color: COLORS[Math.round(Math.random() * (COLORS.length - 1))]
    };
  }
}

/**
 * Data source to provide what data should be rendered in the table. Note that the data source
 * can retrieve its data in any way. In this case, the data source is provided a reference
 * to a common data base, ExampleDatabase. It is not the data source's responsibility to manage
 * the underlying data. Instead, it only needs to take the data and send the table exactly what
 * should be rendered.
 */
export class ExampleDataSource extends DataSource<any> {
  _filterChange = new BehaviorSubject('');
  get filter(): string { return this._filterChange.value; }
  set filter(filter: string) { this._filterChange.next(filter); }

  constructor(private _exampleDatabase: ExampleDatabase) {
    super();
  }

  /** Connect function called by the table to retrieve one stream containing the data to render. */
  connect(): Observable<UserData[]> {
    const displayDataChanges = [
      this._exampleDatabase.dataChange,
      this._filterChange,
    ];

    return Observable.merge(...displayDataChanges).map(() => {
      return this._exampleDatabase.data.slice().filter((item: UserData) => {
        let searchStr = (item.name + item.color).toLowerCase();
        return searchStr.indexOf(this.filter.toLowerCase()) != -1;
      });
    });
  }

  disconnect() {}
}
相关文章
相关标签/搜索