Folding two callbacks into one observable
The code snippet below is functional (in the sense that it works ;-)), but it seems like a lame at best and good ...
Can anyone suggest a way to make this more complicated, or at least less ugly?
The code is based on the examples on this page: Wrap an existing API with RxJS
function connect() {
return rx.Observable.create(function (observer) {
mongo.connect('mongodb://127.0.1:27017/things', function(err, db) {
if(err) observer.onError(err);
observer.onNext(db);
});
}).publish().refCount();
}
function getThings(db) {
return rx.Observable.create(function (observer) {
db.collection('things').find().toArray(function(err, results) {
if(err) observer.onError(err);
observer.onNext(results);
observer.onCompleted();
});
return function () {
db.close();
};
}).publish().refCount();
}
connect().subscribe(
function (db) {
getThings(db).subscribe(console.log);
}, function (err) {
console.log(err);
}
);
source to share
For this particular example, assuming getThings()
only one is supposed after connect()
, I would change the implementation getThings()
as such:
function getThings() {
return connect()
.flatMap(function(db) {
return rx.Observable.create(function (observer) {
db.collection('things').find().toArray(function(err, results) {
if(err) observer.onError(err);
observer.onNext(results);
observer.onCompleted();
});
return function () {
db.close();
};
});
});
}
Then you can just subscribe to the stream getThings()
:
getThings().subscribe(console.log);
We used flatMap
to hide the connect step inside everything getThings()
. The FlatMap documentation sounds more complicated, but it's not that hard. It simply replaces the event from the Observable source with another future event. Explained in the diagrams, it replaces each event with a x
future event y
.
---x----------x------->
flatMap( x => --y--> )
------y----------y---->
In our case, the event is x
"successfully connected" and y
"gets" things from the database.
However, there are several different ways to do this, depending on how the application is supposed to work. It's better to think of RxJS as an "Event Bus on steroids" rather than a replacement for the Promises chain, because it really isn't the last one.
RxJS development is best if you model everything that happens in your application as streams of events. Done right, you shouldn't be seeing these "do this, then do this, then do this" chains, because ultimately the imperative paradigm and RxJS can do more. Ideally, it should be more about saying that events are declarative. See this tutorial for more explanation, especially for the discourse in the Completion section. Gist can also help .
source to share