I have a third party library that I am going to integrate with RxJS. This is a message library called Tiger Text. According to them, I can listen to an event called messages and when the thread has a message, I can use it for further use. The code snippet for it looks like this: -

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' })

client.signIn('', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {

function onSignedIn(session) {
  console.log('Signed in as', session.user.displayName)

  ).then(function (message) {
    console.log('sent', message.body, 'to', message.recipient.displayName)

  client.on('message', function (message) {
      'message event',


Now please take a look at the place where you have the below piece of code.

client.on('message', function (message) {
      'message event',


I wanted to know how to use RxJS to create an observable from this piece of code to subscribe to a stream, and whenever we have a change, I take the new data and process it as I wish.

Please advice.


You can use to create an observable from a custom event: fromEventPattern

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEventPattern';

const messages = Observable.fromEventPattern(
  handler => client.on('message', handler),
  handler =>'message', handler)
messages.subscribe(message => console.log(message));


You pass functions fromEventPattern

that add and remove event handlers using a special add and remove API mechanism. You didn't include it in your question, but I assumed that the API you are using implements the method off




For these use cases, you usually don't need to write a custom Observable, and you can only use Observable.create()

. Then it depends if you want to write cold or hot observable.

For cold watchers, you create a value producer on subscribe and close it on unsubscribe:

Observable.create(obs => {
  var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
  client.signIn('', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {

  client.on('message', function (message) {;

  return () => {
    client.close(); // or whatever...


Or, if you want to write a Hot Observable, the producer will exist independently of any subscription and just add / remove a listener:

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
client.signIn('', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {

Observable.create(obs => {
  let listener = client.on('message', function (message) {;

  () => {
    // remove the event listener somehow


Sometimes you can see how this is resolved with Subject

, but it's usually more difficult than using Observable.create()

it because then you have to handle the create and break logic yourself and also objects have internal state.

