RxJs Observed with WebSocket
My angular app uses a web host to communicate with the backend.
In my test case, I have 2 client components. The Observable timer prints two different client IDs as expected.
Each ngOnInit () also prints its client ID.
NOW for some reason subscribing to websocketService.observeClient () gets called 2 times for each message, but this.client.id
always prints the value of the second client.
Here is my client component
@Component({
...
})
export class ClientComponent implements OnInit {
@Input() client: Client;
constructor(public websocketService: WebsocketService) {
Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
}
ngOnInit() {
console.log(this.client.id);
this.websocketService.observeClient().subscribe(data => {
console.log('message', this.client.id);
});
}
}
And my websocket service
@Injectable()
export class WebsocketService {
private observable: Observable<MessageEvent>;
private observer: Subject<Message>;
constructor() {
const socket = new WebSocket('ws://localhost:9091');
this.observable = Observable.create(
(observer: Observer<MessageEvent>) => {
socket.onmessage = observer.next.bind(observer);
socket.onerror = observer.error.bind(observer);
socket.onclose = observer.complete.bind(observer);
return socket.close.bind(socket);
}
);
this.observer = Subject.create({
next: (data: Message) => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(data));
}
}
});
}
observeClient(): Observable<MessageEvent> {
return this.observable;
}
}
Edit
Ok, as far as I've read, this is because Observables are unicast objects and I have to use a theme for this, but I don't know how to create a theme.
source to share
As of rxjs 5, you can use the built-in websocket function that creates the theme for you. It also reconnects when re-subscribing to a stream after an error. Please refer to this answer:
fooobar.com/questions/2404081 / ...
TL; DR:
let subject = Observable.webSocket('ws://localhost:8081');
subject
.retry()
.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
subject.next(JSON.stringify({ op: 'hello' }));
source to share
You need to use an operator share
to share it among subscribers.
this.observable = Observable.create(
(observer: Observer<MessageEvent>) => {
socket.onmessage = observer.next.bind(observer);
socket.onerror = observer.error.bind(observer);
socket.onclose = observer.complete.bind(observer);
return socket.close.bind(socket);
}
).share();
Also make sure this service is singleton.
Doc: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md
source to share
Ok I have been suffering with angular and python websockets for a long time. I had some websites for each component and they didn't close, so every time I go through one tab to another (swapping between components) the websocket was created again in the back and I was getting two or more times the same the most.
There are many tutorials out there explaining how to make websites, but not many explain how to close or how to use them.
Here's my slice of bread: The best tuto I found a bit: https://medium.com/@lwojciechowski/websockets-with-angular2-and-rxjs-8b6c5be02fac But not finished yet. It doesn't close WS properly
Here's what I did: (My WebSocketTestService class)
@Injectable()
export class WebSocketTestService {
public messages: Subject<any> = new Subject<any>();
constructor(private wsService: WebSocketService) {
console.log('constructyor ws synop service')
}
public connect() {
this.messages = <Subject<any>>this.wsService
.connect('ws://localhost:8000/mytestwsid')
.map((response: any): any => {
return JSON.parse(response.data);
})
}
public close() {
this.wsService.close()
}
} // end class
And here (in another file for me) the websocket class
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
import {Observer} from "rxjs/Observer";
@Injectable()
export class WebSocketService {
private subject: Subject<MessageEvent>;
private subjectData: Subject<number>;
private ws: any;
// For chat box
public connect(url: string): Subject<MessageEvent> {
if (!this.subject) {
this.subject = this.create(url);
}
return this.subject;
}
private create(url: string): Subject<MessageEvent> {
this.ws = new WebSocket(url);
let observable = Observable.create(
(obs: Observer<MessageEvent>) => {
this.ws.onmessage = obs.next.bind(obs);
this.ws.onerror = obs.error.bind(obs);
this.ws.onclose = obs.complete.bind(obs);
return this.ws.close.bind(this.ws);
});
let observer = {
next: (data: Object) => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
};
return Subject.create(observer, observable);
}
public close() {
console.log('on closing WS');
this.ws.close()
this.subject = null
}
} // end class WebSocketService
And finally, my WSTestService call in the component
this.webSocketTestService.connect();
this.webSocketTestService.messages.subscribe(message => {
console.log(message);
})
and in the most important part ngOnDestroy ()
ngOnDestroy() {
this.webSocketTestService.messages.unsubscribe();
this.webSocketTestService.close()
Of course add both services to the providers section in app.module
This is the only way I have found to properly close the websocket when changing between views and destroy the components. I'm not sure if this is your case, but I am having a hard time finding an example that works correctly with multiple views and websites. I had similar problems with those you asked, so I hope this works for you. Let me know if my approach is right for you.
I really don't understand why angular calls every ngOnInit for every component, even if it doesn't show up when the app starts. I only want to create a WS when I enter the view, not when the application starts. If you find a solution for this, let me know.
Good luck!
source to share