Node.js stream: pause (unpipe) and resume (pipe) an intermediate pipe

I need to "pause" a readable stream for a certain number of seconds and then resume it. The readable stream is piped into a transform stream, so I cannot use the normal pause and resume methods; I had to use unpipe and pipe instead. In the transform stream, I can detect the pipe event, call unpipe on the readable stream, and then call pipe again after a few seconds to resume it (hopefully).

Here is the code:

main.ts

import {Transform, Readable} from 'stream';

const alphaTransform = new class extends Transform {
    constructor() {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                let transformed: IterableIterator<string>;
                if (Buffer.isBuffer(chunk)) {
                    transformed = function* () {
                        for (const val of chunk) {
                            yield String.fromCharCode(val);
                        }
                    }();
                } else {
                    transformed = chunk[Symbol.iterator]();
                }
                callback(null,
                    Array.from(transformed).map(s => s.toUpperCase()).join(''));
            }
        });
    }
}

const spyingAlphaTransformStream =  new class extends Transform {
    private oncePaused = false;

    constructor() {
        super({
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                console.log('Before transform:');
                if (Buffer.isBuffer(chunk)) {
                    console.log(chunk.toString('utf-8'));
                    alphaTransform.write(chunk);
                } else {
                    console.log(chunk);
                    alphaTransform.write(chunk, encoding);
                }
                callback(null, alphaTransform.read());
            }
        });

        this.on('pipe', (src: Readable) => {
            if (!this.oncePaused) {
                src.unpipe(this); // Here I unpipe the readable stream
                console.log(`Data event listeners count: ${src.listeners('data').length}`);
                console.log(`Readable state of reader: ${src.readable}`);
                console.log("We paused the reader!!");
                setTimeout(() => {
                    this.oncePaused = true;
                    src.pipe(this); // Here I resume it...hopefully?
                    src.resume();
                    console.log("We unpaused the reader!!");
                    console.log(`Data event listeners count: ${src.listeners('data').length}`);
                    console.log(`Readable state of reader: ${src.readable}`);
                }, 1000);
            }
        });

        this.on('data', (transformed) => {
            console.log('After transform:\n', transformed);
        });
    }
}

const reader = new class extends Readable {
    constructor(private content?: string | Buffer) {
        super({
            read: (size?: number) => {
                if (!this.content) {
                    this.push(null);
                } else {
                    this.push(this.content.slice(0, size));
                    this.content = this.content.slice(size);
                }
            }
        });
    }
} (Buffer.from('The quick brown fox jumps over the lazy dog.\n'));

reader.pipe(spyingAlphaTransformStream)
    .pipe(process.stdout);

      

The problem lies in the middle stream, spyingAlphaTransformStream. This is the one that listens for the pipe event and then pauses and resumes the readable stream after 1 second. After it unpipes the readable stream and then pipes it back in, nothing is written to standard output, which means the transform method of spyingAlphaTransformStream is never called, so nothing is flowing through the stream.

I expect the result to look something like this:

Data event listeners count: 0
Readable state of reader: true
We paused the reader!!
We unpaused the reader!!
Data event listeners count: 1
Readable state of reader: true
Before transform:
The quick brown fox jumps over the lazy dog.

After transform:
 THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

      

But it actually looks like this:

Data event listeners count: 0
Readable state of reader: true
We paused the reader!!
We unpaused the reader!!
Data event listeners count: 1
Readable state of reader: true

      

Basically, nothing is being piped from the readable stream, as far as I can tell.

How can I fix this?

package.json

{
  "name": "hello-stream",
  "version": "1.0.0",
  "main": "main.ts",
  "scripts": {
    "start": "npm run build:live",
    "build:live": "nodemon"
  },
  "keywords": [
    "typescript",
    "nodejs",
    "ts-node",
    "cli",
    "node",
    "hello"
  ],
  "license": "WTFPL",
  "devDependencies": {
    "@types/node": "^7.0.21",
    "nodemon": "^1.11.0",
    "ts-node": "^3.0.4",
    "typescript": "^2.3.2"
  },
  "dependencies": {}
}

      

nodemon.json

{
    "ignore": ["node_modules"],
    "delay": "2000ms",
    "execMap": {
        "ts": "ts-node"
    },
    "runOnChangeOnly": false,
    "verbose": true
}

      

tsconfig.json

{
  "compilerOptions": {
    "target": "es2015",
    "module": "commonjs",
    "typeRoots": ["node_modules/@types"],
    "lib": ["es6", "dom"],

    "strict": true,
    "noUnusedLocals": true,
    "types": ["node"]
  }
}

      


1 answer


The solution was surprisingly simpler than I expected. I had to find a way to defer any callbacks made in the transform method and wait for the stream to be "ready" before calling the initial callback.

Basically, in the spyingAlphaTransformStream constructor, I had a boolean to check whether the stream was ready, and if it was not, I stored a callback in the class that would execute the first callback I received in the transform method. Since that first callback has not been executed, the stream receives no further calls, i.e. there is only one pending callback to worry about; so now it is just a waiting game until the stream indicates it is ready (this is done with a simple setTimeout).

When the stream is "ready", I set the ready boolean to true and then call the pending callback (if set), at which point data continues to flow through the rest of the pipeline.
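The idea can be reduced to a minimal sketch before looking at the full program. The names GateStream, open() and transformCalls are hypothetical and are not part of the original code; the sketch only assumes that a Transform is not handed another chunk until the callback for the previous one has been invoked.

```typescript
import { Transform } from 'stream';

// Hypothetical helper (not from the original answer): a pass-through stream
// that holds back its first transform callback until open() is called.
class GateStream extends Transform {
    public transformCalls = 0;    // how many times _transform has run
    private isOpen = false;       // the "ready" flag
    private pending?: () => void; // the single deferred callback, if any

    constructor() {
        super({ objectMode: true });
    }

    _transform(chunk: any, _encoding: string,
               callback: (err?: Error | null, data?: any) => void): void {
        this.transformCalls++;
        const forward = () => callback(null, chunk);
        if (this.isOpen) {
            forward();
        } else {
            // Until this callback runs, later writes stay buffered upstream.
            this.pending = forward;
        }
    }

    open(): void {
        this.isOpen = true;
        const pending = this.pending;
        this.pending = undefined;
        if (pending) {
            pending();
        }
    }
}

const gate = new GateStream();
const seen: string[] = [];
gate.on('data', (chunk: string) => {
    seen.push(chunk);
    console.log('passed through:', chunk);
});

gate.write('first');  // reaches _transform, which defers its callback
gate.write('second'); // buffered: the first callback is still outstanding

// Release the gate after a short delay, as the answer does with setTimeout.
setTimeout(() => gate.open(), 100);
```

Only one callback ever needs to be stored, because the second chunk is not offered to _transform until the first callback has been called.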

I have a longer example to show how it works:



import {Transform, Readable} from 'stream';

const alphaTransform = new class extends Transform {
    constructor() {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                let transformed: IterableIterator<string>;
                if (Buffer.isBuffer(chunk)) {
                    transformed = function* () {
                        for (const val of chunk) {
                            yield String.fromCharCode(val);
                        }
                    }();
                } else {
                    transformed = chunk[Symbol.iterator]();
                }
                callback(null,
                    Array.from(transformed).map(s => s.toUpperCase()).join(''));
            }
        });
    }
}

class LoggingStream extends Transform {
    private pending?: () => void; // callback deferred until the stream is ready
    private isReady = false;

    constructor(message: string) {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                // Log the chunk with a timestamp and pass it downstream unchanged.
                const forward = () => {
                    console.log(message);
                    const text = Buffer.isBuffer(chunk) ? chunk.toString('utf-8') : chunk;
                    console.log(`[${new Date().toTimeString()}]: ${text}`);
                    callback(null, chunk);
                };
                if (!this.isReady) { // ready flag
                    this.pending = forward; // keep it as the pending callback
                } else {
                    forward();
                }
            }
        });

        this.on('pipe', this.pauseOnPipe);
    }

    private pauseOnPipe() {
        this.removeListener('pipe', this.pauseOnPipe);
        setTimeout(() => {
            this.isReady = true; // set ready flag to true
            if (this.pending) { // execute the pending callback (if any)
                this.pending();
            }
        }, 3000); // wait three seconds
    }
}

const reader = new class extends Readable {
    constructor(private content?: string | Buffer) {
        super({
            read: (size?: number) => {
                if (!this.content) {
                    this.push(null);
                } else {
                    this.push(this.content.slice(0, size));
                    this.content = this.content.slice(size);
                }
            }
        });
    }
} (Buffer.from('The quick brown fox jumps over the lazy dog.\n'));

reader.pipe(new LoggingStream("Before transformation:"))
    .pipe(alphaTransform)
    .pipe(new LoggingStream("After transformation:"))
    .pipe(process.stdout);

      

Output

<Waits about 3 seconds...>

Before transformation:
[11:13:53 GMT-0600 (CST)]: The quick brown fox jumps over the lazy dog.

After transformation:
[11:13:53 GMT-0600 (CST)]: THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

      

Note that since JS is single-threaded and both timers start when the pipeline is set up, both logging streams wait the same amount of time before proceeding.
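Why the two waits overlap instead of adding up can be checked with a small sketch. It uses PassThrough streams as stand-ins for the logging streams (an assumption made for brevity) and relies on the 'pipe' event being emitted synchronously inside pipe().

```typescript
import { PassThrough } from 'stream';

const order: string[] = [];

const source = new PassThrough();
const first = new PassThrough();  // stand-in for the "before" logging stream
const second = new PassThrough(); // stand-in for the "after" logging stream

// In the answer's code, each of these handlers would start its own timer.
first.on('pipe', () => order.push('first piped'));
second.on('pipe', () => order.push('second piped'));

source.pipe(first).pipe(second);
order.push('pipeline built');

// order is now ['first piped', 'second piped', 'pipeline built']
console.log(order);
```

Both handlers have already run by the time the pipeline expression returns, so timers started in them begin at practically the same moment and expire together.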
