您现在的位置是:网站首页 > 自定义流的实现文章详情

自定义流的实现

什么是自定义流

Node.js中的流(Stream)是处理读写数据的抽象接口。自定义流允许开发者根据特定需求创建自己的流实现。流分为四种基本类型:可读流(Readable)、可写流(Writable)、双工流(Duplex)和转换流(Transform)。通过继承这些基类并实现特定方法,可以创建完全定制的流处理逻辑。

实现可读流

创建自定义可读流需要继承stream.Readable类并实现_read()方法。这个方法在流需要更多数据时被调用,通常会将数据推入内部缓冲区。

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.data = ['a', 'b', 'c', 'd', 'e'];
    this.index = 0;
  }

  _read(size) {
    if (this.index < this.data.length) {
      this.push(this.data[this.index++]);
    } else {
      this.push(null); // 表示数据结束
    }
  }
}

const myReadable = new MyReadable();
myReadable.on('data', (chunk) => {
  console.log(`Received chunk: ${chunk}`);
});

这个例子创建了一个简单的可读流,它会逐个输出数组中的字母。_read()方法中的size参数表示消费者请求的字节数,但可以忽略它。

实现可写流

自定义可写流需要继承stream.Writable并实现_write()方法。这个方法处理写入的数据。

const { Writable } = require('stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`);
    // 模拟异步操作
    setTimeout(() => {
      console.log('Write completed');
      callback(); // 必须调用callback表示写入完成
    }, 100);
  }
}

const myWritable = new MyWritable();
myWritable.write('Hello');
myWritable.write('World');
myWritable.end(); // 结束写入

_write()方法接收三个参数:数据块、编码和回调函数。回调函数必须在写入操作完成后调用,无论成功还是失败。

双工流的实现

双工流同时实现可读和可写功能,需要继承stream.Duplex并实现_read()_write()方法。

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = [];
  }

  _write(chunk, encoding, callback) {
    this.data.push(chunk);
    callback();
  }

  _read(size) {
    if (this.data.length) {
      this.push(this.data.shift());
    } else {
      setTimeout(() => this._read(size), 100);
    }
  }
}

const duplex = new MyDuplex();
duplex.on('data', (chunk) => {
  console.log(`Duplex received: ${chunk}`);
});
duplex.write('Message 1');
duplex.write('Message 2');

这个双工流将写入的数据存储起来,然后逐个读取出来。注意_read()方法在没有数据时会等待而不是直接结束。

转换流的实现

转换流是一种特殊的双工流,用于修改或转换数据。需要继承stream.Transform并实现_transform()方法。

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);

这个转换流将所有输入转换为大写。_transform()方法处理输入数据并通过push()输出转换后的结果。

高级流控制

自定义流可以实现更复杂的控制逻辑,比如背压管理。当可写流处理速度跟不上数据生产速度时,需要正确处理背压。

const { Writable } = require('stream');

class ControlledWritable extends Writable {
  constructor(options) {
    super(options);
    this.buffer = [];
    this.processing = false;
  }

  _write(chunk, encoding, callback) {
    this.buffer.push({ chunk, callback });
    if (!this.processing) {
      this._processBuffer();
    }
  }

  _processBuffer() {
    if (this.buffer.length === 0) {
      this.processing = false;
      return;
    }

    this.processing = true;
    const { chunk, callback } = this.buffer.shift();
    
    // 模拟慢速处理
    setTimeout(() => {
      console.log('Processed:', chunk.toString());
      callback();
      this._processBuffer();
    }, 500);
  }
}

const controlled = new ControlledWritable();
for (let i = 0; i < 10; i++) {
  controlled.write(`Data ${i}`);
}
controlled.end();

这个可写流实现了缓冲机制,确保即使写入速度快于处理速度,数据也不会丢失。

错误处理

自定义流应该正确处理错误,并通过事件或回调通知使用者。

const { Readable } = require('stream');

class ErrorProneReadable extends Readable {
  constructor(options) {
    super(options);
    this.count = 0;
  }

