Rxjs-Observable

Observable

建立一个observable

var observable = Rx.Observable.create(function(observer) {
    observer.onNext('Simon');
    observer.onNext('Jen');
    observer.onNext('Sergi');
    observer.onCompleted(); // 成功完成
});

建立一个observer

var observer = Rx.Observer.create(
    function onNext(x) {
        console.log('Next: ' + x);
    },
    function onError(err) {
        console.log('Error: ' + err);
    },
    function onCompleted() {
        console.log('Completed');
    }
);

observable的方式的ajax请求

function get(url) {
    return Rx.Observable.create(function(observer) {
        // Make a traditional Ajax request
        var req = new XMLHttpRequest();
        req.open('GET', url);
        req.onload = function() {
            if (req.status == 200) {
                // If the status is 200, meaning there have been no problems,
                // Yield the result to listeners and complete the sequence
                observer.onNext(req.response);
                observer.onCompleted();
            } else {
                // Otherwise, signal to listeners that there has been an error
                observer.onError(new Error(req.statusText));
            }
        };
        req.onerror = function() {
            observer.onError(new Error("Unknown Error"));
        };
        req.send();
    });
}
// Create an Ajax Observable
var test = get('/api/contents.json');

 在上面的代码中,使用create封装XMLHttpRequest的get函数,若是HTTP GET请求成功了,咱们将发送它的内容并完成那个序列(咱们的observable讲仅仅发送一个结果),不然咱们将发射一个错误。在最后一行,我是使用一个特定的url去调用这个函数。这将会产生一个observable,可是它不会发出任何请求,这个很重要:observable不会作任何事,直到最少有一个observer订阅它,因此让咱们接着以下:ajax

// Subscribe an Observer to it
    test.subscribe(
        function onNext(x) {
            console.log('Result: ' + x);
        },
        function onError(err) {
            console.log('Error: ' + err);
        },
        function onCompleted() {
            console.log('Completed');
        }
    );

操做符(operator)

  • 在rxjs中,改变或者查询序列(sequence)的方法叫作操做符。
  • Rxjs DOM library提供了好多方法根据dom相关的资源去建立Observable。因此咱们就可使用Rx.DOM.Request.get来处理一个get请求了。
Rx.DOM.get('/api/contents.json').subscribe(
        function onNext(data) {
            console.log(data.response);
        },
        function onError(err) {
            console.error(err);
        }
    );

根据Arrays建立Observable

  • 可使用任何array相似或者Iterable对象,经过多功能的from操做符转化为Observable。from操做符使用一个array做为参数,而且返回一个Observable(发射array的每个元素)。
  • from操做符伴随着fromEvent,在RxJS中这是最方便和使用频率最高的操做符。
Rx.Observable
    .from(['Adrià', 'Jen', 'Sergi'])
    .subscribe(
        function(x) {
            console.log('Next: ' + x);
        },
        function(err) {
            console.log('Error:', err);
        }

        function() {
            console.log('Completed');
        }
    );

根据js的Event建立Observable

  • 把一个event转化为了一个Observable:
var allMoves = Rx.Observable.fromEvent(document, 'mousemove');
    allMoves.subscribe(function(e) {
        console.log(e.clientX, e.clientY);
    });
  • 把event转化为Observable解开了event自己的强制约束。更重要的是,咱们能够基于原始的Observable建立一个新的Observable,而且这些新的Observable是独立的,能够别用于其余的任务:
var movesOnTheRight = allMoves.filter(function(e) {
        return e.clientX > window.innerWidth / 2;
    });
    var movesOnTheLeft = allMoves.filter(function(e) {
        return e.clientX < window.innerWidth / 2;
    });
    movesOnTheRight.subscribe(function(e) {
        console.log('Mouse is on the right:', e.clientX);
    });
    movesOnTheLeft.subscribe(function(e) {
        console.log('Mouse is on the left:', e.clientX);
    });
  • Observable是不可改变的,每一涉及到它的操做符都是建立的一个新的Observable

根据Callback函数建立Observable

  • 若是使用第三方的js库基于callback编写代码进行交互总有好多意外。使用fromCallback和fromNodeCallback两个函数咱们能够把咱们的Callback转换为Observable。Node.js老是遵循着调用回调函数首先使用一个error的参数告诉回调函数,发生了错误。当咱们使用fromNodeCallback去建立指定的Node.js风格的回调函数:
var Rx = require('rx'); // Load RxJS
    var fs = require('fs'); // Load Node.js Filesystem module
    // Create an Observable from the readdir method
    var readdir = Rx.Observable.fromNodeCallback(fs.readdir);
    // Send a delayed message
    var source = readdir('/Users/sergi');
    var subscription = source.subscribe(
        function(res) {
            console.log('List of directories: ' + res);
        },
        function(err) {
            console.log('Error: ' + err);
        },
        function() {
            console.log('Done!');
        });
  • 在上面的代码中,咱们建立一个Observable readdir 使用了Node.js的 fs.readdir方法。fs.readdir接受一个目录路径和一个回调函数delayMsg,一旦这个目录内容重置回调函数就会被调用。
  • 咱们使用readdir在一样的参数时咱们传给原始的fs.readdir,减去了那个回调函数。它返会一个Observable,能够合适的使用onNext,onError,onCompleted,当咱们订阅一个Observer到它的时候。
本站公众号
   欢迎关注本站公众号,获取更多信息