How to watch a custom event using RxJS in Angular 2?
I have a third-party library that I want to integrate with RxJS. It is a messaging library called Tiger Text. According to its documentation, I can listen for an event called message, and whenever a thread receives a message I can use it for further processing. The code snippet looks like this:
var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' })
client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session)
})
function onSignedIn(session) {
  console.log('Signed in as', session.user.displayName)
  client.messages.sendToUser(
    'someone@mail.com',
    'hello!'
  ).then(function (message) {
    console.log('sent', message.body, 'to', message.recipient.displayName)
  })
  client.events.connect()
  client.on('message', function (message) {
    console.log(
      'message event',
      message.sender.displayName,
      'to',
      message.recipient.displayName,
      ':',
      message.body
    )
  })
}
Now look at the part of that snippet that registers the listener:
client.on('message', function (message) {
  console.log(
    'message event',
    message.sender.displayName,
    'to',
    message.recipient.displayName,
    ':',
    message.body
  )
})
I want to know how to use RxJS to create an observable from this piece of code, so that I can subscribe to the stream and, whenever a new message arrives, take the new data and process it as I wish.
Please advise.
You can use fromEventPattern to create an observable from a custom event:
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEventPattern';
const messages = Observable.fromEventPattern(
  handler => client.on('message', handler),
  handler => client.off('message', handler)
);
messages.subscribe(message => console.log(message));
You pass fromEventPattern two functions that add and remove the event handler using the API's own add and remove mechanisms. You didn't include it in your question, but I have assumed that the API you are using implements an off method.
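To make the add/remove pairing concrete, here is a minimal sketch that runs without RxJS or TigerConnect. Both createFakeClient and fromEventPatternSketch are hypothetical stand-ins written for this illustration: the first is a bare emitter with on/off (the shape assumed above), and the second only mimics the idea behind fromEventPattern rather than its real implementation.

```javascript
// Hypothetical stand-in for the third-party client: a bare emitter with on/off.
function createFakeClient() {
  const handlers = new Map(); // event name -> Set of handlers
  const setFor = event => {
    if (!handlers.has(event)) handlers.set(event, new Set());
    return handlers.get(event);
  };
  return {
    on(event, handler) { setFor(event).add(handler); },
    off(event, handler) { setFor(event).delete(handler); },
    emit(event, payload) { setFor(event).forEach(h => h(payload)); },
    listenerCount(event) { return setFor(event).size; }
  };
}

// Simplified sketch of the idea behind fromEventPattern (not RxJS's code):
// addHandler runs on subscribe, removeHandler runs on unsubscribe.
function fromEventPatternSketch(addHandler, removeHandler) {
  return {
    subscribe(next) {
      const handler = value => next(value);
      addHandler(handler);
      return { unsubscribe() { removeHandler(handler); } };
    }
  };
}

const fakeClient = createFakeClient();
const messages = fromEventPatternSketch(
  handler => fakeClient.on('message', handler),
  handler => fakeClient.off('message', handler)
);

const received = [];
const subscription = messages.subscribe(message => received.push(message.body));

fakeClient.emit('message', { body: 'hello!' });
subscription.unsubscribe();
fakeClient.emit('message', { body: 'ignored' }); // no handler is attached any more
```

The point of the second function is that unsubscribing actually detaches the handler from the client, so nothing leaks once the subscriber goes away.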
For these use cases, you usually don't need to write a custom Observable; you can just use Observable.create(). After that, it depends on whether you want a cold or a hot observable.
For a cold observable, you create the value producer on subscribe and close it on unsubscribe:
Observable.create(obs => {
  var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
  client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
    onSignedIn(session);
  });
  client.on('message', function (message) {
    obs.next(...);
  });
  return () => {
    client.close(); // or whatever...
  };
});
Or, if you want a hot observable, the producer exists independently of any subscription, and subscribing only adds or removes a listener:
var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session);
});
Observable.create(obs => {
  let listener = client.on('message', function (message) {
    obs.next(...);
  });
  // The teardown function must be returned, otherwise it never runs on unsubscribe.
  return () => {
    // remove the event listener somehow
    listener.remove();
  };
});
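The cold/hot difference can be seen in a small sketch that needs neither RxJS nor the real client. Here createSketch is a hypothetical, much-reduced stand-in for Observable.create, and openConnection is an invented producer whose on() returns an object with remove(), which is an assumption made only for this example.

```javascript
// Minimal stand-in for Observable.create (an assumption, not the RxJS source):
// the producer function runs once per subscribe call and returns a teardown.
function createSketch(producer) {
  return {
    subscribe(next) {
      const teardown = producer({ next });
      return { unsubscribe: teardown };
    }
  };
}

// Hypothetical connection factory that counts how many connections were opened.
let connectionsOpened = 0;
function openConnection() {
  connectionsOpened += 1;
  const listeners = new Set();
  return {
    on(handler) {
      listeners.add(handler);
      return { remove: () => listeners.delete(handler) };
    },
    emit(value) { listeners.forEach(l => l(value)); },
    listenerCount() { return listeners.size; }
  };
}

// Cold: the connection is created inside the producer, once per subscriber.
const cold = createSketch(obs => {
  const connection = openConnection();
  const listener = connection.on(value => obs.next(value));
  return () => listener.remove();
});
const coldA = cold.subscribe(() => {});
const coldB = cold.subscribe(() => {});
const openedByCold = connectionsOpened; // one connection per subscriber

// Hot: one connection exists up front; subscribers only attach listeners.
const shared = openConnection();
const hot = createSketch(obs => {
  const listener = shared.on(value => obs.next(value));
  return () => listener.remove();
});
const seenByA = [];
const seenByB = [];
const hotA = hot.subscribe(v => seenByA.push(v));
const hotB = hot.subscribe(v => seenByB.push(v));
shared.emit('hello!');
hotA.unsubscribe();
shared.emit('again'); // only the second subscriber is still listening
```

In the cold case every subscriber pays for its own connection; in the hot case both subscribers share one and simply miss whatever was emitted while they were not subscribed.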
Sometimes you will see this solved with a Subject, but that is usually more complicated than using Observable.create(), because you then have to handle the setup and teardown logic yourself, and Subjects also carry internal state.
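A rough sketch of why the Subject route means more bookkeeping, again without RxJS: createSubjectSketch is a hypothetical stand-in that models only next and subscribe, and emitter is an invented placeholder for the client.

```javascript
// Tiny stand-in for a Subject (assumption: only next/subscribe are modelled).
function createSubjectSketch() {
  const observers = new Set(); // internal state the Subject has to carry
  return {
    next(value) { observers.forEach(o => o(value)); },
    subscribe(next) {
      observers.add(next);
      return { unsubscribe() { observers.delete(next); } };
    },
    observerCount() { return observers.size; }
  };
}

// Hypothetical emitter standing in for the client.
const emitterHandlers = new Set();
const emitter = {
  on(handler) { emitterHandlers.add(handler); },
  off(handler) { emitterHandlers.delete(handler); },
  emit(value) { emitterHandlers.forEach(h => h(value)); }
};

const subject = createSubjectSketch();
const forward = value => subject.next(value);
emitter.on(forward); // setup is manual and happens regardless of subscribers

const got = [];
const sub = subject.subscribe(v => got.push(v));
emitter.emit('hello!');
sub.unsubscribe();

// Unsubscribing did not detach the emitter listener; that is a separate step.
const stillAttached = emitterHandlers.size;
emitter.off(forward);
```

With Observable.create() the teardown function returned from the producer covers that last step, which is the extra work the Subject version leaves to you.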
Here's a question very similar to yours:
Articles on topics related to your question by a lead RxJS developer: