Websocket connection in Angular and rxjs?

I have an ngrx / store (v2.2.2) and rxjs (v5.1.0) based application that listens to a web socket for incoming data using an observable. When I run the application, I receive the incoming data flawlessly.

However, after a while (updates are quite rare), the connection seems to get lost and I don't receive any more incoming data. My code:


import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

export class MemberService implements OnInit {

  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => {
      console.log("WebService Connected to " + this.destination);

    return Observable.create(observer => {
      this.websocket.onmessage = (evt) => {
      .map(res => res.data)



  export class AppComponent implements OnInit {

  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}

  ngOnInit() {
    this.memberService.listenToTheSocket().subscribe((result: any) => {
      try {
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
      } catch (ex) {


What do I need to do to reconnect the websocket when it expires so the observable keeps emitting incoming values?

I looked at some Q&A here , here, and here and it didn't seem to address this question (somehow I could figure it out).

Note: the websocket wss://notessensei.mybluemix.net/ws/time

is live and generates a timestamp once a minute (in case anyone wants to check this).

Advice is welcome!


source to share

3 answers

Actually there is now a WebsocketSubject in rxjs!

 import { webSocket } from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket

 let subject = webSocket('ws://localhost:8081');
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
 subject.next(JSON.stringify({ op: 'hello' }));


It performs reconnection when re-subscribing to a broken connection. So, for example, write this to reestablish the connection:



See the documentation for more information. Unfortunately, the search box doesn't show the method, but you can find it here:


that # -navigation doesn't work in my browser, so search for "webSocket" on this page.

Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15



This may not be a very good answer, but it is too much for a comment.

The problem might be related to your service:

listenToTheSocket(): Observable<any> {
  this.websocket = new WebSocket(this.destination);

  this.websocket.onopen = () => {
    console.log("WebService Connected to " + this.destination);

  return Observable.create(observer => {
    this.websocket.onmessage = (evt) => {
  .map(res => res.data)


Do you think that in your component you are going to method multiple times ngOnInit

You should try to stick console.log

in ngOnInit

to be sure.

Because if you do this, in your service you will override it with a this.websocket

new one.

Instead, you should try something like this:

export class MemberService implements OnInit {

  private websocket: any;
  private websocketSubject$ = new BehaviorSubject<any>();
  private websocket$ = this.websocketSubject$.asObservable();

  private destination = 'wss://notessensei.mybluemix.net/ws/time';

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {
    if (this.websocket) {
      return this.websocket$;

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => console.log('WebService Connected to ${this.destination}');

    this.websocket.onmessage = (res) => this.websocketSubject$.next(res => res.data);



will send the last value if it receives an event before you subscribe to it. Also, as a subject, there is no need to use an operator share




For rxjs 6 - websocket implementation.

import { webSocket } from 'rxjs/websocket';
let subject = webSocket('ws://localhost:8081');




All Articles