  _read(size) {
    this.count++;
    if (this.count > 3) {
      this.emit('error', new Error('Simulated error'));
      return;
    }
    this.push(`Data ${this.count}`);
  }
}

const errorStream = new ErrorProneReadable();
errorStream
  .on('data', (data) => console.log(data.toString()))
  .on('error', (err) => console.error('Error:', err.message));

流对象模式

默认情况下,流处理Buffer和字符串。通过设置objectMode: true,流可以处理任意JavaScript对象。

const { Readable } = require('stream');

class ObjectReadable extends Readable {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.objects = [
      { id: 1, name: 'First' },
      { id: 2, name: 'Second' },
      { id: 3, name: 'Third' }
    ];
    this.index = 0;
  }

  _read(size) {
    if (this.index < this.objects.length) {
      this.push(this.objects[this.index++]);
    } else {
      this.push(null);
    }
  }
}

const objStream = new ObjectReadable();
objStream.on('data', (obj) => {
  console.log('Received object:', obj);
});

性能优化技巧

实现高性能自定义流需要考虑几个关键因素:

  1. 批量处理:在_write()中累积数据到一定量再处理
  2. 内存管理:避免在流中保存大量数据
  3. 并行处理:使用工作线程处理CPU密集型操作
const { Writable } = require('stream');
const { Worker } = require('worker_threads');

class ParallelTransform extends Writable {
  constructor(options) {
    super(options);
    this.workers = [];
    this.pending = 0;
    this.maxWorkers = 4;
    this.queue = [];
  }

  _write(chunk, encoding, callback) {
    this.queue.push({ chunk, callback });
    this._processQueue();
  }

  _processQueue() {
    while (this.queue.length > 0 && this.pending < this.maxWorkers) {
      const { chunk, callback } = this.queue.shift();
      this.pending++;
      
      const worker = new Worker('./processor.js', { workerData: chunk });
      worker.on('message', (result) => {
        console.log('Processed:', result);
        callback();
        this.pending--;
        this._processQueue();
      });
      
      this.workers.push(worker);
    }
  }
}

实际应用示例

创建一个HTTP请求日志流,将请求数据写入文件并按大小自动分割:

const { Writable } = require('stream');
const fs = require('fs');
const path = require('path');

class RequestLogger extends Writable {
  constructor(dir, options = {}) {
    super(options);
    this.dir = dir;
    this.currentFile = null;
    this.fileSize = 0;
    this.maxSize = options.maxSize || 1024 * 1024; // 1MB
    this.fileIndex = 0;
    this._ensureDirectory();
    this._rotateFile();
  }

  _ensureDirectory() {
    if (!fs.existsSync(this.dir)) {
      fs.mkdirSync(this.dir, { recursive: true });
    }
  }

  _rotateFile() {
    if (this.currentFile) {
      this.currentFile.end();
    }
    
    const filename = path.join(this.dir, `log_${Date.now()}_${++this.fileIndex}.txt`);
    this.currentFile = fs.createWriteStream(filename);
    this.fileSize = 0;
  }

  _write(chunk, encoding, callback) {
    this.fileSize += chunk.length;
    this.currentFile.write(chunk, encoding, (err) => {
      if (err) return callback(err);
      
      if (this.fileSize >= this.maxSize) {
        this._rotateFile();
      }
      callback();
    });
  }

  _final(callback) {
    if (this.currentFile) {
      this.currentFile.end(callback);
    } else {
      callback();
    }
  }
}

// 使用示例
const logger = new RequestLogger('./logs');
http.createServer((req, res) => {
  logger.write(`Request at ${new Date()}: ${req.url}\n`);
  res.end('Logged');
}).listen(3000);

我的名片

网名:~川~

岗位:console.log 调试员

坐标:重庆市-九龙坡区

邮箱:cc@qdcc.cn

沙漏人生

站点信息

  • 建站时间:2013/03/16
  • 本站运行
  • 文章数量
  • 总访问量
微信公众号
每次关注
都是向财富自由迈进的一步