RxJS Observable with WebSocket

My Angular app uses a WebSocket to communicate with the backend.

In my test case I have 2 client components. The Observable timer prints two different client IDs, as expected.

Each ngOnInit() also prints its own client ID.

Now, for some reason, the callback subscribed to websocketService.observeClient() is called 2 times for each message, but this.client.id always prints the value of the second client.

Here is my client component:

@Component({
...
})
export class ClientComponent implements OnInit {

  @Input() client: Client;

  constructor(public websocketService: WebsocketService) {
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
  }

  ngOnInit() {

    console.log(this.client.id);
    this.websocketService.observeClient().subscribe(data => {
      console.log('message', this.client.id);
    });

  }

}

      

And my WebSocket service:

@Injectable()
export class WebsocketService {

  private observable: Observable<MessageEvent>;
  private observer: Subject<Message>;

  constructor() {

    const socket = new WebSocket('ws://localhost:9091');

    this.observable = Observable.create(
      (observer: Observer<MessageEvent>) => {
        socket.onmessage = observer.next.bind(observer);
        socket.onerror = observer.error.bind(observer);
        socket.onclose = observer.complete.bind(observer);
        return socket.close.bind(socket);
      }
    );

    this.observer = Subject.create({
      next: (data: Message) => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify(data));
        }
      }
    });

  }

  observeClient(): Observable<MessageEvent> {
    return this.observable;
  }

}

      


Edit

OK, as far as I've read, this is because Observables are unicast and I have to use a Subject for this, but I don't know how to create a Subject.

+3




3 answers


As of RxJS 5, you can use the built-in webSocket function, which creates the Subject for you. It also reconnects when you re-subscribe to the stream after an error. Please refer to this answer:

fooobar.com/questions/2404081 / ...



TL;DR:

 let subject = Observable.webSocket('ws://localhost:8081');
 subject
   .retry()
   .subscribe(
      (msg) => console.log('message received: ' + msg),
      (err) => console.log(err),
      () => console.log('complete')
    );
 subject.next(JSON.stringify({ op: 'hello' }));

      

+5




You need to use the share operator to share the Observable among subscribers.

this.observable = Observable.create(
    (observer: Observer<MessageEvent>) => {
       socket.onmessage = observer.next.bind(observer);
       socket.onerror = observer.error.bind(observer);
       socket.onclose = observer.complete.bind(observer);
       return socket.close.bind(socket);
    }
).share();

      



Also make sure this service is a singleton.

Doc: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md
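
To illustrate why sharing matters here: the function passed to Observable.create runs again for every subscription, and each run reassigns socket.onmessage, so an earlier subscriber's handler is replaced. The sketch below reproduces that effect without RxJS, so it can run on its own. FakeSocket, subscribeUnicast, Multicast and demo are hypothetical names made up for this illustration; they are not part of RxJS or the question's code, and Multicast only mimics the fan-out idea behind share() or a Subject.

```typescript
// Hypothetical stand-in for a WebSocket: one onmessage slot, like the real thing.
interface FakeSocket {
  onmessage: ((data: string) => void) | null;
}

type Listener = (data: string) => void;

// Deliver a message through whatever handler is currently installed.
function emit(socket: FakeSocket, data: string): void {
  if (socket.onmessage) {
    socket.onmessage(data);
  }
}

// Unicast wiring, as in the question: every subscription reassigns
// socket.onmessage, so the previously installed handler is lost.
function subscribeUnicast(socket: FakeSocket, listener: Listener): void {
  socket.onmessage = listener;
}

// Multicast wiring (the idea behind share() or a Subject): the socket handler
// is installed once and forwards each message to every registered listener.
class Multicast {
  private listeners: Listener[] = [];

  constructor(socket: FakeSocket) {
    socket.onmessage = (data) => this.listeners.forEach((l) => l(data));
  }

  // Registers a listener and returns a function that removes it again.
  subscribe(listener: Listener): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}

function demo(): { unicast: string[]; multicast: string[] } {
  const unicast: string[] = [];
  const socketA: FakeSocket = { onmessage: null };
  subscribeUnicast(socketA, (d) => unicast.push(`client1:${d}`));
  subscribeUnicast(socketA, (d) => unicast.push(`client2:${d}`));
  emit(socketA, 'hello'); // only the last installed handler runs

  const multicast: string[] = [];
  const socketB: FakeSocket = { onmessage: null };
  const hub = new Multicast(socketB);
  hub.subscribe((d) => multicast.push(`client1:${d}`));
  hub.subscribe((d) => multicast.push(`client2:${d}`));
  emit(socketB, 'hello'); // both listeners run

  return { unicast, multicast };
}

console.log(demo());
```

In the unicast case only client2 receives the message; in the multicast case both clients do, which is the behaviour share() gives you on the real Observable.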

+1




OK, I have been struggling with Angular and Python websockets for a long time. I had a websocket for each component and they were never closed, so every time I moved from one tab to another (swapping between components) the websocket was created again in the background and I received the same message two or more times.

There are many tutorials out there explaining how to create websockets, but not many explain how to close them or how to use them properly.

Here's my two cents. The best tutorial I have found so far is https://medium.com/@lwojciechowski/websockets-with-angular2-and-rxjs-8b6c5be02fac but it is not complete: it doesn't close the WebSocket properly.

Here's what I did (my WebSocketTestService class):

@Injectable()
export class WebSocketTestService {
    public messages: Subject<any>  = new Subject<any>();

    constructor(private wsService: WebSocketService) {
        console.log('constructor ws synop service');
    }

    public connect() {
        this.messages = <Subject<any>>this.wsService
            .connect('ws://localhost:8000/mytestwsid')
            .map((response: any): any => {
                return JSON.parse(response.data);
            })
    }

    public close() {
        this.wsService.close()
    }
} // end class 

      

And here (in a separate file, in my case) is the WebSocket class:

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
import {Observer} from "rxjs/Observer";

@Injectable()
export class WebSocketService {
    private subject: Subject<MessageEvent>;
    private subjectData: Subject<number>;
    private ws: any;
    // For chat box
    public connect(url: string): Subject<MessageEvent> {
        if (!this.subject) {
            this.subject = this.create(url);
        }
        return this.subject;
    }

    private create(url: string): Subject<MessageEvent> {
        this.ws = new WebSocket(url);

        let observable = Observable.create(
            (obs: Observer<MessageEvent>) => {
                this.ws.onmessage = obs.next.bind(obs);
                this.ws.onerror   = obs.error.bind(obs);
                this.ws.onclose   = obs.complete.bind(obs);

                return this.ws.close.bind(this.ws);
            });

        let observer = {
            next: (data: Object) => {
                if (this.ws.readyState === WebSocket.OPEN) {
                    this.ws.send(JSON.stringify(data));
                }
            }
        };

        return Subject.create(observer, observable);
    }

    public close() {
        console.log('on closing WS');
        this.ws.close();
        this.subject = null;
    }

} // end class WebSocketService

      

And finally, the call to my WebSocketTestService in the component:

this.webSocketTestService.connect();
this.webSocketTestService.messages.subscribe(message => {
      console.log(message);

})

      

And the most important part, in ngOnDestroy():

ngOnDestroy() {
    // Stop listening, then close the underlying WebSocket.
    this.webSocketTestService.messages.unsubscribe();
    this.webSocketTestService.close();
}

      

Of course, add both services to the providers section in app.module.

This is the only way I have found to properly close the websocket when switching between views and destroying the components. I'm not sure if this is your case, but I have had a hard time finding an example that works correctly with multiple views and websockets. I had problems similar to the ones you describe, so I hope this works for you. Let me know if my approach is right for you.

I really don't understand why Angular calls ngOnInit for every component, even for those that are not displayed when the app starts. I only want to create the WebSocket when I enter the view, not when the application starts. If you find a solution for this, let me know.
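
One possible way to get "open on first use, close when nobody needs it any more" is reference counting, which is also the idea RxJS's share() applies to subscriptions. The following is only a dependency-free sketch under that assumption: Closable, RefCountedConnection and refCountDemo are hypothetical names for this illustration, and the fake connection stands in for a real WebSocket.

```typescript
// Hypothetical minimal connection shape; a real WebSocket also has close().
interface Closable {
  close(): void;
}

// Opens the connection lazily on the first acquire() and closes it
// when the last user has released it.
class RefCountedConnection<T extends Closable> {
  private open: () => T;
  private connection: T | null = null;
  private users = 0;

  constructor(open: () => T) {
    this.open = open;
  }

  acquire(): { connection: T; release: () => void } {
    if (this.connection === null) {
      this.connection = this.open(); // first user: actually connect
    }
    this.users++;
    const connection = this.connection;
    let released = false;
    const release = (): void => {
      if (released) return; // make release() safe to call twice
      released = true;
      this.users--;
      if (this.users === 0 && this.connection !== null) {
        this.connection.close(); // last user gone: disconnect
        this.connection = null;
      }
    };
    return { connection, release };
  }
}

function refCountDemo(): { opened: number; closed: number } {
  let opened = 0;
  let closed = 0;
  const shared = new RefCountedConnection<Closable>(() => {
    opened++;
    return { close: () => { closed++; } };
  });

  const viewA = shared.acquire(); // e.g. called from ngOnInit of one component
  const viewB = shared.acquire(); // second component reuses the same connection
  viewA.release();                // e.g. called from ngOnDestroy
  viewB.release();                // last release closes the connection
  const viewC = shared.acquire(); // entering a view again reconnects
  viewC.release();

  return { opened, closed };
}

console.log(refCountDemo());
```

With this shape, nothing is opened until a component actually asks for the connection, and switching between views no longer leaves stale sockets behind.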

Good luck!

0

