WebSocket connection in Angular and RxJS?
I have an ngrx/store (v2.2.2) and rxjs (v5.1.0) based application that listens to a WebSocket for incoming data using an observable. When I run the application, I receive the incoming data flawlessly.
However, after a while (updates are quite rare), the connection seems to get lost and I don't receive any more incoming data. My code:
Service
import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Injectable()
export class MemberService implements OnInit {
  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";
  constructor() { }
  ngOnInit() { }
  listenToTheSocket(): Observable<any> {
    this.websocket = new WebSocket(this.destination);
    this.websocket.onopen = () => {
      console.log("WebService Connected to " + this.destination);
    }
    return Observable.create(observer => {
      this.websocket.onmessage = (evt) => {
        observer.next(evt);
      };
    })
    .map(res => res.data)
    .share();
  }
}
Subscriber
export class AppComponent implements OnInit {
  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}
  ngOnInit() {
    this.memberService.listenToTheSocket().subscribe((result: any) => {
      try {
        console.log(result);
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
      } catch (ex) {
        console.log(JSON.stringify(ex));
      }
    })
  }
}
What do I need to do to reconnect the websocket when it expires so the observable keeps emitting incoming values?
I looked at some Q&A here, here, and here, but they didn't seem to address this question (or at least I couldn't figure it out from them).
Note: the websocket wss://notessensei.mybluemix.net/ws/time is live and generates a timestamp once a minute (in case anyone wants to check this).
Advice is welcome!
Actually there is now a WebSocketSubject in RxJS!
import { webSocket } from 'rxjs/webSocket'; // RxJS 6; in v5 use Observable.webSocket instead
const subject = webSocket('ws://localhost:8081');
subject.subscribe(
  (msg) => console.log('message received: ' + JSON.stringify(msg)),
  (err) => console.log(err),
  () => console.log('complete')
);
// RxJS 6 serializes outgoing values with JSON.stringify by default, so pass the object itself
subject.next({ op: 'hello' });
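It reconnects whenever you resubscribe after the connection has broken. So, for example, to reestablish the connection after an error, write this (RxJS 5 syntax; in RxJS 6 use subject.pipe(retry()) with retry imported from 'rxjs/operators'):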
subject.retry().subscribe(...)
See the documentation for more information. Unfortunately, the search box on the documentation site doesn't find the method, but it is documented here:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket
The # anchor navigation doesn't work in my browser, so search for "webSocket" on that page.
Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15
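To make the reconnect idea concrete without depending on a particular RxJS version, here is a minimal sketch in plain TypeScript. It is not how WebSocketSubject is implemented. The names listenWithReconnect and SocketLike, the 5000 ms default delay, and the schedule parameter are all made up for illustration. The sketch opens a socket through a factory, forwards each message, and opens a fresh socket after the old one closes.

```typescript
// Only the parts of a socket this sketch touches; handlers are typed loosely on purpose.
interface SocketLike {
  onmessage: any;
  onclose: any;
}

// Hypothetical helper (not part of RxJS): forwards messages and reopens the socket on close.
function listenWithReconnect(
  createSocket: () => SocketLike,
  onData: (data: any) => void,
  retryDelayMs: number = 5000, // assumed delay, tune for your server
  schedule: (fn: () => void, ms: number) => void = (fn, ms) => { setTimeout(fn, ms); }
): () => void {
  let stopped = false;

  const connect = (): void => {
    if (stopped) { return; }
    const socket = createSocket();
    socket.onmessage = (evt: { data: any }) => onData(evt.data);
    // When the connection drops, schedule a new connection attempt.
    socket.onclose = () => schedule(connect, retryDelayMs);
  };

  connect();
  // Calling the returned function prevents any further reconnect attempts.
  return () => { stopped = true; };
}
```

In a browser this could be called as listenWithReconnect(() => new WebSocket(url), data => console.log(data)), where url is your endpoint. The schedule parameter exists only so the timer can be replaced when testing.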
This may not be a very good answer, but it is too much for a comment.
The problem might be related to your service:
listenToTheSocket(): Observable<any> {
  this.websocket = new WebSocket(this.destination);
  this.websocket.onopen = () => {
    console.log("WebService Connected to " + this.destination);
  }
  return Observable.create(observer => {
    this.websocket.onmessage = (evt) => {
      observer.next(evt);
    };
  })
  .map(res => res.data)
  .share();
}
Is it possible that your component runs ngOnInit more than once? Try putting a console.log in ngOnInit to be sure. If it does, every call to listenToTheSocket makes your service overwrite this.websocket with a new socket.
Instead, you should try something like this:
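import { Injectable, OnInit } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
@Injectable()
export class MemberService implements OnInit {
  private websocket: any;
  // BehaviorSubject requires an initial value; null means "nothing received yet"
  private websocketSubject$ = new BehaviorSubject<any>(null);
  private websocket$ = this.websocketSubject$.asObservable();
  private destination = 'wss://notessensei.mybluemix.net/ws/time';
  constructor() { }
  ngOnInit() { }
  listenToTheSocket(): Observable<any> {
    // Open the socket only once; later callers share the same stream
    if (!this.websocket) {
      this.websocket = new WebSocket(this.destination);
      // Template literals need backticks, not single quotes
      this.websocket.onopen = () => console.log(`WebService Connected to ${this.destination}`);
      // Push the message payload itself into the subject
      this.websocket.onmessage = (res) => this.websocketSubject$.next(res.data);
    }
    return this.websocket$;
  }
}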
BehaviorSubject replays its most recent value to every new subscriber, so a message that arrives before you subscribe is not lost. Also, since a subject is already multicast, there is no need for the share operator.
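To illustrate the replay behaviour described above, here is a small hand-rolled sketch in plain TypeScript. It is not the RxJS implementation. LatestValueSubject is a made-up class that only mimics the "new subscribers immediately get the latest value" part of BehaviorSubject, and the timestamps are invented sample data.

```typescript
// Illustration only: mimics BehaviorSubject's "replay the latest value" semantics.
class LatestValueSubject<T> {
  private latest: T;
  private listeners: Array<(value: T) => void> = [];

  constructor(initial: T) {
    this.latest = initial;
  }

  // Remember the value and push it to everyone currently subscribed.
  next(value: T): void {
    this.latest = value;
    this.listeners.forEach((listener) => listener(value));
  }

  // A new subscriber first receives the latest value, then all later ones.
  subscribe(listener: (value: T) => void): () => void {
    listener(this.latest);
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}

const messages = new LatestValueSubject<string | null>(null);
messages.next('12:00');                  // arrives before anyone subscribes

const seen: Array<string | null> = [];
messages.subscribe((value) => { seen.push(value); });
messages.next('12:01');
// seen is now ['12:00', '12:01']: the earlier message was replayed on subscribe
```

With a plain subject, the '12:00' message would have been missed by the late subscriber.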