Using RxJs and Angular 2 to Handle Server Sent Events
I am trying to display the values pushed by server-sent events in an Angular 2 / RxJS app.
The backend regularly sends individual strings to the client via server-sent events.
I'm not sure how to handle the received values on the Angular 2 / RxJS side.
Here is my client (ng component):
import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable} from 'rxjs/Observable';
/**
 * Root component from the question. On init it subscribes to the Observable
 * returned by getSomeStrings() and tries to append every emitted value to
 * `someStrings`, which the template renders as a list.
 *
 * NOTE(review): the template pipes `someStrings` through `async`, yet the field
 * is declared as a plain `string[]` that is filled by hand in ngOnInit —
 * presumably the pipe should be dropped; confirm.
 */
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings | async">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
constructor(private http:Http) {
}
// Last error message produced by the stream, if any.
errorMessage:string;
// NOTE(review): declared without an initializer, so it is still undefined
// when the subscribe callback below calls push() on it.
someStrings:string[];
// Subscribes to the string stream: values are pushed onto someStrings,
// errors are stored in errorMessage.
ngOnInit() {
this.getSomeStrings()
.subscribe(
aString => this.someStrings.push(aString),
error => this.errorMessage = <any>error);
}
// Issues an HTTP GET on 'interval-sse-observable', maps the response through
// extractData and routes failures to handleError.
// NOTE(review): the question reports that this request stays pending against
// the SSE endpoint; the author's later working version uses EventSource instead.
private getSomeStrings():Observable<string> {
return this.http.get('interval-sse-observable')
.map(this.extractData)
.catch(this.handleError);
}
// Throws when the status is outside 200-299; otherwise returns the parsed
// JSON body, or an empty object when that body is falsy.
private extractData(res:Response) {
if (res.status < 200 || res.status >= 300) {
throw new Error('Bad response status: ' + res.status);
}
let body = res.json();
return body || {};
}
// Logs the error message (or 'Server error' when there is none) and returns
// an Observable that errors with that message.
private handleError(error:any) {
// In a real world app, we might send the error to remote logging infrastructure
let errMsg = error.message || 'Server error';
console.error(errMsg); // log to console instead
return Observable.throw(errMsg);
}
}
The backend method looks like this (and uses RxJava):
/**
 * Handles GET requests on "interval-sse-observable".
 *
 * Builds an Observable that ticks every five seconds on the I/O scheduler,
 * maps each tick to the string form of randomUUID(), and hands that stream
 * to RxResponse.sse to obtain the SseEmitter returned to the caller.
 *
 * @return the SseEmitter produced by RxResponse.sse for the UUID stream
 */
@ResponseStatus(HttpStatus.OK)
@RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
public SseEmitter tickSseObservable() {
    // The tick counter itself is discarded; only a new UUID string is emitted.
    final Observable<String> uuidEveryFiveSeconds = Observable
            .interval(5, TimeUnit.SECONDS, Schedulers.io())
            .map(tick -> randomUUID().toString());

    return RxResponse.sse(uuidEveryFiveSeconds);
}
I just noticed that the application is stuck in the request and that nothing is displayed on the page.
I suspect there is a problem with my use of the map method i.e. .map(this.extractData)
...
I would just like to add the incoming strings to an array and display that array in a template that updates as the strings arrive.
Can anyone please help?
Edit: here is a working solution (thanks to Thierry below):
import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';
/**
 * Root component that lists every string the server pushes over
 * Server-Sent Events.
 *
 * The EventSource connection is opened in ngOnInit and closed again in
 * ngOnDestroy, so the browser does not keep the stream (and its automatic
 * reconnects) alive after the component has been removed.
 */
@Component({
  selector: 'my-app',
  template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
  /** Strings received so far, in arrival order; rendered by the template. */
  someStrings: string[] = [];

  /** Open SSE connection, kept so that it can be closed on destroy. */
  private source?: EventSource;

  /** Opens the SSE connection and appends the payload of each 'message' event. */
  ngOnInit() {
    const source = new EventSource('/interval-sse-observable');
    source.addEventListener('message', event => this.someStrings.push(event.data), false);
    this.source = source;
  }

  /**
   * Closes the SSE connection.
   *
   * Angular invokes lifecycle hooks by method name, so this runs even though
   * the class does not declare `implements OnDestroy` (which would require an
   * additional import).
   */
  ngOnDestroy() {
    if (this.source) {
      this.source.close();
      this.source = undefined;
    }
  }
}
source to share
Here's a working example:
SseService
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
declare var EventSource;
/**
 * Exposes a Server-Sent Events endpoint as an RxJS Observable.
 */
@Injectable()
export class SseService {
  /**
   * Opens an EventSource on `sseUrl` for each subscriber and emits the `data`
   * payload of every 'message' event.
   *
   * The connection is closed when the subscriber unsubscribes. If the
   * EventSource gives up for good (its readyState becomes CLOSED), the
   * Observable errors with the received event instead of staying silent
   * forever; transient drops, after which the browser reconnects on its own,
   * are not reported.
   *
   * @param sseUrl URL of the SSE endpoint.
   * @returns Observable of the raw `data` strings of 'message' events.
   */
  observeMessages(sseUrl: string): Observable<string> {
    return new Observable<string>(obs => {
      const es = new EventSource(sseUrl);

      es.addEventListener('message', (evt) => {
        obs.next(evt.data);
      });

      es.addEventListener('error', (err) => {
        // While the browser is still retrying, readyState is CONNECTING;
        // only a CLOSED source will never deliver another message.
        if (es.readyState === EventSource.CLOSED) {
          obs.error(err);
        }
      });

      // Teardown: runs on unsubscribe and after an error notification.
      return () => es.close();
    });
  }
}
AppComponent
import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';
/**
 * Root component that renders every message received from the SSE stream
 * provided by SseService.
 */
@Component({
  selector: 'my-app',
  template: `<h1>Angular Server-Sent Events</h1>
<ul>
<li *ngFor="let message of messages">
{{ message }}
</li>
</ul>
`
})
export class AppComponent implements OnInit, OnDestroy {
  /** Subscription to the SSE stream; unset until ngOnInit has run. */
  private sseStream?: Subscription;

  /** Messages received so far, in arrival order; rendered by the template. */
  messages: string[] = [];

  constructor(private sseService: SseService) {
  }

  /** Subscribes to the SSE stream and appends each incoming message. */
  ngOnInit() {
    this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
      .subscribe(message => {
        // `messages` is an instance field, so it must be accessed through `this`;
        // a bare `messages` refers to an identifier that does not exist.
        this.messages.push(message);
      });
  }

  /** Unsubscribes from the stream, if a subscription was created. */
  ngOnDestroy() {
    if (this.sseStream) {
      this.sseStream.unsubscribe();
    }
  }
}
source to share
To add to Thierry's answer, the default event type is "message". However, the event type can be anything ("chat", "log", etc.), depending on the server-side implementation. In my case, the first two events from the server were "message" and the rest of them were "log". My code looks like this:
// Register one listener per event type sent by the server. EventSource
// exposes addEventListener (there is no addListener method on it).
const source = new EventSource('/...');
source.addEventListener('message', message => {
  // (...)
});
source.addEventListener('log', log => {
  // (...)
});
source to share