// Build an Observable whose subscribe callback pushes three names to the
// observer and then signals completion.
var observable = Rx.Observable.create(function(observer) {
  observer.onNext('Simon');
  observer.onNext('Jen');
  observer.onNext('Sergi');
  observer.onCompleted(); // Completed successfully
});
// Observer built from three callbacks, in order: value handler, error
// handler, completion handler. Each one just logs what it received.
var observer = Rx.Observer.create(
  function(value) {
    console.log('Next: ' + value);
  },
  function(error) {
    console.log('Error: ' + error);
  },
  function() {
    console.log('Completed');
  }
);
/**
 * Wrap an XMLHttpRequest GET in an Observable.
 *
 * The request is built inside the callback handed to Rx.Observable.create,
 * so calling get() only constructs the Observable.
 *
 * @param {string} url - Address to request with HTTP GET.
 * @returns {Rx.Observable} Observable that yields `req.response` once and
 *   completes on HTTP 200, or signals an Error otherwise.
 */
function get(url) {
  return Rx.Observable.create(function(observer) {
    // Make a traditional Ajax request
    var req = new XMLHttpRequest();
    req.open('GET', url);

    req.onload = function() {
      if (req.status === 200) {
        // If the status is 200, meaning there have been no problems,
        // yield the result to listeners and complete the sequence
        observer.onNext(req.response);
        observer.onCompleted();
      } else {
        // Otherwise, signal to listeners that there has been an error
        observer.onError(new Error(req.statusText));
      }
    };

    // onerror is the request's failure callback; no status text is read
    // here, so a generic message is reported instead.
    req.onerror = function() {
      observer.onError(new Error("Unknown Error"));
    };

    req.send();
  });
}

// Create an Ajax Observable
var test = get('/api/contents.json');
在上面的代码中,我们使用create封装了基于XMLHttpRequest的get函数:如果HTTP GET请求成功,我们将发送它的内容并完成这个序列(我们的observable将仅仅发送一个结果),否则我们将发射一个错误。在最后一行,我们使用一个特定的url去调用这个函数。这将会产生一个observable,但是它不会发出任何请求。这一点很重要:observable不会做任何事,直到至少有一个observer订阅它。因此让我们接着往下看:
// Subscribe an Observer to it. The three callbacks handle, in order:
// each emitted value, an error, and completion of the sequence.
test.subscribe(
  function onNext(x) {
    console.log('Result: ' + x);
  },
  function onError(err) {
    console.log('Error: ' + err);
  },
  function onCompleted() {
    console.log('Completed');
  }
);
// Same request made through Rx.DOM.get: log the `response` property of the
// emitted value, and send any error to console.error.
Rx.DOM.get('/api/contents.json').subscribe(
  function(result) {
    console.log(result.response);
  },
  function(error) {
    console.error(error);
  }
);
// Turn an array into an Observable and subscribe with value, error and
// completion handlers.
Rx.Observable
  .from(['Adrià', 'Jen', 'Sergi'])
  .subscribe(
    function(x) {
      console.log('Next: ' + x);
    },
    function(err) {
      console.log('Error:', err);
    }, // this separator was missing, which made the call a syntax error
    function() {
      console.log('Completed');
    }
  );
// Observable of every 'mousemove' event fired on the document.
var allMoves = Rx.Observable.fromEvent(document, 'mousemove');

// Log the pointer coordinates carried by each event.
allMoves.subscribe(function(moveEvent) {
  var x = moveEvent.clientX;
  var y = moveEvent.clientY;
  console.log(x, y);
});
// Split the mouse-move stream in two by comparing the pointer's clientX
// against half the window width. The midpoint is recomputed for every event.
// A pointer exactly on the midline matches neither filter.
var movesOnTheRight = allMoves.filter(function(moveEvent) {
  var midpoint = window.innerWidth / 2;
  return moveEvent.clientX > midpoint;
});

var movesOnTheLeft = allMoves.filter(function(moveEvent) {
  var midpoint = window.innerWidth / 2;
  return moveEvent.clientX < midpoint;
});

movesOnTheRight.subscribe(function(moveEvent) {
  console.log('Mouse is on the right:', moveEvent.clientX);
});

movesOnTheLeft.subscribe(function(moveEvent) {
  console.log('Mouse is on the left:', moveEvent.clientX);
});
var Rx = require('rx'); // Load RxJS
var fs = require('fs'); // Load Node.js Filesystem module

// Create an Observable-returning function from the callback-style readdir method
var readdir = Rx.Observable.fromNodeCallback(fs.readdir);

// Call the wrapped function to get an Observable for this directory's listing
var source = readdir('/Users/sergi');

// Subscribe with handlers for the result, an error, and completion.
var subscription = source.subscribe(
  function(res) {
    console.log('List of directories: ' + res);
  },
  function(err) {
    console.log('Error: ' + err);
  },
  function() {
    console.log('Done!');
  }
);