您现在的位置是:网站首页 > 四种基本流类型文章详情
四种基本流类型
陈川
【
Node.js
】
33559人已围观
6010字
四种基本流类型
Node.js中的流是处理数据的高效方式,尤其适合处理大文件或连续数据。流分为四种基本类型:可读流、可写流、双工流和转换流。每种类型都有特定的应用场景和实现方式。
可读流(Readable Stream)
可读流用于从数据源读取数据。例如文件读取、HTTP请求体或TCP套接字。可读流有两种模式:流动模式(flowing)和暂停模式(paused)。流动模式下数据会自动通过事件推送,而暂停模式需要显式调用read()
方法。
const fs = require('fs');
const readableStream = fs.createReadStream('./large-file.txt');
// 流动模式
readableStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data`);
});
// 暂停模式
readableStream.on('readable', () => {
let chunk;
while ((chunk = readableStream.read()) !== null) {
console.log(`Read ${chunk.length} bytes`);
}
});
可读流还支持管道操作,可以直接将数据导入可写流:
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('./large-file.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('./large-file.txt.gz'));
可写流(Writable Stream)
可写流用于向目标写入数据。常见场景包括文件写入、HTTP响应或数据库操作。可写流提供了write()
和end()
方法,并会触发drain
事件当缓冲区清空时。
const fs = require('fs');
const writableStream = fs.createWriteStream('./output.txt');
// 写入数据
writableStream.write('First line\n');
writableStream.write('Second line\n');
writableStream.end('Final line\n');
// 处理完成事件
writableStream.on('finish', () => {
console.log('All writes completed');
});
// 处理错误
writableStream.on('error', (err) => {
console.error('Write error:', err);
});
对于大容量写入,应该监听drain
事件避免内存问题:
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
writer.write(data, encoding, callback);
} else {
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
writer.once('drain', write);
}
}
}
双工流(Duplex Stream)
双工流同时实现了可读和可写接口,可以双向传输数据。典型的例子包括TCP套接字和WebSocket连接。双工流的读写通道是独立的。
const { Duplex } = require('stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
this.data = [];
}
_write(chunk, encoding, callback) {
this.data.push(chunk);
callback();
}
_read(size) {
if (this.data.length) {
this.push(this.data.shift());
} else {
this.push(null);
}
}
}
const duplex = new MyDuplex();
duplex.write('Hello');
duplex.write('World');
duplex.end();
duplex.on('data', (chunk) => {
console.log(chunk.toString()); // 输出: Hello World
});
转换流(Transform Stream)
转换流是特殊的双工流,用于在写入和读取之间修改数据。压缩、加密或数据格式转换等场景常用转换流。它必须实现_transform
方法。
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);
// 输入小写文本,输出大写形式
更实用的例子是创建CSV到JSON的转换器:
const { Transform } = require('stream');
const csv = require('csv-parser');
class CsvToJsonTransform extends Transform {
constructor() {
super({ objectMode: true });
this.isFirstChunk = true;
this.headers = [];
}
_transform(chunk, encoding, callback) {
if (this.isFirstChunk) {
this.headers = Object.keys(chunk);
this.isFirstChunk = false;
}
const jsonObj = {};
this.headers.forEach(header => {
jsonObj[header] = chunk[header];
});
this.push(JSON.stringify(jsonObj) + '\n');
callback();
}
}
fs.createReadStream('data.csv')
.pipe(csv())
.pipe(new CsvToJsonTransform())
.pipe(fs.createWriteStream('data.jsonl'));
流的高级用法
流可以组合使用创建复杂的数据处理管道。Node.js的pipeline
方法比传统的.pipe()
更安全,能正确处理错误和清理资源。
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
自定义流时,可以通过选项控制行为:
const { Readable } = require('stream');
class CounterStream extends Readable {
constructor(limit, options) {
super(options);
this.limit = limit;
this.count = 1;
}
_read(size) {
if (this.count > this.limit) {
this.push(null); // 结束流
} else {
const buf = Buffer.from(`${this.count++}\n`, 'utf8');
this.push(buf);
}
}
}
const counter = new CounterStream(100, { highWaterMark: 8 });
counter.pipe(process.stdout);
流的错误处理
正确处理流错误至关重要。未捕获的错误可能导致内存泄漏或程序崩溃。最佳实践是为每个流单独添加错误监听器。
const fs = require('fs');
const stream = fs.createReadStream('nonexistent-file.txt');
stream.on('error', (err) => {
console.error('Stream error:', err.message);
// 执行清理操作
});
// 在管道中处理错误
const { pipeline } = require('stream');
const zlib = require('zlib');
pipeline(
fs.createReadStream('big-file.txt'),
zlib.createGzip(),
fs.createWriteStream('big-file.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
}
}
);
流的性能优化
合理配置流选项可以显著提升性能。highWaterMark
选项控制内部缓冲区大小,影响内存使用和吞吐量。
// 创建高性能可写流
const writable = fs.createWriteStream('output.txt', {
highWaterMark: 1024 * 1024, // 1MB缓冲区
encoding: 'utf8'
});
// 对象模式流处理JavaScript对象
const { Transform } = require('stream');
const objectTransform = new Transform({
objectMode: true,
transform(obj, encoding, callback) {
this.push(processObject(obj));
callback();
}
});
对于高吞吐量场景,可以使用stream.pipeline
配合异步生成器:
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
async function processLargeData() {
const source = fs.createReadStream('input.csv');
const transformer = createTransformStream();
const destination = fs.createWriteStream('output.json');
await pipelineAsync(
source,
csvParser(),
transformer,
destination
);
}
上一篇: 视觉映射(VisualMap)
下一篇: 管道机制(pipe)