Subscribe to stream using RxJS and twitter-stream-api module

Ok so I am a complete newbie with Rx and unfortunately very new to js and threads in js. Im using this https://github.com/trygve-lie/twitter-stream-api to connect to twitters streaming api and get json objects with tweets. So far I have this code

var Rx = require('rxjs/Rx');

var TwitterStream = require('twitter-stream-api'),
    fs = require('fs');
var filter = 'tweet';
var keys = {
    consumer_key : "key",
    consumer_secret : "secret",
    token : "token",
    token_secret : "tokensecret"
};

var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
    track: filter
});

Twitter.on('connection success', function (uri) {
    console.log('connection success', uri); 
});
Twitter.on('data', function (obj) {
    console.log(obj.text);
});

      

I am successfully tweeting to the console, but what I am really trying to learn is working with streams and in particular with RxJS. I've tried every possible way to create an observable. Rx.Observable. create / from etc.

I also tried Twitter.resume () as it seems to pause the default action to resume the thread and watch for it. I get errors like can't .subscribe is not a function. From what I have above, how can I use Rx.Observable to start filtering and playing data?

Thank!

+2


source to share


2 answers


RxJS 5 has no way to convert stream from / to Observable, so you will need to do it yourself. Ideally c Observable.create

.

const Rx = require('rxjs');
const Observable = Rx.Observable;

var TwitterStream = require('twitter-stream-api'),

...

var source$ = Observable.create(observer => {
  var Twitter = new TwitterStream(keys);
  Twitter.stream('statuses/filter', {
    track: filter
  });

  Twitter.on('data', function (obj) {
    observer.next(obj);
  });

  return () => {
    Twitter.close();
  };
});

      

This makes a cold Observable that will only connect to Twitter when you subscribe to it. Static methods Observable.create

allow you to inject values ​​to the observer, return a break function at the end, and then just close the connection. This function is called when unsubscribing or when monitoring ends.

Then you can bind this Observable to whatever you want:



source$.filter(...).map(...)

      

Note that there are also methods Observable.bindCallback()

and Observable.bindNodeCallback()

, but this will not help you in your situation.

More details:

+2


source


Here's an example using desmondmorris / node-twitter and rxjs 5.



const Observable = require('rxjs').Observable;

Observable
  .of(new require('twitter')({
    consumer_key: 'xxxx',
    consumer_secret: 'xxxx',
    access_token_key: 'xxxx',
    access_token_secret: 'xxxx',
  })).mergeMap(twitter =>
    Observable
    .fromEvent(twitter.stream('statuses/filter', {
        track: 'Stack Overflow'
      }),
      'data'))
  .filter(tweet => tweet.user.follow_count > 10000)
  .subscribe(console.log);
      

Run codeHide result


0


source







All Articles