
JavaScript Streams: From Beginner to Expert

JS Syntax
Binary
2025 Apr 14 · 6442 words | Estimated reading time: 33 minutes

Streams in JavaScript are an elegant pattern for handling data, allowing us to process data as if we were dealing with a flowing stream—bit by bit, rather than loading everything all at once. Imagine the water in a stream, continuously flowing; this is the approach streams take, enabling efficient handling of videos, large files, and real-time data, even in resource-constrained environments.

Why Streams Are Needed

Do you remember the last time you tried to upload a large video file in a browser? The waiting, the freezing, and even the crashes can be quite frustrating. This is where the limitations of the traditional "load everything at once" approach become apparent.

When dealing with large files, the traditional method loads the entire file into memory at once, like pouring a bucket of water into a small cup—overflow is inevitable.

I encountered this issue in a project where users were uploading large files, causing the server's memory usage to spike instantly, eventually leading to a complete service outage. In such scenarios, stream programming is clearly the most appropriate solution. Let's compare the traditional method with stream processing:

// Traditional method - Load the entire file into memory
fs.readFile('huge-video.mp4', (err, data) => {
  if (err) throw err;
  // At this point, the data variable contains the entire file content
  processFile(data);
});

// Stream processing - Read the file in chunks
const readStream = fs.createReadStream('huge-video.mp4');
readStream.on('data', (chunk) => {
  // Process a small portion of data at a time
  processChunk(chunk);
});

After switching to stream-based processing, you'll notice that the server's memory usage drops significantly and the freezing never happens again. This is the power of stream programming.

In fact, the strength of streams lies in "processing on demand"—you don't have to wait for all the data to start working. It's like sipping coffee while continuously refilling the cup, rather than waiting for a whole pot of coffee to brew before drinking. With streams, the memory footprint for a 1GB file might only be 64KB (default chunk size), a significant efficiency improvement compared to the traditional method's 1GB usage.

Even if the client has enough memory, processing large files all at once can cause noticeable performance issues. The main thread gets blocked, the user interface may freeze, and the overall responsiveness of the application is affected. Similarly, in a Node.js server environment, this blocking directly impacts the ability to handle other requests.

Moreover, modern applications often need to handle real-time data streams, such as live video streaming, real-time log analysis, or financial transaction data. Traditional data processing methods can no longer meet the demand for "processing as it is generated," leading to increased data processing delays and reduced real-time performance.

Therefore, what we need is a data processing mechanism that can quickly handle real-time data, has low memory costs, and is feature-rich. This mechanism is implemented in JavaScript as streams.

Basic Concepts of Streams

Note: Unless otherwise specified, the streams mentioned below refer to streams in the Node.js environment by default.

Streams provide a new paradigm for data processing, allowing us to split data into small chunks and process them step by step, just like a continuous flow of water.

The best way to understand the concept of streams is through analogies from everyday life. Imagine water flowing through a pipe: water flows from the source (such as a reservoir) through the pipe system, eventually reaching the destination (such as a faucet). Throughout the process, the water is continuously flowing, not all delivered at once.

Technically, a stream is an abstract representation of an asynchronous sequence of data. It provides a standardized interface for handling continuously arriving data chunks. These data chunks can be:

  • Binary data in files
  • HTTP request bodies
  • Terminal input
  • Any segmentable continuous data

In Node.js, the stream module provides the core API for stream processing. Almost all I/O operations are built on streams: from the file system to HTTP requests and responses, streams are everywhere.

const { Readable, Writable, Transform, Duplex } = require('stream');

Stream processing enables a "pipeline" operation of data, where producers and consumers can work in parallel. Data can be processed immediately as it is generated, greatly improving the system's responsiveness and real-time performance. This is particularly prominent in scenarios such as video transcoding and real-time data analysis.
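
As a first taste of this pipeline style, here is a minimal sketch (the file names are placeholders): the reader, the gzip transform, and the writer all work on different chunks at the same time.

const fs = require("fs");
const zlib = require("zlib");

// Each chunk read from access.log is compressed and written out
// while the next chunk is still being read
fs.createReadStream("access.log")
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream("access.log.gz"));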

Performance Comparison Example:

| Processing Method | Memory Usage | Processing Delay | Applicable Scenarios |
| --- | --- | --- | --- |
| Traditional Method | ~1GB | High | Small files |
| Stream Processing | ~64KB (default chunk size) | Low | Large files / real-time data |

The main types of streams in Node.js are (we will cover these in detail in the next chapter):

  1. Readable Streams
  2. Writable Streams
  3. Duplex Streams
  4. Transform Streams

Each type of stream is an instance of EventEmitter, meaning they communicate through events. Common stream events include:

  • data - Triggered when there is data available to read
  • end - Triggered when no more data is available to read
  • error - Triggered when an error occurs
  • finish - Triggered when all data has been flushed to the underlying system
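
A minimal sketch of how these events typically fit together (the file names are placeholders): a file is copied chunk by chunk, and each of the four events is observed along the way.

const fs = require("fs");

const source = fs.createReadStream("input.txt");
const target = fs.createWriteStream("output.txt");

source.on("data", (chunk) => target.write(chunk)); // there is data available to read
source.on("end", () => target.end());              // no more data to read
source.on("error", (err) => console.error("Read failed:", err));
target.on("finish", () => console.log("All data flushed to output.txt"));

In practice you would usually let pipe() or pipeline() wire these events up for you; both are covered in the chapters below.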

In the following chapters, we will delve into the various types of streams in JavaScript, their usage methods, and best practices, helping you master this powerful data processing tool.

The Four Basic Types of Streams

Readable Streams

Readable streams are the source of data, like a reservoir or water source. File reading, HTTP requests, and user input are typical scenarios for readable streams. More formally, a readable stream is a producer of data. Its core characteristics are:

  • Data can only be read from the stream, not written to it
  • Supports two data consumption modes: flowing mode and paused mode
  • Automatically handles the backpressure mechanism
  • Data can be piped to writable streams

Typical Use Cases:

  • Reading data from files
  • Receiving HTTP request bodies
  • Reading database query results
  • Any scenario that requires sequential reading of data

Here are some common implementations of readable streams:

const fs = require("fs");

// 1. File Read Stream
const fileStream = fs.createReadStream("./data.txt");

// 2. HTTP Request Stream
const http = require("http");
http.createServer((req, res) => {
  // req is a readable stream
  req.on("data", (chunk) => {
    console.log(`Received ${chunk.length} bytes of data`);
  });
});

// 3. Custom Readable Stream
const { Readable } = require("stream");
const myReadable = new Readable({
  read(size) {
    // Custom data generation logic
    this.push("some data");
    this.push(null); // Indicates the end of data
  },
});

Writable Streams

Writable streams are the consumers of data, representing the destination of data. Their characteristics are:

  • Data can only be written to the stream, not read from it
  • Supports buffering to handle differences in write speeds
  • Provides a drain event to handle backpressure
  • Can receive data piped from readable streams

Typical Use Cases:

  • Writing to files
  • Sending HTTP responses
  • Writing to databases
  • Any scenario that requires sequential writing of data

Here are some common implementations of writable streams:

// 1. File Write Stream
const fileWriter = fs.createWriteStream("./output.txt");

// 2. HTTP Response Stream
http.createServer((req, res) => {
  // res is a writable stream
  res.write("Hello World");
  res.end();
});

// 3. Custom Writable Stream
const { Writable } = require("stream");
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    console.log("Writing data:", chunk.toString());
    callback(); // Indicates the write is complete
  },
});

Duplex Streams

Duplex streams are bidirectional streams that implement both the readable and writable interfaces. Their characteristics are:

  • Can read and write
  • The read and write ends are independent
  • Commonly used in bidirectional communication scenarios

Here are some common implementations of duplex streams, such as TCP socket servers:

const net = require('net');

// Create a TCP server
const server = net.createServer((socket) => {
  // socket is a duplex stream
  socket.write('Welcome to the connection!\n');
  
  socket.on('data', (data) => {
    console.log('Received data from client:', data.toString());
    socket.write('Server received your message\n');
  });
});

server.listen(8080, () => {
  console.log('Server started');
});

// Custom Duplex Stream
const { Duplex } = require('stream');
const myDuplex = new Duplex({
  write(chunk, encoding, callback) {
    console.log('Writing:', chunk.toString());
    callback();
  },
  
  read(size) {
    this.push('Data from duplex stream\n');
    this.push(null);
  }
});

Transform Streams

Transform streams are a special type of duplex stream, designed specifically for data transformation. Their characteristics are:

  • Can read and write simultaneously
  • Data written to the write end is transformed and appears at the read end
  • Commonly used in data format conversion, encryption/decryption, etc.

There are many types of transform stream implementations, such as compression/decompression streams, encryption streams, etc.:

const { Transform } = require("stream");
const zlib = require("zlib");

// 1. Custom Transform Stream: Convert input to uppercase
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Usage Example
process.stdin.pipe(new UpperCaseTransform()).pipe(process.stdout);

// 2. Gzip Compression Stream
fs.createReadStream("input.txt")
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream("input.txt.gz"));

// 3. Encryption Stream (createCipher is deprecated; derive a key and IV instead)
const crypto = require("crypto");
const key = crypto.scryptSync("secret-key", "salt", 24); // aes-192 needs a 24-byte key
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv("aes-192-cbc", key, iv);

process.stdin.pipe(cipher).pipe(fs.createWriteStream("secret.enc"));

Summary of the Four Stream Types:

| Stream Type | Readable | Writable | Characteristics | Typical Applications |
| --- | --- | --- | --- | --- |
| Readable | ✓ | ✗ | Data source | File reading, HTTP requests |
| Writable | ✗ | ✓ | Data destination | File writing, HTTP responses |
| Duplex | ✓ | ✓ | Bidirectional | TCP sockets, bidirectional communication |
| Transform | ✓ | ✓ | Data transformation | Compression/decompression, encryption/decryption |

Understanding these four basic stream types is the foundation of mastering Node.js stream programming. In practical applications, we often combine these basic stream types to build complex data processing pipelines.

Core APIs and Event Mechanisms of Streams

Key Events

data, end, error, etc.

Node.js streams are based on an event-driven model using EventEmitter. Key events include:

  1. Readable Stream Events:

    Readable streams in Node.js have two core states: flowing mode and paused mode.

    Flowing Mode: Data is automatically read from the underlying system and pushed to consumers through events (such as 'data'), without manual intervention. Suitable for high-speed continuous data processing. If no 'data' listener is attached and the stream is not piped to a destination, data may be lost.

    Paused Mode: The default state, where data must be explicitly fetched by calling stream.read() or by listening to the 'readable' event. Suitable for fine-grained control of the data flow. Switching to flowing mode can be achieved by listening to the 'data' event, calling resume(), or using the pipe() method; both modes are illustrated in the sketch after this event list.

    • data - Triggered when the stream passes a data chunk to the consumer. Once a 'data' event listener is set, the readable stream automatically switches to flowing mode.
    readable.on("data", (chunk) => {
      console.log(`Received ${chunk.length} bytes of data`);
    });
    • end - Triggered when there is no more data to consume from the stream
    • error - Triggered when an underlying system operation fails or an error occurs in the stream implementation
    • close - Triggered when the stream or its underlying resource (such as a file descriptor) is closed
  2. Writable Stream Events:

    • drain - Triggered when the writable stream can receive more data (key event for backpressure)
    • finish - Triggered when end() is called and all data has been flushed to the underlying system
    • pipe/unpipe - Triggered when a readable stream is piped to/from the writable stream
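
To make the two readable-stream modes above concrete, here is a minimal sketch (the file name is a placeholder) that consumes the same file first in paused mode via 'readable' and read(), and then in flowing mode via 'data':

const fs = require("fs");

// Paused mode: pull data explicitly with read()
const paused = fs.createReadStream("data.txt");
paused.on("readable", () => {
  let chunk;
  while ((chunk = paused.read()) !== null) {
    console.log(`Pulled ${chunk.length} bytes`);
  }
});
paused.on("end", () => console.log("Paused-mode read finished"));

// Flowing mode: attaching a 'data' listener switches the stream to flowing
const flowing = fs.createReadStream("data.txt");
flowing.on("data", (chunk) => {
  console.log(`Pushed ${chunk.length} bytes`);
});
flowing.on("end", () => console.log("Flowing-mode read finished"));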

Event-Driven Programming Model

The event mechanism of streams enables an efficient producer-consumer pattern:

// Typical event-driven stream processing
const readable = getReadableStreamSomehow();

readable.on("data", (chunk) => {
  console.log("Processing data chunk:", chunk);

  // Simulate asynchronous processing
  process.nextTick(() => {
    if (!writable.write(chunk)) {
      // Handle backpressure
      readable.pause();
      writable.once("drain", () => readable.resume());
    }
  });
});

readable.on("end", () => {
  console.log("Data reading completed");
  writable.end();
});

readable.on("error", (err) => {
  console.error("Reading error:", err);
});

Important Methods

pipe()

pipe() is one of the most powerful features of streams, establishing a direct channel from a readable stream to a writable stream:

// Basic usage
readable.pipe(writable);

// Chained piping
sourceStream
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(destinationStream);

// Modern alternative (recommended)
const { pipeline } = require("stream/promises");

await pipeline(
  sourceStream,
  transformStream1,
  transformStream2,
  destinationStream
);

pipe() vs pipeline():

| Feature | pipe() | pipeline() |
| --- | --- | --- |
| Automatic error propagation | ✗ | ✓ |
| Promise support | ✗ | ✓ |
| Backpressure handling | ✓ | ✓ |
| Stream cleanup | Manual | Automatic |

read()/write()

  1. read(size) - Explicitly reads data from a readable stream

    const data = readable.read(1024); // Try to read 1024 bytes
    if (data !== null) {
      processData(data);
    }
  2. write(chunk[, encoding][, callback]) - Writes data to a writable stream

    This method returns a boolean indicating whether the writable stream's internal buffer has reached its highWaterMark. If the buffer is full after accepting the chunk, it returns false; otherwise, it returns true.

    A return value of false signals backpressure: the caller should pause writing and wait for the 'drain' event before continuing. A return value of true means the buffer still has room, so more data can be written safely.

    const canWriteMore = writable.write(chunk, "utf8", () => {
      console.log("Write completed");
    });
    
    if (!canWriteMore) {
      // Handle backpressure
    }

pause()/resume()

Control the data flow of a readable stream:

// Pause data flow
readable.pause();

// Resume data flow
readable.resume();

// Practical application: Combined with backpressure management
writable.on("drain", () => {
  console.log("drain event: Can continue writing");
  readable.resume();
});

readable.on("data", (chunk) => {
  if (!writable.write(chunk)) {
    readable.pause();
  }
});

Backpressure Mechanism

What is Backpressure?

Backpressure is the pressure generated in a stream system when the data production speed exceeds the consumption speed. Just like water flowing too fast in a pipe can increase pressure, data flowing too fast can cause:

  1. Memory Accumulation: Unprocessed data accumulates in the buffer
  2. Resource Exhaustion: May exhaust memory or file descriptors
  3. Performance Degradation: Frequent GC triggers, slowing down the system

How to Handle Mismatched Data Production and Consumption Speeds?

  1. Automatic Backpressure Handling with pipe()

    // pipe() internally handles backpressure
    readable.pipe(writable);
  2. Manual Backpressure Control Mode

    readable.on("data", (chunk) => {
      const canContinue = writable.write(chunk);
      if (!canContinue) {
        readable.pause(); // Pause reading
        writable.once("drain", () => {
          readable.resume(); // Resume reading
        });
      }
    });
  3. High Water Mark Configuration

    // Custom buffer threshold (default 16KB)
    const readable = new Readable({
      highWaterMark: 64 * 1024, // 64KB
    });
    
    const writable = new Writable({
      highWaterMark: 128 * 1024, // 128KB
    });
  4. Modern Asynchronous Iteration Approach

    // Node.js 10+ supports asynchronous iterators
    async function processStream() {
      for await (const chunk of readable) {
        const canContinue = writable.write(chunk);
        if (!canContinue) {
          await new Promise((resolve) => {
            writable.once("drain", resolve);
          });
        }
      }
    }

Backpressure Handling Flowchart:

[Readable Stream] --(data too fast)--> [Buffer Full]
        ↓
[write() returns false]
        ↓
[Readable Stream Paused] --waiting--> [drain event]
        ↓
[Readable Stream Resumed]

Practical Example: Custom Backpressure-Aware Transform Stream

const { Transform } = require("stream");

class PressureAwareTransform extends Transform {
  constructor(options) {
    super({ ...options, highWaterMark: 32 * 1024 });
    this.pending = 0;          // Number of chunks currently being processed
    this.concurrency = 4;      // Maximum number of chunks processed in parallel
    this.resumeCallback = null;
    this.flushCallback = null;
  }

  _transform(chunk, enc, cb) {
    this.pending++;
    this.processItem(chunk);

    // Backpressure control: request the next chunk immediately while below
    // the concurrency limit, otherwise hold the callback until an
    // in-flight item completes
    if (this.pending < this.concurrency) {
      cb();
    } else {
      this.resumeCallback = cb;
    }
  }

  async processItem(chunk) {
    try {
      const result = await this.asyncOperation(chunk);
      this.push(result); // Note: results may be pushed out of input order
    } catch (err) {
      this.destroy(err);
      return;
    }

    this.pending--;

    // Release the held _transform callback, if any
    if (this.resumeCallback) {
      const resume = this.resumeCallback;
      this.resumeCallback = null;
      resume();
    }

    // Let _flush finish once every in-flight item is done
    if (this.pending === 0 && this.flushCallback) {
      this.flushCallback();
    }
  }

  _flush(callback) {
    // Wait for in-flight items before ending the readable side
    if (this.pending === 0) {
      callback();
    } else {
      this.flushCallback = callback;
    }
  }

  async asyncOperation(data) {
    // Simulate an asynchronous operation
    return new Promise((resolve) => {
      setTimeout(() => resolve(data.toString().toUpperCase()), 100);
    });
  }
}

Understanding and correctly implementing the backpressure mechanism is key to building robust stream applications. Modern Node.js versions provide various automated handling solutions, but manual control of backpressure is still a necessary optimization in high-performance scenarios.

Creating Custom Streams

Implementing a Custom Readable Stream

Inheriting from the Readable Class

Creating a custom readable stream requires inheriting from the stream.Readable class and implementing the _read() method. This is a common pattern in Node.js stream implementations:

const { Readable } = require("stream");

class MyReadable extends Readable {
  constructor(options) {
    // Call the parent class constructor
    super(options);
    // Initialize custom state
    this.current = 0;
    this.max = options.max || 100;
  }

  // Must implement the _read method
  _read(size) {
    // size parameter indicates the number of bytes requested by the consumer (for reference only)
    if (this.current >= this.max) {
      // Push null to indicate the end of the stream
      this.push(null);
    } else {
      // Generate a data chunk
      const chunk = `Data chunk ${this.current++}\n`;
      // Push the data chunk into the read queue
      this.push(chunk);
    }
  }
}

Implementing the _read Method

Key points:

  1. When called, it should synchronously or asynchronously push data
  2. Must call the push() method to provide data
  3. When data ends, push null
  4. The size parameter is just a suggestion and can be ignored

Usage Example:

// Create an instance of a custom readable stream
const myStream = new MyReadable({ max: 5 });

// Consume data
myStream.on("data", (chunk) => {
  console.log("Received:", chunk.toString());
});

myStream.on("end", () => {
  console.log("Stream ended");
});

Of course, you might also be curious about how createReadStream manages to turn a file directly into a stream. Let's try to implement a simplified version ourselves.

import { Readable } from "stream";
import { open } from "fs/promises";
import { read, close } from "fs";

class FileReadStream extends Readable {
  private fd: number | null = null;
  private position: number = 0;
  private filePath: string;

  constructor(filePath: string) {
    super();
    this.filePath = filePath;
  }

  async _construct(callback: (error?: Error | null) => void) {
    try {
      const fileHandle = await open(this.filePath);
      this.fd = fileHandle.fd;
      callback();
    } catch (error) {
      callback(error as Error);
    }
  }

  _read(size: number) {
    const buffer = Buffer.alloc(size);
    if (this.fd === null) {
      this.push(null);
      return;
    }

    // Read file content from the current position
    read(this.fd, buffer, 0, size, this.position, (err: NodeJS.ErrnoException | null, bytesRead: number) => {
      if (err) {
        this.destroy(err);
        return;
      }

      // If no data is read, it means the end of the file has been reached
      if (bytesRead === 0) {
        this.push(null);
        return;
      }

      // Update the position pointer
      this.position += bytesRead;
      // Push data to the stream
      this.push(bytesRead < size ? buffer.slice(0, bytesRead) : buffer);
    });
  }

  _destroy(error: Error | null, callback: (error?: Error | null) => void) {
    // Release the file descriptor when the stream ends or is destroyed
    if (this.fd !== null) {
      close(this.fd, (closeErr) => callback(closeErr || error));
      this.fd = null;
    } else {
      callback(error);
    }
  }
}

// Usage Example
const customStream = new FileReadStream('./weather_station.csv');

customStream.on('data', (chunk) => {
  console.log(chunk.toString());
});

customStream.on('end', () => {
  console.log('Reading completed');
});

customStream.on('error', (err) => {
  console.error('An error occurred:', err);
});

Implementing a Custom Writable Stream

Inheriting from the Writable Class

Custom writable streams need to inherit from stream.Writable and implement the _write() method:

const { Writable } = require("stream");

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // Initialize custom state
    this.data = [];
  }

  // Must implement the _write method
  _write(chunk, encoding, callback) {
    // chunk: The data buffer to be written
    // encoding: If chunk is a string, specify the character encoding
    console.log("Writing data:", chunk.toString());

    // Store data
    this.data.push(chunk.toString());

    // Simulate an asynchronous operation
    setTimeout(() => {
      // Call callback upon completion (must be called)
      callback();
    }, 100);
  }

  // Optional _final method (triggered when end() is called)
  _final(callback) {
    console.log("All data has been written");
    console.log("Complete data:", this.data.join(""));
    callback();
  }
}

Implementing the _write Method

Key points:

  1. _write() must eventually call callback
  2. callback can be called synchronously or asynchronously
  3. If the write fails, pass an Error to callback
  4. encoding indicates the string encoding (when chunk is a Buffer, it might be 'buffer')

Usage Example:

const writer = new MyWritable();

writer.write('First data chunk\n');
writer.write('Second data chunk\n');
writer.end('Final data'); // Triggers _final
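
Point 3 in the key points above says that a failed write should hand an Error to the callback. Here is a minimal sketch of that failure path, using a made-up validation rule inside _write:

const { Writable } = require("stream");

const strictWriter = new Writable({
  write(chunk, encoding, callback) {
    const text = chunk.toString();
    if (text.includes("corrupt")) {
      // Passing an Error makes the stream emit 'error' and reject further writes
      callback(new Error("Refusing to store corrupt data"));
      return;
    }
    console.log("Stored:", text);
    callback();
  },
});

strictWriter.on("error", (err) => console.error("Write failed:", err.message));
strictWriter.write("good record\n");
strictWriter.write("corrupt record\n"); // Triggers the error path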

Implementing a Transform Stream

Inheriting from the Transform Class

Transform streams are the most flexible type of custom stream, inheriting from stream.Transform and implementing the _transform() method:

const { Transform } = require("stream");

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // Initialize transformation state
    this.lineCount = 0;
  }

  // Must implement the _transform method
  _transform(chunk, encoding, callback) {
    // Transformation logic
    const data = chunk.toString();
    this.lineCount += data.split("\n").length - 1;

    // Push transformed data to the readable end
    this.push(data.toUpperCase());

    // Complete processing
    callback();
  }

  // Optional _flush method (triggered before the stream ends)
  _flush(callback) {
    // Output statistics before the stream ends
    this.push(`\nTotal lines: ${this.lineCount}\n`);
    callback();
  }
}

Implementing the _transform Method

Key points:

  1. Must call push() zero or more times to output transformation results
  2. Must call callback to indicate processing is complete
  3. Can use _flush() to perform cleanup operations before the stream ends

Usage Example:

const transform = new MyTransform();

// Use piping to connect
process.stdin.pipe(transform).pipe(process.stdout);

Practical Example: Custom CSV Parser

Here is a complete custom transform stream implementation for converting CSV data into JSON objects:

const { Transform } = require("stream");

class CSVToJSON extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true });
    this.headers = null;
    this.buffer = "";
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();

    // Process complete lines in the buffer
    let lines = this.buffer.split("\n");
    // Keep the last incomplete line
    this.buffer = lines.pop() || "";

    if (!this.headers) {
      // The first line is the header
      this.headers = lines.shift().split(",");
    }

    // Process data lines
    for (const line of lines) {
      if (line.trim()) {
        try {
          const values = this.parseCSVLine(line);
          const obj = {};

          this.headers.forEach((header, i) => {
            obj[header.trim()] = values[i] ? values[i].trim() : "";
          });

          this.push(obj);
        } catch (err) {
          this.emit("error", err);
        }
      }
    }

    callback();
  }

  _flush(callback) {
    // Process the remaining last line in the buffer
    if (this.buffer && this.headers) {
      try {
        const values = this.parseCSVLine(this.buffer);
        const obj = {};

        this.headers.forEach((header, i) => {
          obj[header.trim()] = values[i] ? values[i].trim() : "";
        });

        this.push(obj);
      } catch (err) {
        this.emit("error", err);
      }
    }
    callback();
  }

  parseCSVLine(line) {
    // Simple CSV parsing (actual projects should use more robust parsing logic)
    return line.split(",").map((field) => {
      // Remove quotes from the beginning and end of the field (if present)
      return field.replace(/^"|"$/g, "");
    });
  }
}

Usage Example:

const fs = require("fs");
const { pipeline } = require("stream/promises");

async function processCSV(inputFile) {
  const csvParser = new CSVToJSON();

  csvParser.on("data", (obj) => {
    console.log("Parsed object:", obj);
  });

  csvParser.on("error", (err) => {
    console.error("CSV parsing error:", err);
  });

  await pipeline(fs.createReadStream(inputFile), csvParser);

  console.log("CSV parsing completed");
}

// Assume there is a data.csv file
processCSV("data.csv").catch(console.error);

Key Points for Custom Stream Implementation:

  1. Mode Selection:

    • objectMode: Process JavaScript objects instead of Buffers/strings
    • Binary mode: Process raw binary data

  2. Error Handling:

    • Always pass errors in the callback
    • Use emit('error') to notify of errors

  3. Performance Considerations:

    • Avoid CPU-intensive operations in _transform
    • Consider using worker threads for complex transformations

  4. Stream Lifecycle:

    • Initialization → Data Processing → End Cleanup
    • Properly implement _flush for resource cleanup
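
Point 1 deserves a short illustration. In object mode a stream carries whole JavaScript objects instead of Buffers or strings, which is exactly what the CSV parser above relies on. A minimal sketch with made-up data:

const { Readable, Writable } = require("stream");

// Readable.from() produces an object-mode stream by default
const users = Readable.from([{ name: "Ada" }, { name: "Linus" }]);

const printer = new Writable({
  objectMode: true,
  write(user, encoding, callback) {
    console.log("User:", user.name);
    callback();
  },
});

users.pipe(printer);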

By creating custom streams, you can integrate any data source or processing logic into Node.js's stream ecosystem, seamlessly working with other stream implementations.

Practical Applications of Streams

File Processing

Large File Read and Write Example

When dealing with large files, stream-based methods can significantly reduce memory usage. Here is an example of file copying, demonstrating how to efficiently handle large files:

const fs = require("fs");
const { pipeline } = require("stream/promises");

async function copyLargeFile(src, dest) {
  console.time("File copy time");

  await pipeline(fs.createReadStream(src), fs.createWriteStream(dest));

  console.timeEnd("File copy time");
}

// Usage Example (assuming bigfile.iso is a 2GB large file)
copyLargeFile("./bigfile.iso", "./copy.iso");

Performance Comparison: Stream vs Traditional Method

We compare the two methods through a benchmark test:

const fs = require("fs");
const { pipeline } = require("stream/promises");
const { performance } = require("perf_hooks");

// Test file: 500MB test file
const testFile = "./test.data";

// Traditional method
async function traditionalMethod() {
  const start = performance.now();
  const data = await fs.promises.readFile(testFile);
  await fs.promises.writeFile("./traditional-copy.data", data);
  return performance.now() - start;
}

// Stream-based method
async function streamMethod() {
  const start = performance.now();
  await pipeline(
    fs.createReadStream(testFile),
    fs.createWriteStream("./stream-copy.data")
  );
  return performance.now() - start;
}

// Run the test
(async () => {
  console.log("Traditional method time:", await traditionalMethod(), "ms");
  console.log("Stream method time:", await streamMethod(), "ms");

  // Memory usage comparison
  console.log(
    "Traditional method memory peak:",
    process.memoryUsage().rss / 1024 / 1024,
    "MB"
  );
  console.log(
    "Stream method memory peak:",
    process.memoryUsage().rss / 1024 / 1024,
    "MB"
  );
})();

Typical Test Results:

| Method | Time for 500MB File | Memory Peak | Applicable Scenarios |
| --- | --- | --- | --- |
| Traditional Method | 1200ms | ~500MB | Small file processing |
| Stream Method | 800ms | ~30MB | Large file processing |

HTTP Requests and Responses

Stream Processing of Request Bodies

When handling large HTTP request bodies, stream processing can prevent memory overflow:

const http = require("http");
const { pipeline } = require("stream/promises");
const fs = require("fs");

// File upload server
http
  .createServer(async (req, res) => {
    if (req.method === "POST" && req.url === "/upload") {
      const fileWriter = fs.createWriteStream("./upload.data");

      try {
        await pipeline(req, fileWriter);
        res.end("File uploaded successfully");
      } catch (err) {
        res.statusCode = 500;
        res.end("Upload failed");
      }
    } else {
      res.end("Please use POST method to upload files");
    }
  })
  .listen(3000);

Stream-Generated Response Bodies

For dynamically generating large responses, stream processing can significantly improve TTFB (Time To First Byte):

// Stream a large JSON response
http
  .createServer((req, res) => {
    res.writeHead(200, {
      "Content-Type": "application/json",
      "Transfer-Encoding": "chunked",
    });

    // Start the JSON array
    res.write("[\n");

    // Simulate generating 10,000 records
    let count = 0;
    const max = 10000;
    const interval = setInterval(() => {
      if (count++ < max) {
        res.write(
          JSON.stringify({ id: count, data: "..." }) +
            (count < max ? ",\n" : "\n")
        );
      } else {
        clearInterval(interval);
        res.end("]"); // End the JSON array
      }
    }, 10);
  })
  .listen(3001);

Data Transformation Pipelines

Combining Multiple Streams

The true power of streams lies in their ability to connect multiple processing steps into a pipeline:

const fs = require("fs");
const { Transform } = require("stream");
const { pipeline } = require("stream/promises");
const zlib = require("zlib");
const crypto = require("crypto");

async function processFile(input, output) {
  // aes-256-cbc requires a 32-byte key and a 16-byte IV
  const key = crypto.scryptSync("secret-key", "salt", 32);
  const iv = Buffer.alloc(16, 0); // demo only; in practice, use the IV stored with the data

  await pipeline(
    fs.createReadStream(input),
    // Step 1: Decompress
    zlib.createGunzip(),
    // Step 2: Decrypt
    crypto.createDecipheriv("aes-256-cbc", key, iv),
    // Step 3: Convert to uppercase
    new Transform({
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      },
    }),
    // Step 4: Compress
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
}

Practical Example: Log Processing Pipeline

Here is a complete log processing system example, demonstrating the application of streams in real projects:

const fs = require("fs");
const { Transform, Writable } = require("stream");
const { pipeline } = require("stream/promises");
const byline = require("byline"); // Library for line-by-line stream processing

// 1. Log Parsing Transform Stream
class LogParser extends Transform {
  _transform(chunk, enc, cb) {
    const logEntry = chunk.toString();
    try {
      const parsed = this.parseLog(logEntry);
      this.push(JSON.stringify(parsed) + "\n");
    } catch (err) {
      // Log parsing errors to another stream
      this.emit("parseError", { err, logEntry });
    }
    cb();
  }

  parseLog(entry) {
    // Actual projects would have more complex parsing logic
    const [timestamp, level, ...message] = entry.split(" ");
    return { timestamp, level, message: message.join(" ") };
  }
}

// 2. Log Filtering Transform Stream
class LogFilter extends Transform {
  constructor(options) {
    super(options);
    this.level = options.level || "error";
  }

  _transform(chunk, enc, cb) {
    const log = JSON.parse(chunk);
    if (log.level === this.level) {
      this.push(chunk);
    }
    cb();
  }
}

// 3. Use a pipeline to process logs
async function processLogs(inputFile, outputFile) {
  const startTime = Date.now();

  // Create processing pipeline
  const logStream = byline(fs.createReadStream(inputFile));
  const parser = new LogParser();
  const filter = new LogFilter({ level: "error" });
  const output = fs.createWriteStream(outputFile);

  // Error handling stream
  const errorStream = new Writable({
    objectMode: true, // Receives { err, logEntry } objects rather than Buffers
    write(chunk, enc, cb) {
      fs.appendFileSync("./parse-errors.log", chunk.logEntry + "\n");
      cb();
    },
  });

  // Listen for parsing errors
  parser.on("parseError", (errObj) => {
    errorStream.write(errObj);
  });

  // Build processing pipeline
  await pipeline(logStream, parser, filter, output);

  console.log(`Log processing completed in ${Date.now() - startTime}ms`);
}

// Run log processing
processLogs("./app.log", "./errors.json").catch(console.error);

Log Processing Pipeline Workflow:

  1. Read: Create a readable stream from the log file
  2. Split Lines: Use byline to split the stream by lines
  3. Parse: Parse each line into a structured JSON object
  4. Filter: Only keep logs at the error level
  5. Output: Write the results to a new file
  6. Error Handling: Handle parsing errors separately

This example demonstrates the powerful combination capabilities of streams, allowing you to build complex but efficient data processing systems. With stream processing, even when handling GB-level log files, memory usage can remain stable.

Best Practices and Performance Optimization for Streams

Error Handling Strategies

Centralized Error Handling

In stream-based applications, it is recommended to use a centralized error handling mechanism to manage errors uniformly:

const { pipeline } = require('stream/promises');

async function processWithPipeline() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      new TransformStream(), // Custom transform stream
      fs.createWriteStream('output.txt')
    );
  } catch (err) {
    // Centralized handling of all potential stream errors
    console.error('Pipeline processing failed:', err);
    // Perform cleanup operations
    await cleanupResources();
    // Retry or exit based on the error
    if (shouldRetry(err)) {
      return processWithPipeline();
    }
    process.exit(1);
  }
}

// Traditional pipe() error handling comparison
readable
  .pipe(transform)
  .pipe(writable)
  .on('error', (err) => {
    // Manual cleanup of all streams is required
    readable.destroy();
    transform.destroy();
    writable.destroy();
    console.error('Processing failed:', err);
  });

Error Propagation Mechanism

Understanding the error propagation behavior in Node.js streams is crucial:

  1. Automatic Propagation:

    • pipeline() automatically propagates errors and cleans up all streams.
    • pipe() does not automatically propagate errors; manual handling is required.
  2. Custom Error Handling:

    class SafeTransform extends Transform {
      _transform(chunk, enc, cb) {
        try {
          const result = this.processData(chunk);
          cb(null, result); // First argument is the error object
        } catch (err) {
          // Pass the error to the callback; the stream emits 'error' for you
          cb(new Error(`Data processing failed: ${err.message}`));
        }
      }
    }
  3. Error Categorization Handling:

    // Handle errors based on their type
    stream.on('error', (err) => {
      if (err.code === 'ENOENT') {
        console.error('File not found');
      } else if (err instanceof CustomParseError) {
        console.error('Parsing error:', err.details);
      } else {
        console.error('Unknown error:', err);
      }
    });

Memory Management Techniques

High Water Mark Configuration

The high water mark is a key parameter for controlling stream memory usage:

// Readable stream configuration (default 16KB)
const readable = new Readable({
  highWaterMark: 64 * 1024 // 64KB
});

// Writable stream configuration (default 16KB)
const writable = new Writable({
  highWaterMark: 128 * 1024 // 128KB
});

// Object mode configuration (default 16 objects)
const objectStream = new Transform({
  objectMode: true,
  highWaterMark: 100 // 100 objects
});

Configuration Recommendations:

  • Adjust based on data chunk size: Use higher values for larger chunks.
  • Balance memory and throughput: Higher values increase memory usage but improve performance.
  • Monitor actual usage:

setInterval(() => {
  console.log('Memory usage:', process.memoryUsage().rss);
  console.log('Readable stream buffer:', readable._readableState.length);
  console.log('Writable stream buffer:', writable._writableState.length);
}, 1000);

Buffer Management

Optimizing buffer usage can significantly enhance performance:

  1. Avoid Unnecessary Buffering:

    // Bad practice: Accumulating data in the transform stream
    class BadTransform extends Transform {
      constructor() {
        super();
        this.data = [];
      }
      
      _transform(chunk, enc, cb) {
        this.data.push(chunk); // Memory usage keeps growing
        cb();
      }
    }
    
    // Good practice: Process data immediately
    class GoodTransform extends Transform {
      _transform(chunk, enc, cb) {
        this.push(process(chunk)); // Process and push immediately
        cb();
      }
    }
  2. Reuse Buffers:

    const pool = require('buffer-pool'); // Third-party buffer pool
    
    class PooledTransform extends Transform {
      _transform(chunk, enc, cb) {
        // Allocate a buffer from the pool
        const buffer = pool.alloc(1024);
        
        // Process data into the buffer...
        processDataInto(chunk, buffer);
        
        this.push(buffer);
        pool.free(buffer); // Return buffer to the pool
        cb();
      }
    }

Stream Combination and Reusability

Best Practices for Piping

  1. Prefer pipeline over pipe for error handling:

    const { pipeline } = require('stream/promises');
    
    // Best practice: Automatic error propagation and cleanup
    async function processData() {
      await pipeline(
        fs.createReadStream('input'),
        zlib.createGzip(),
        encryptStream,
        fs.createWriteStream('output.gz.enc')
      );
    }
  2. Readability in piping:

    // Bad practice: Long pipeline is hard to read
    input.pipe(a).pipe(b).pipe(c).pipe(d).pipe(output);
    
    // Good practice: Organize in lines
    input
      .pipe(transformA)
      .pipe(transformB)
      .pipe(transformC)
      .pipe(output);
  3. Adding middleware for processing:

    function createProcessingPipeline() {
      const stream = require('stream');
      const passThrough = new stream.PassThrough();
      
      // Add monitoring middleware
      passThrough.on('data', (chunk) => {
        monitor.recordChunk(chunk.length);
      });
      
      return passThrough;
    }
    
    input
      .pipe(createProcessingPipeline())
      .pipe(output);

Reusable Stream Design Patterns

  1. Factory Pattern for Streams:

    function createCSVParser(options = {}) {
      const parser = new Transform({
        objectMode: true,
        ...options
      });
      
      // Initialization logic...
      
      return parser;
    }
    
    // Usage example
    const csvParser = createCSVParser({ delimiter: '|' });
  2. Decorator Pattern to Enhance Streams:

    function withLogging(stream) {
      const duplex = new Duplex({
        write(chunk, enc, cb) {
          console.log('Writing:', chunk.length, 'bytes');
          stream.write(chunk, enc, cb);
        },

        read(size) {
          const chunk = stream.read();
          if (chunk !== null) {
            console.log('Reading:', chunk.length, 'bytes');
            this.push(chunk);
          } else {
            // No data buffered yet; try again when the wrapped stream is readable
            stream.once('readable', () => this._read(size));
          }
        }
      });

      // Pass through original stream events
      stream.on('end', () => duplex.push(null));
      stream.on('error', err => duplex.emit('error', err));

      return duplex;
    }
    
    // Usage example
    const loggedStream = withLogging(fs.createReadStream('data'));
  3. Composite Pattern for Complex Streams:

    class CompositeTransform extends Transform {
      constructor() {
        super();
        this.step1 = new Transform1();
        this.step2 = new Transform2();
        this.step3 = new Transform3();
        
        // Internal piping
        this.step1.pipe(this.step2).pipe(this.step3);
        
        // Proxy data flow
        this.step3.on('data', chunk => this.push(chunk));
      }
      
      _transform(chunk, enc, cb) {
        this.step1.write(chunk, enc, cb);
      }
      
      _flush(cb) {
        this.step1.end(() => {
          this.step3.once('end', cb);
        });
      }
    }

Performance Optimization Checklist:

  1. Use pipeline instead of pipe for error handling.
  2. Adjust high water mark based on data characteristics.
  3. Avoid accumulating large amounts of data in streams.
  4. Consider using buffer pools for buffer reuse.
  5. Design reusable stream components.
  6. Implement composite streams for complex processing.
  7. Add monitoring middleware to track stream performance.

By applying these best practices, you can build efficient and robust stream-based systems capable of handling various data processing needs.

Modern JavaScript Stream APIs

Web Streams API Overview

Browser Support for Streams

The Web Streams API is a modern, standardized stream processing API supported in contemporary browsers. It offers a similar abstraction to Node.js streams but with some differences:

// Creating a readable stream
const readableStream = new ReadableStream({
  start(controller) {
    // Stream initialization logic
    controller.enqueue('Data chunk 1');
    controller.enqueue('Data chunk 2');
    controller.close();
  },
  pull(controller) {
    // Called when the consumer requests more data
  },
  cancel(reason) {
    // Cleanup resources when the stream is canceled
  }
});

// Creating a writable stream
const writableStream = new WritableStream({
  write(chunk) {
    console.log('Writing data:', chunk);
  },
  close() {
    console.log('Stream closed');
  },
  abort(err) {
    console.error('Stream error:', err);
  }
});

// Connecting streams
readableStream.pipeTo(writableStream);

Browser Support:

  • Modern browsers like Chrome, Firefox, Edge, and Safari have implemented the Web Streams API.
  • Suitable for use in Service Workers, Fetch API, and other web platform features.

Differences from Node.js Streams

Similarities:

  1. Both are based on the same data stream processing concepts.
  2. Both support backpressure mechanisms.
  3. Both can be piped together.

Key Differences:

| Feature | Node.js Streams | Web Streams API |
| --- | --- | --- |
| Creation Method | Inherit base classes | Use constructors |
| Error Handling | EventEmitter | Promise-based |
| Data Chunk Types | Buffers / Strings | Uint8Array / Strings |
| Piping Methods | pipe() | pipeTo(), pipeThrough() |
| Backpressure Signals | pause(), resume() | Built-in Promise mechanism |
| Transform Streams | Transform | TransformStream |
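
The table mentions pipeThrough() and TransformStream; here is a minimal sketch of the Web Streams equivalent of the Node.js transform pipelines shown earlier, with the data inlined purely for illustration:

const upperCase = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const source = new ReadableStream({
  start(controller) {
    controller.enqueue("hello ");
    controller.enqueue("streams");
    controller.close();
  },
});

const sink = new WritableStream({
  write(chunk) {
    console.log("Received:", chunk);
  },
});

// pipeThrough() routes the readable through the transform and returns
// the transformed readable, which is then piped to the sink
source.pipeThrough(upperCase).pipeTo(sink);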

In Node.js, a Readable stream can be converted to a Web ReadableStream with Readable.toWeb(), and a Web stream can be converted back to a Node.js stream with counterparts such as Readable.fromWeb() or Duplex.fromWeb().

// Node.js Stream to Web Stream
import { Readable, Duplex } from 'stream';
const nodeReadable = Readable.from(['hello', 'world']);
const webReadableStream = Readable.toWeb(nodeReadable);

// Web Stream to Node.js Stream (webDuplexStream is a { readable, writable } pair)
const nodeDuplex = Duplex.fromWeb(webDuplexStream);

Stream Processing in Fetch API

Stream-Based Processing of Response Bodies

The Fetch API's response body is inherently a readable stream, enabling stream-based processing of large responses:

// Stream-based processing of a JSON response
async function processLargeJSON(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let result = '';
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    // Process each chunk
    const chunk = decoder.decode(value, { stream: true });
    result += chunk;
    
    // Perform incremental processing
    const partialData = tryParseJSON(result);
    if (partialData) updateUI(partialData);
  }
  
  return JSON.parse(result);
}

// Stream-based file download
async function downloadFile(url, outputStream) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    outputStream.write(value);
  }
  
  outputStream.end();
}

Progressive Rendering Applications

Using stream-based responses allows for progressive content rendering:

// Server-side code (Node.js)
app.get('/streaming-page', (req, res) => {
  // Immediately send HTML headers
  res.write(`
    <!DOCTYPE html>
    <html>
    <head><title>Progressive Rendering</title></head>
    <body>
  `);
  
  // Simulate asynchronous content loading
  const items = ['Item 1', 'Item 2', 'Item 3'];
  let index = 0;
  
  const timer = setInterval(() => {
    if (index < items.length) {
      res.write(`<div>${items[index++]}</div>`);
    } else {
      clearInterval(timer);
      res.end('</body></html>');
    }
  }, 500);
});

// Client-side code
async function renderStreamingResponse() {
  const response = await fetch('/streaming-page');
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const contentEl = document.getElementById('content');
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    const htmlChunk = decoder.decode(value);
    contentEl.innerHTML += htmlChunk;
    
    // Scroll to the latest content
    window.scrollTo(0, document.body.scrollHeight);
  }
}

Stream Applications in Service Workers

Service Workers, combined with the Stream API, enable powerful offline caching and response handling:

// Cache-first stream-based response strategy
self.addEventListener('fetch', (event) => {
  const url = new URL(event.request.url);
  
  if (url.pathname.endsWith('.mp4')) {
    event.respondWith(
      caches.match(event.request).then((cachedResponse) => {
        // Return cached response if available
        if (cachedResponse) {
          return cachedResponse;
        }
        
        // Stream the response to the page while caching a copy
        return fetch(event.request).then((response) => {
          // Cache a full clone of the response (the cache name is arbitrary here)
          const responseClone = response.clone();
          caches.open('media-cache').then((cache) =>
            cache.put(event.request, responseClone)
          );

          // Pipe the network body through a pass-through TransformStream so
          // the page starts receiving data immediately
          const { readable, writable } = new TransformStream();
          response.body.pipeTo(writable);

          return new Response(readable, {
            status: response.status,
            headers: response.headers
          });
        });
      })
    );
  }
});

// Stream multiple responses into one
self.addEventListener('fetch', (event) => {
  if (event.request.url.endsWith('combined.json')) {
    event.respondWith(
      (async () => {
        const [part1, part2] = await Promise.all([
          fetch('/api/part1'),
          fetch('/api/part2')
        ]);
        
        const { readable, writable } = new TransformStream();
        const encoder = new TextEncoder();

        // Merge the two bodies in order. This runs in the background so the
        // combined Response below can be returned immediately and the client
        // starts receiving data as soon as it is written.
        (async () => {
          let writer = writable.getWriter();
          writer.write(encoder.encode('{"combined":['));
          writer.releaseLock();

          // preventClose keeps the transform's writable side open so more
          // data can be appended after each part finishes
          await part1.body.pipeTo(writable, { preventClose: true });

          writer = writable.getWriter();
          writer.write(encoder.encode(','));
          writer.releaseLock();

          await part2.body.pipeTo(writable, { preventClose: true });

          writer = writable.getWriter();
          writer.write(encoder.encode(']}'));
          await writer.close();
        })().catch((err) => console.error('Stream merge failed:', err));
        
        return new Response(readable, {
          headers: { 'Content-Type': 'application/json' }
        });
      })()
    );
  }
});

Advantages of Stream Applications in Service Workers:

  1. Faster Content Presentation: Content can be displayed as it is downloaded.
  2. Memory Efficiency: No need to cache the entire response.
  3. Bandwidth Savings: Interrupt unnecessary downloads.
  4. Offline Experience: Combined with Cache API for seamless offline functionality.

Modern Stream-Based Application Architecture Example:

[Client] ← Stream Response → [Service Worker]
                      ↓      ↑
                [Edge Cache] ← Stream Processing → [Origin Server]

Through the Web Streams API, modern JavaScript applications can achieve consistent stream processing across both browser and Node.js environments, providing a standardized solution for handling large data and real-time applications.
