How do I create a StreamTransformer in Dart?

I'm trying to create a custom StreamTransformer class, but many of the examples I find seem to be outdated, and the one in the documentation isn't written as a class, which is what you might expect in a typed language (found here: https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer). This doesn't feel like the Dart way of approaching it, but rather a JavaScript way (which is what I'm using Dart to avoid).

Many online sources say that this is how you create a StreamTransformer, but extending it produces errors.

class exampleStreamTransformer extends StreamTransformer
{
  //... (This won't work)
}

      

"Implements" seems to be the way to go, along with implementing the required binding functionality:

class exampleStreamTransformer implements StreamTransformer
{
  Stream bind(Stream stream)
  {
    //... (Go on to return new stream, etc)
  }
}

      

I couldn't find any examples written this way, so I threw something together myself. My IDE accepts it, but it fails at runtime: I get a null object error when it tries to use the pause getter.

class exampleStreamTransformer implements StreamTransformer
{
  StreamController<String> _controller;
  StreamSubscription<String> _subscription;

  Stream bind(Stream stream)
  {
    _controller = new StreamController<String>(
        onListen: ()
        {
          _subscription = stream.listen((data)
          {
            // Transform the data.
            _controller.add(data);
          },
          onError: _controller.addError,
          onDone: _controller.close,
          cancelOnError: true); // Unsure how I'd pass this in?????
        },
        onPause: _subscription.pause,
        onResume: _subscription.resume,
        onCancel: _subscription.cancel,
        sync: true
    );

    return _controller.stream;
  }
}

      

I would like to achieve this in a "typed" way, by writing a class. Any help is greatly appreciated, thanks.

+3



3 answers


Good. Here's another working example:

import 'dart:async';

class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOnError;

  // Original Stream
  Stream<S> _stream;

  DuplicateTransformer({bool sync: false, this.cancelOnError}) {
    _controller = new StreamController<T>(onListen: _onListen, onCancel: _onCancel, onPause: () {
      _subscription.pause();
    }, onResume: () {
      _subscription.resume();
    }, sync: sync);
  }

  DuplicateTransformer.broadcast({bool sync: false, bool this.cancelOnError}) {
    _controller = new StreamController<T>.broadcast(onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    _subscription = _stream.listen(onData,
      onError: _controller.addError,
      onDone: _controller.close,
      cancelOnError: cancelOnError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  /**
   * Transformation
   */

  void onData(S data) {
    _controller.add(data);
    _controller.add(data); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
  }

  /**
   * Bind
   */

  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }
}

void main() {
  // Create StreamController
  StreamController controller = new StreamController.broadcast();
  // Transform
  Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());

  s.listen((data) {
    print('data: $data');
  }).cancel();

  s.listen((data) {
    print('data2: $data');
  }).cancel();

  s.listen((data) {
    print('data3: $data');
  });

  // Simulate data

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

      



Let me add a few notes:

  • Using implements seems to be the correct way, judging by the source code of the other internal Dart transformers.
  • I have implemented both regular and broadcast versions.
  • In the case of a normal (single-subscription) stream, you can call cancel/pause/resume directly from the new stream controller's callbacks, because the stream can only be listened to once.
  • With a broadcast stream, I found that onListen is only called when no one is already listening to the stream. onCancel behaves the same way: it is called when the last listener cancels their subscription. This is why it is safe to use the same functions here.
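
For readers on a current SDK, here is a minimal sketch of the same idea, assuming Dart 2.12 or later (null safety). The class name DuplicatingTransformer is hypothetical. Two things differ from the example above: the controller is created inside bind(), so one transformer instance can be reused, and the interface now also requires a cast() method, which this sketch forwards to StreamTransformer.castFrom. The pause/resume/cancel callbacks are wrapped in closures so the subscription is only looked up when the callback runs, which avoids the null error from the question.

```dart
import 'dart:async';

/// Hypothetical example: emits every incoming event twice.
class DuplicatingTransformer<T> implements StreamTransformer<T, T> {
  final bool sync;
  final bool cancelOnError;

  const DuplicatingTransformer({this.sync = false, this.cancelOnError = false});

  @override
  Stream<T> bind(Stream<T> stream) {
    // A fresh controller per bind() call keeps the transformer reusable.
    late StreamController<T> controller;
    StreamSubscription<T>? subscription;

    controller = StreamController<T>(
      sync: sync,
      onListen: () {
        subscription = stream.listen(
          (data) {
            // Transformation: forward each event twice.
            controller.add(data);
            controller.add(data);
          },
          onError: controller.addError,
          onDone: controller.close,
          cancelOnError: cancelOnError,
        );
      },
      // Closures defer the lookup until the callback actually runs,
      // by which time onListen has assigned the subscription.
      onPause: () => subscription?.pause(),
      onResume: () => subscription?.resume(),
      onCancel: () => subscription?.cancel(),
    );
    return controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() =>
      StreamTransformer.castFrom<T, T, RS, RT>(this);
}

Future<void> main() async {
  final result = await Stream.fromIterable([1, 2, 3])
      .transform(const DuplicatingTransformer<int>())
      .toList();
  print(result); // [1, 1, 2, 2, 3, 3]
}
```

This sketch covers only the single-subscription case; a broadcast variant would use StreamController.broadcast in the same place.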
+3



Why don't you use StreamTransformer.fromHandlers()?

import 'dart:async';

void handleData(data, EventSink sink) {
  sink.add(data*2);
}

void main() {
  StreamTransformer doubleTransformer = new StreamTransformer.fromHandlers(handleData: handleData);

  StreamController controller = new StreamController();
  controller.stream.transform(doubleTransformer).listen((data) {
    print('data: $data');
  });

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

      



Output

data: 2
data: 4
data: 6
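
If a named, typed class is still wanted, one possible sketch is to wrap the fromHandlers transformer in a class that extends StreamTransformerBase from dart:async, which supplies cast() so that only bind() has to be written. This assumes a Dart 2 or later SDK; the class name DoublingTransformer is hypothetical.

```dart
import 'dart:async';

/// Hypothetical class name; doubles each integer event.
class DoublingTransformer extends StreamTransformerBase<int, int> {
  const DoublingTransformer();

  @override
  Stream<int> bind(Stream<int> stream) {
    // Delegate to fromHandlers, which already handles pause, resume and cancel.
    return stream.transform(
      StreamTransformer<int, int>.fromHandlers(
        handleData: (int data, EventSink<int> sink) => sink.add(data * 2),
      ),
    );
  }
}

Future<void> main() async {
  final doubled = await Stream.fromIterable([1, 2, 3])
      .transform(const DoublingTransformer())
      .toList();
  print(doubled); // [2, 4, 6]
}
```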

      

+2



https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139

You can use StreamTransformer.fromHandlers to easily create transformers that simply convert input events to output events.

Example:

new StreamTransformer.fromHandlers(handleData: (String event, EventSink output) {
  if (event.startsWith('data:')) {
    output.add(JSON.decode(event.substring('data:'.length)));
  } else if (event.isNotEmpty) {
    output.addError('Unexpected data from CloudBit stream: "$event"');
  }
});
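
The snippet above uses the Dart 1 name JSON.decode. As a rough, runnable sketch for Dart 2 and later, the same handler can be written with jsonDecode from dart:convert. The variable name dataLineDecoder, the shortened error message and the sample input lines are made up for illustration.

```dart
import 'dart:async';
import 'dart:convert';

// Turns lines such as 'data:{"percent": 42}' into decoded JSON values.
final StreamTransformer<String, dynamic> dataLineDecoder =
    StreamTransformer<String, dynamic>.fromHandlers(
  handleData: (String event, EventSink<dynamic> output) {
    if (event.startsWith('data:')) {
      output.add(jsonDecode(event.substring('data:'.length)));
    } else if (event.isNotEmpty) {
      output.addError('Unexpected data from stream: "$event"');
    }
    // Empty lines are dropped silently.
  },
);

void main() {
  // Hypothetical sample input: one valid line, one empty line, one bad line.
  Stream.fromIterable(['data:{"percent": 42}', '', 'oops'])
      .transform(dataLineDecoder)
      .listen(
        (value) => print('value: $value'),
        onError: (Object error) => print('error: $error'),
      );
}
```

Running this should print the decoded map for the first line, skip the empty line, and report the third line through the error callback.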

      

0

