您现在的位置是:网站首页 > 自定义流的实现文章详情
自定义流的实现
陈川
【
Node.js
】
25704人已围观
7111字
什么是自定义流
Node.js中的流(Stream)是处理读写数据的抽象接口。自定义流允许开发者根据特定需求创建自己的流实现。流分为四种基本类型:可读流(Readable)、可写流(Writable)、双工流(Duplex)和转换流(Transform)。通过继承这些基类并实现特定方法,可以创建完全定制的流处理逻辑。
实现可读流
创建自定义可读流需要继承stream.Readable
类并实现_read()
方法。这个方法在流需要更多数据时被调用,通常会将数据推入内部缓冲区。
const { Readable } = require('stream');
class MyReadable extends Readable {
constructor(options) {
super(options);
this.data = ['a', 'b', 'c', 'd', 'e'];
this.index = 0;
}
_read(size) {
if (this.index < this.data.length) {
this.push(this.data[this.index++]);
} else {
this.push(null); // 表示数据结束
}
}
}
const myReadable = new MyReadable();
myReadable.on('data', (chunk) => {
console.log(`Received chunk: ${chunk}`);
});
这个例子创建了一个简单的可读流,它会逐个输出数组中的字母。_read()
方法中的size
参数表示消费者请求的字节数,但可以忽略它。
实现可写流
自定义可写流需要继承stream.Writable
并实现_write()
方法。这个方法处理写入的数据。
const { Writable } = require('stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
console.log(`Writing: ${chunk.toString()}`);
// 模拟异步操作
setTimeout(() => {
console.log('Write completed');
callback(); // 必须调用callback表示写入完成
}, 100);
}
}
const myWritable = new MyWritable();
myWritable.write('Hello');
myWritable.write('World');
myWritable.end(); // 结束写入
_write()
方法接收三个参数:数据块、编码和回调函数。回调函数必须在写入操作完成后调用,无论成功还是失败。
双工流的实现
双工流同时实现可读和可写功能,需要继承stream.Duplex
并实现_read()
和_write()
方法。
const { Duplex } = require('stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
this.data = [];
}
_write(chunk, encoding, callback) {
this.data.push(chunk);
callback();
}
_read(size) {
if (this.data.length) {
this.push(this.data.shift());
} else {
setTimeout(() => this._read(size), 100);
}
}
}
const duplex = new MyDuplex();
duplex.on('data', (chunk) => {
console.log(`Duplex received: ${chunk}`);
});
duplex.write('Message 1');
duplex.write('Message 2');
这个双工流将写入的数据存储起来,然后逐个读取出来。注意_read()
方法在没有数据时会等待而不是直接结束。
转换流的实现
转换流是一种特殊的双工流,用于修改或转换数据。需要继承stream.Transform
并实现_transform()
方法。
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);
这个转换流将所有输入转换为大写。_transform()
方法处理输入数据并通过push()
输出转换后的结果。
高级流控制
自定义流可以实现更复杂的控制逻辑,比如背压管理。当可写流处理速度跟不上数据生产速度时,需要正确处理背压。
const { Writable } = require('stream');
class ControlledWritable extends Writable {
constructor(options) {
super(options);
this.buffer = [];
this.processing = false;
}
_write(chunk, encoding, callback) {
this.buffer.push({ chunk, callback });
if (!this.processing) {
this._processBuffer();
}
}
_processBuffer() {
if (this.buffer.length === 0) {
this.processing = false;
return;
}
this.processing = true;
const { chunk, callback } = this.buffer.shift();
// 模拟慢速处理
setTimeout(() => {
console.log('Processed:', chunk.toString());
callback();
this._processBuffer();
}, 500);
}
}
const controlled = new ControlledWritable();
for (let i = 0; i < 10; i++) {
controlled.write(`Data ${i}`);
}
controlled.end();
这个可写流实现了缓冲机制,确保即使写入速度快于处理速度,数据也不会丢失。
错误处理
自定义流应该正确处理错误,并通过事件或回调通知使用者。
const { Readable } = require('stream');
class ErrorProneReadable extends Readable {
constructor(options) {
super(options);
this.count = 0;
}
_read(size) {
this.count++;
if (this.count > 3) {
this.emit('error', new Error('Simulated error'));
return;
}
this.push(`Data ${this.count}`);
}
}
const errorStream = new ErrorProneReadable();
errorStream
.on('data', (data) => console.log(data.toString()))
.on('error', (err) => console.error('Error:', err.message));
流对象模式
默认情况下,流处理Buffer和字符串。通过设置objectMode: true
,流可以处理任意JavaScript对象。
const { Readable } = require('stream');
class ObjectReadable extends Readable {
constructor(options) {
super({ ...options, objectMode: true });
this.objects = [
{ id: 1, name: 'First' },
{ id: 2, name: 'Second' },
{ id: 3, name: 'Third' }
];
this.index = 0;
}
_read(size) {
if (this.index < this.objects.length) {
this.push(this.objects[this.index++]);
} else {
this.push(null);
}
}
}
const objStream = new ObjectReadable();
objStream.on('data', (obj) => {
console.log('Received object:', obj);
});
性能优化技巧
实现高性能自定义流需要考虑几个关键因素:
- 批量处理:在
_write()
中累积数据到一定量再处理 - 内存管理:避免在流中保存大量数据
- 并行处理:使用工作线程处理CPU密集型操作
const { Writable } = require('stream');
const { Worker } = require('worker_threads');
class ParallelTransform extends Writable {
constructor(options) {
super(options);
this.workers = [];
this.pending = 0;
this.maxWorkers = 4;
this.queue = [];
}
_write(chunk, encoding, callback) {
this.queue.push({ chunk, callback });
this._processQueue();
}
_processQueue() {
while (this.queue.length > 0 && this.pending < this.maxWorkers) {
const { chunk, callback } = this.queue.shift();
this.pending++;
const worker = new Worker('./processor.js', { workerData: chunk });
worker.on('message', (result) => {
console.log('Processed:', result);
callback();
this.pending--;
this._processQueue();
});
this.workers.push(worker);
}
}
}
实际应用示例
创建一个HTTP请求日志流,将请求数据写入文件并按大小自动分割:
const { Writable } = require('stream');
const fs = require('fs');
const path = require('path');
class RequestLogger extends Writable {
constructor(dir, options = {}) {
super(options);
this.dir = dir;
this.currentFile = null;
this.fileSize = 0;
this.maxSize = options.maxSize || 1024 * 1024; // 1MB
this.fileIndex = 0;
this._ensureDirectory();
this._rotateFile();
}
_ensureDirectory() {
if (!fs.existsSync(this.dir)) {
fs.mkdirSync(this.dir, { recursive: true });
}
}
_rotateFile() {
if (this.currentFile) {
this.currentFile.end();
}
const filename = path.join(this.dir, `log_${Date.now()}_${++this.fileIndex}.txt`);
this.currentFile = fs.createWriteStream(filename);
this.fileSize = 0;
}
_write(chunk, encoding, callback) {
this.fileSize += chunk.length;
this.currentFile.write(chunk, encoding, (err) => {
if (err) return callback(err);
if (this.fileSize >= this.maxSize) {
this._rotateFile();
}
callback();
});
}
_final(callback) {
if (this.currentFile) {
this.currentFile.end(callback);
} else {
callback();
}
}
}
// 使用示例
const logger = new RequestLogger('./logs');
http.createServer((req, res) => {
logger.write(`Request at ${new Date()}: ${req.url}\n`);
res.end('Logged');
}).listen(3000);
上一篇: 流的背压问题
下一篇: Stream的高性能应用