How to watch a custom event using RxJS in Angular 2?

I have a third-party library that I want to integrate with RxJS. It is a messaging library called Tiger Text. According to its documentation, I can listen for an event called message, and whenever a thread receives a message, I can process it further. The code snippet looks like this:

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' })

client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session)
})

function onSignedIn(session) {
  console.log('Signed in as', session.user.displayName)

  client.messages.sendToUser(
    'someone@mail.com',
    'hello!'
  ).then(function (message) {
    console.log('sent', message.body, 'to', message.recipient.displayName)
  })

  client.events.connect()

  client.on('message', function (message) {
    console.log(
      'message event',
      message.sender.displayName,
      'to',
      message.recipient.displayName,
      ':',
      message.body
    )
  })
}

      

Now please look at this particular piece of the code:

client.on('message', function (message) {
  console.log(
    'message event',
    message.sender.displayName,
    'to',
    message.recipient.displayName,
    ':',
    message.body
  )
})

      

I would like to know how to use RxJS to create an observable from this piece of code, so that I can subscribe to the stream and, whenever a new message arrives, take the new data and process it as I wish.

Please advise.



2 answers


You can use fromEventPattern to create an observable from a custom event:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEventPattern';

const messages = Observable.fromEventPattern(
  handler => client.on('message', handler),
  handler => client.off('message', handler)
);
messages.subscribe(message => console.log(message));

      



You pass fromEventPattern two functions that add and remove the event handler using the API's own add and remove mechanism. You didn't include it in your question, but I've assumed that the API you are using implements an off method.
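To make the add/remove contract concrete, here is a minimal, dependency-free sketch. It assumes a hypothetical fakeClient (a plain Node EventEmitter standing in for the TigerConnect client) and a hand-written fromEventPatternSketch helper that only imitates the idea behind fromEventPattern; it is not the RxJS implementation.

```javascript
const { EventEmitter } = require('node:events');

// Simplified stand-in for fromEventPattern (NOT the RxJS implementation):
// subscribe() registers a handler through addHandler and returns an object
// whose unsubscribe() removes that same handler through removeHandler.
function fromEventPatternSketch(addHandler, removeHandler) {
  return {
    subscribe(next) {
      const handler = value => next(value);
      addHandler(handler);
      return { unsubscribe: () => removeHandler(handler) };
    }
  };
}

// Hypothetical client: an EventEmitter standing in for the real library,
// assumed to expose both on() and off().
const fakeClient = new EventEmitter();

const messages = fromEventPatternSketch(
  handler => fakeClient.on('message', handler),
  handler => fakeClient.off('message', handler)
);

const received = [];
const subscription = messages.subscribe(message => received.push(message.body));

fakeClient.emit('message', { body: 'hello!' });  // delivered to the subscriber
subscription.unsubscribe();                       // handler removed via off()
fakeClient.emit('message', { body: 'ignored' }); // nobody is listening any more

console.log(received);
```

The point of the two functions is that the same handler reference is passed to both, so unsubscribing removes exactly the listener that subscribing added.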



For these use cases, you usually don't need to write a custom Observable; you can just use Observable.create(). Then it depends on whether you want a cold or a hot observable.

For a cold observable, you create the value producer on subscribe and close it on unsubscribe:

Observable.create(obs => {
  var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
  client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
    onSignedIn(session);
  });

  client.on('message', function (message) {
    obs.next(...);
  });

  return () => {
    client.close(); // or whatever...
  };
});

      

Or, if you want a hot observable, the producer exists independently of any subscription, and subscribing or unsubscribing just adds or removes a listener:

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session);
});

Observable.create(obs => {
  let listener = client.on('message', function (message) {
    obs.next(...);
  });

  // The teardown function must be returned, otherwise it never runs on unsubscribe
  return () => {
    // remove the event listener somehow
    listener.remove();
  };
});
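The difference between the two variants can be sketched without RxJS. The snippet below assumes a hand-written createSketch helper (a toy imitation of Observable.create, not the real one) and a hypothetical makeProducer factory standing in for `new TigerConnect.Client(...)`; it counts how many producers each style creates.

```javascript
const { EventEmitter } = require('node:events');

// Minimal stand-in for Observable.create (NOT the RxJS implementation).
function createSketch(onSubscribe) {
  return {
    subscribe(next) {
      const teardown = onSubscribe({ next });
      return { unsubscribe: () => { if (teardown) teardown(); } };
    }
  };
}

let producersCreated = 0;
// Hypothetical producer factory; an EventEmitter plays the role of the client.
function makeProducer() {
  producersCreated += 1;
  return new EventEmitter();
}

// Cold: every subscription creates its own producer and discards it on teardown.
const cold = createSketch(obs => {
  const producer = makeProducer();
  producer.on('message', message => obs.next(message));
  return () => producer.removeAllListeners('message');
});

// Hot: one producer exists up front; subscribing only adds a listener to it.
const sharedProducer = makeProducer();
const hot = createSketch(obs => {
  const listener = message => obs.next(message);
  sharedProducer.on('message', listener);
  return () => sharedProducer.off('message', listener);
});

const coldSubs = [cold.subscribe(() => {}), cold.subscribe(() => {})];
const producersAfterCold = producersCreated; // shared producer + one per cold subscription

const hotReceived = [];
const hotSubs = [
  hot.subscribe(m => hotReceived.push('a:' + m)),
  hot.subscribe(m => hotReceived.push('b:' + m))
];
sharedProducer.emit('message', 'hi');        // both hot subscribers see the same event
const producersAfterHot = producersCreated;  // unchanged by the hot subscriptions

[...coldSubs, ...hotSubs].forEach(s => s.unsubscribe());
```

With the real client, the cold form would sign in once per subscriber, while the hot form shares one signed-in client, which is usually what you want for a messaging connection.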

      



Sometimes you will see this solved with a Subject, but that is usually more complicated than using Observable.create(), because you then have to handle the creation and teardown logic yourself, and Subjects also have internal state.

Here's a question very similar to yours:

Articles on topics related to your question by a lead RxJS developer:
