您现在的位置是:网站首页 > 常见Stream应用场景文章详情
常见Stream应用场景
陈川
【
Node.js
】
51623人已围观
5851字
文件操作
Stream在文件操作中非常常见,特别是处理大文件时。Node.js的fs
模块提供了创建文件流的接口,可以高效地读写文件,避免内存溢出。
const fs = require('fs');
// 读取大文件
const readStream = fs.createReadStream('largefile.txt', 'utf8');
readStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
// 写入文件
const writeStream = fs.createWriteStream('output.txt');
readStream.pipe(writeStream);
这种流式处理方式特别适合日志文件分析、大文件传输等场景。相比一次性读取整个文件,流可以分块处理数据,显著降低内存消耗。
HTTP请求与响应
Node.js的HTTP模块内置了Stream支持,请求和响应对象本身就是可读流和可写流。这使得处理HTTP请求体或发送大响应变得高效。
const http = require('http');
http.createServer((req, res) => {
// req是一个可读流
let body = '';
req.on('data', (chunk) => {
body += chunk;
});
req.on('end', () => {
// 处理请求体
res.end('Request received');
});
// 流式响应
if (req.url === '/stream') {
const fileStream = fs.createReadStream('largefile.pdf');
fileStream.pipe(res);
}
}).listen(3000);
这种模式在文件下载、视频流媒体等场景特别有用,可以实现边读取边发送,而不是等待整个文件加载完毕。
数据转换与处理
Stream非常适合数据转换场景,通过管道(pipeline)将多个处理步骤连接起来。常见的转换操作包括压缩、加密、编码转换等。
const zlib = require('zlib');
const crypto = require('crypto');
// 文件压缩管道
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
// 加密管道
const cipher = crypto.createCipher('aes192', 'secret-key');
fs.createReadStream('sensitive.data')
.pipe(cipher)
.pipe(fs.createWriteStream('sensitive.enc'));
这种模式也常用于ETL(提取、转换、加载)流程,可以构建复杂的数据处理管道而不会耗尽内存。
数据库操作
与数据库交互时,Stream可以帮助高效处理大量数据。许多Node.js数据库驱动支持返回查询结果流。
const { Client } = require('pg');
const client = new Client();
client.connect();
const queryStream = client.query(new Query('SELECT * FROM large_table'));
queryStream.on('data', (row) => {
// 处理每一行数据
console.log(row);
});
queryStream.on('end', () => {
client.end();
});
对于MongoDB,也有类似的游标流:
const mongoose = require('mongoose');
const User = mongoose.model('User');
const stream = User.find().cursor();
stream.on('data', (doc) => {
// 处理每个文档
}).on('end', () => {
// 完成
});
实时数据处理
Stream非常适合实时数据处理场景,如日志处理、实时分析等。可以构建处理管道来实时转换和分析数据。
const { Transform } = require('stream');
// 自定义转换流:统计字节数
class ByteCounter extends Transform {
constructor() {
super();
this.bytes = 0;
}
_transform(chunk, encoding, callback) {
this.bytes += chunk.length;
this.push(chunk);
callback();
}
}
const counter = new ByteCounter();
process.stdin
.pipe(counter)
.pipe(process.stdout)
.on('finish', () => {
console.log(`\nTotal bytes processed: ${counter.bytes}`);
});
这种模式可以扩展到更复杂的实时分析场景,如计算移动平均值、检测异常模式等。
多路复用与解复用
Stream可以用于实现多路复用,将多个流合并为一个,或者从一个流中分离出多个流。
const { PassThrough } = require('stream');
// 创建两个输出流
const out1 = new PassThrough();
const out2 = new PassThrough();
// 数据会被发送到两个流
const input = fs.createReadStream('input.txt');
input.pipe(out1);
input.pipe(out2);
// 统计两个流的字节数
let count1 = 0, count2 = 0;
out1.on('data', (chunk) => { count1 += chunk.length; });
out2.on('data', (chunk) => { count2 += chunk.length; });
这种技术在广播场景、数据备份等场景很有用,可以实现"一对多"的数据分发。
自定义流实现
当内置流类型不满足需求时,可以创建自定义流。Node.js提供了基类来创建可读、可写、双工或转换流。
const { Readable } = require('stream');
// 自定义可读流:生成随机数
class RandomStream extends Readable {
constructor(options) {
super(options);
this.maxNumbers = options.maxNumbers || 100;
this.numberCount = 0;
}
_read(size) {
this.numberCount += 1;
if (this.numberCount > this.maxNumbers) {
this.push(null); // 结束流
} else {
const num = Math.random();
this.push(num.toString() + '\n');
}
}
}
const randomStream = new RandomStream({ maxNumbers: 10 });
randomStream.pipe(process.stdout);
自定义流可以实现特定领域的数据源或数据处理逻辑,如传感器数据模拟、特定格式解析等。
错误处理与流量控制
Stream的错误处理和流量控制是实际应用中必须考虑的重要方面。正确的错误处理可以防止内存泄漏,流量控制可以避免数据积压。
// 错误处理示例
fs.createReadStream('nonexistent.txt')
.on('error', (err) => {
console.error('Error reading file:', err);
})
.pipe(fs.createWriteStream('output.txt'))
.on('error', (err) => {
console.error('Error writing file:', err);
});
// 流量控制示例
const slowConsumer = new Writable({
write(chunk, encoding, callback) {
console.log('Processing chunk...');
// 模拟慢速处理
setTimeout(callback, 1000);
}
});
const fastProducer = fs.createReadStream('largefile.txt');
fastProducer.pipe(slowConsumer);
// 当消费者处理不过来时,暂停生产者
slowConsumer.on('drain', () => {
fastProducer.resume();
});
理解这些机制对于构建健壮的流式应用至关重要,特别是在生产环境中处理不可靠的网络或慢速消费者时。
流组合与管道
Node.js提供了stream.pipeline
和stream.finished
等实用工具来简化流组合和资源清理。
const { pipeline, finished } = require('stream');
// 使用pipeline自动处理错误和清理
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
// 使用finished监听流结束
const stream = fs.createReadStream('file.txt');
finished(stream, (err) => {
if (err) {
console.error('Stream failed:', err);
} else {
console.log('Stream is done reading');
}
});
这些工具比手动连接流和监听事件更安全,能更好地处理错误情况和资源释放。
浏览器中的Stream API
现代浏览器也实现了类似的Stream API,可以与Node.js后端配合实现高效的Web应用。
// 浏览器中获取流式响应
fetch('/api/stream')
.then(response => {
const reader = response.body.getReader();
function readChunk() {
return reader.read().then(({ value, done }) => {
if (done) return;
console.log('Received chunk:', value);
return readChunk();
});
}
return readChunk();
});
// 创建可读流
const stream = new ReadableStream({
start(controller) {
controller.enqueue('Hello');
controller.enqueue('Stream');
controller.close();
}
});
这种技术可以实现渐进式渲染、大文件上传等场景,提升Web应用性能和用户体验。
上一篇: Stream的错误处理
下一篇: fs模块的核心API