Using redux-observable and subscribing to a websocket

I'm trying to figure out how to get my epic going so that it subscribes to a websocket and then dispatches some actions as the emitted events come in from the websocket.

The sample I have seen uses multiplex and does not actually call subscribe on the websocket, and I am a little confused about how to change it.

I started it this way, but I believe redux-observable expects something different:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType(START_BANK_STREAM).mergeMap(action => {
    console.log("in epic mergeMap");
    socket$
      .subscribe(
        e => {
          console.log("dispatch event " + e);
          distributeEvent(e);
        },
        e => {
          logger.log("AmbassadorsDataService", "Unclean socket closure");
        },
        () => {
          logger.log("AmbassadorsDataService", "Socket connection closed");
        }
      )
  });

function distributeEvent(event: DataEvent): void {
  //this.logger.log('AmbassadorsDataService', 'Event Received: ' + event.command + ' and id: ' + event.id);
  if (event.source === '/ambassadors/bank') {
    if (event.command === 'REMOVE') {
      removeDataEvent(event);
    }
    else if (event.command == 'ADD') {
      loadDataEvent(event);
    }
  }
}

      

This gives an error: Uncaught TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

Any help would be appreciated!

Thanks



1 answer


In redux-observable, you will almost never (unless you know why I say "almost") call subscribe yourself. Instead, Observables are chained, and the middleware and other operators handle the subscriptions for you.
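This also explains the error in the question: the mergeMap callback there has a block body with no return statement, so it hands mergeMap undefined instead of an Observable. A minimal sketch of that language behavior, where the function names are hypothetical and a plain array stands in for a stream so the example has no dependencies:

```typescript
// Hypothetical stand-in for a stream; a plain array keeps the sketch dependency-free.
type Stream = number[];

// Block body without `return`: the expression is evaluated and then discarded,
// so the function returns undefined (this mirrors the epic in the question).
const blockBody = (n: number) => {
  [n, n + 1];
};

// Concise body: the expression itself is the return value
// (this mirrors returning socket$.map(...) from mergeMap).
const conciseBody = (n: number): Stream => [n, n + 1];

const fromBlock = blockBody(1);
const fromConcise = conciseBody(1);
```

Here fromBlock is undefined while fromConcise holds the array, which is the difference between the failing epic and one that returns the mapped socket stream.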

If all you want to do is dispatch an action for every event received, it's simple:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
    );

      

You may (or may not) need to do some more custom work depending on the content of the message received from the socket, but you would probably be better served putting that logic in your reducers, since it most likely is not related to side effects.
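As a rough sketch of what moving that logic into a reducer could look like, assuming the state is a map of events keyed by id: the DataEvent fields below are inferred from the question's code (source, command, and the id mentioned in its commented-out log line), and BankState and bankReducer are hypothetical names, not part of the original code.

```typescript
// Assumed shape: only `source`, `command`, and `id` appear in the question.
interface DataEvent {
  id: string;
  source: string;
  command: string;
}

// Hypothetical state: bank events keyed by their id.
type BankState = { [id: string]: DataEvent };

function bankReducer(
  state: BankState = {},
  action: { type: string; payload?: DataEvent }
): BankState {
  const event = action.payload;
  // Ignore unrelated actions and events from other sources.
  if (
    action.type !== 'BANK_STREAM_MESSAGE' ||
    !event ||
    event.source !== '/ambassadors/bank'
  ) {
    return state;
  }
  if (event.command === 'ADD') {
    // Return a new object instead of mutating the existing state.
    return { ...state, [event.id]: event };
  }
  if (event.command === 'REMOVE') {
    // Copy first, then delete from the copy, so the old state is untouched.
    const next = { ...state };
    delete next[event.id];
    return next;
  }
  return state;
}
```

With this in place the epic stays a thin pipe from the socket to BANK_STREAM_MESSAGE actions, and the ADD/REMOVE branching from distributeEvent becomes a pure function that is easy to test.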



You will probably want a way to stop the stream, which is just a takeUntil:

const socket$ = Observable.webSocket<DataEvent>(
  "ws://thewebsocketurl"
);

const bankStreamEpic = (action$, store) =>
  action$.ofType('START_BANK_STREAM')
    .mergeMap(action =>
      socket$
        .map(payload => ({
          type: 'BANK_STREAM_MESSAGE',
          payload
        }))
        .takeUntil(
          action$.ofType('STOP_BANK_STREAM')
        )
    );

      

I used mergeMap because you did, but in this case I think switchMap is more appropriate, since having several of these running at once seems redundant, unless you really do need several and your question just omits whatever is unique about each one.
