Using redux-observable and subscribing to a websocket
I'm trying to figure out how to get my epic going, so that it subscribes to a websocket and then dispatches some actions as the emitted events arrive from the websocket.
The sample I have seen uses multiplex and does not actually call subscribe on the websocket, and I am a little confused about how to change it.
I started it this way, but I believe redux-observable is expecting something different:
const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
  action$.ofType(START_BANK_STREAM).mergeMap(action => {
    console.log("in epic mergeMap");
    socket$
      .subscribe(
        e => {
          console.log("dispatch event " + e);
          distributeEvent(e);
        },
        e => {
          logger.log("AmbassadorsDataService", "Unclean socket closure");
        },
        () => {
          logger.log("AmbassadorsDataService", "Socket connection closed");
        }
      )
  });
function distributeEvent(event: DataEvent) : void {
  //this.logger.log('AmbassadorsDataService', 'Event Received: ' + event.command + ' and id: ' + event.id);
  if(event.source === '/ambassadors/bank') {
    if( event.command === 'REMOVE') {
      removeDataEvent(event);
    }
    else if(event.command == 'ADD') {
      loadDataEvent(event);
    }
  }
}
This throws an error: Uncaught TypeError: you provided 'undefined' where a stream was expected. You can provide Observable, Promise, Array, or Iterable.
Any help would be appreciated!
Thanks
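In redux-observable, you will almost never (unless you know why I say "almost") call subscribe yourself. Instead, Observables are chained, and the middleware and other operators handle the subscriptions for you. That is also where your error comes from: the function passed to mergeMap subscribes to the socket but returns nothing, so mergeMap receives undefined where it expected a stream.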
If all you want to do is dispatch an action for every event received, it's simple:
const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
    );
You may (or may not) want to do some additional customization depending on the content of the message received from the socket, but you might be better off putting that logic in your reducers, since it is probably not related to side effects.
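As a rough illustration of handling the message content in a reducer, here is a minimal sketch. The `DataEvent` fields are inferred from what the question reads off each event (the `id` type is a guess), and `BankState`, `bankReducer`, and the keyed-by-id state shape are hypothetical names and choices made up for this example, not anything from the question's codebase.

```typescript
// Assumed event shape, inferred from the fields used in the question.
interface DataEvent {
  id: string; // assumption: the real id type is not shown in the question
  source: string;
  command: string;
}

// Hypothetical state shape: bank events keyed by their id.
type BankState = { [id: string]: DataEvent };

// Hypothetical reducer reacting to the BANK_STREAM_MESSAGE actions emitted by the epic.
function bankReducer(
  state: BankState = {},
  action: { type: string; payload?: DataEvent }
): BankState {
  if (action.type !== 'BANK_STREAM_MESSAGE' || !action.payload) {
    return state;
  }
  const event = action.payload;
  // Ignore events from other sources, as distributeEvent did.
  if (event.source !== '/ambassadors/bank') {
    return state;
  }
  if (event.command === 'ADD') {
    return { ...state, [event.id]: event };
  }
  if (event.command === 'REMOVE') {
    // Copy first so the previous state object is never mutated.
    const next = { ...state };
    delete next[event.id];
    return next;
  }
  return state;
}
```

With something like this in place, the epic only has to wrap each socket message in an action, and the ADD/REMOVE branching stays in a pure function that is easy to test.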
You will probably also want a way to stop the stream, which is just a takeUntil:
const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
        .takeUntil(
          action$.ofType('STOP_BANK_STREAM')
        )
    );
I used mergeMap because you did, but in this case I think switchMap is more appropriate, since having several of these streams running at once seems redundant, unless you really do need several and your question just omits something unique about each one.