您现在的位置是:网站首页 > 四种基本流类型文章详情

四种基本流类型

四种基本流类型

Node.js中的流是处理数据的高效方式,尤其适合处理大文件或连续数据。流分为四种基本类型:可读流、可写流、双工流和转换流。每种类型都有特定的应用场景和实现方式。

可读流(Readable Stream)

可读流用于从数据源读取数据。例如文件读取、HTTP请求体或TCP套接字。可读流有两种模式:流动模式(flowing)和暂停模式(paused)。流动模式下数据会自动通过事件推送,而暂停模式需要显式调用read()方法。

const fs = require('fs');
const readableStream = fs.createReadStream('./large-file.txt');

// 流动模式
readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data`);
});

// 暂停模式
readableStream.on('readable', () => {
  let chunk;
  while ((chunk = readableStream.read()) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});

可读流还支持管道操作,可以直接将数据导入可写流:

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('./large-file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./large-file.txt.gz'));

可写流(Writable Stream)

可写流用于向目标写入数据。常见场景包括文件写入、HTTP响应或数据库操作。可写流提供了write()end()方法,并会触发drain事件当缓冲区清空时。

const fs = require('fs');
const writableStream = fs.createWriteStream('./output.txt');

// 写入数据
writableStream.write('First line\n');
writableStream.write('Second line\n');
writableStream.end('Final line\n');

// 处理完成事件
writableStream.on('finish', () => {
  console.log('All writes completed');
});

// 处理错误
writableStream.on('error', (err) => {
  console.error('Write error:', err);
});

对于大容量写入,应该监听drain事件避免内存问题:

function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        writer.write(data, encoding, callback);
      } else {
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      writer.once('drain', write);
    }
  }
}

双工流(Duplex Stream)

双工流同时实现了可读和可写接口,可以双向传输数据。典型的例子包括TCP套接字和WebSocket连接。双工流的读写通道是独立的。

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = [];
  }
  
  _write(chunk, encoding, callback) {
    this.data.push(chunk);
    callback();
  }
  
  _read(size) {
    if (this.data.length) {
      this.push(this.data.shift());
    } else {
      this.push(null);
    }
  }
}

const duplex = new MyDuplex();
duplex.write('Hello');
duplex.write('World');
duplex.end();

duplex.on('data', (chunk) => {
  console.log(chunk.toString()); // 输出: Hello World
});

转换流(Transform Stream)

转换流是特殊的双工流,用于在写入和读取之间修改数据。压缩、加密或数据格式转换等场景常用转换流。它必须实现_transform方法。

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);

// 输入小写文本,输出大写形式

更实用的例子是创建CSV到JSON的转换器:

const { Transform } = require('stream');
const csv = require('csv-parser');

class CsvToJsonTransform extends Transform {
  constructor() {
    super({ objectMode: true });
    this.isFirstChunk = true;
    this.headers = [];
  }
  
  _transform(chunk, encoding, callback) {
    if (this.isFirstChunk) {
      this.headers = Object.keys(chunk);
      this.isFirstChunk = false;
    }
    
    const jsonObj = {};
    this.headers.forEach(header => {
      jsonObj[header] = chunk[header];
    });
    
    this.push(JSON.stringify(jsonObj) + '\n');
    callback();
  }
}

fs.createReadStream('data.csv')
  .pipe(csv())
  .pipe(new CsvToJsonTransform())
  .pipe(fs.createWriteStream('data.jsonl'));

流的高级用法

流可以组合使用创建复杂的数据处理管道。Node.js的pipeline方法比传统的.pipe()更安全,能正确处理错误和清理资源。

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

自定义流时,可以通过选项控制行为:

const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(limit, options) {
    super(options);
    this.limit = limit;
    this.count = 1;
  }
  
  _read(size) {
    if (this.count > this.limit) {
      this.push(null); // 结束流
    } else {
      const buf = Buffer.from(`${this.count++}\n`, 'utf8');
      this.push(buf);
    }
  }
}

const counter = new CounterStream(100, { highWaterMark: 8 });
counter.pipe(process.stdout);

流的错误处理

正确处理流错误至关重要。未捕获的错误可能导致内存泄漏或程序崩溃。最佳实践是为每个流单独添加错误监听器。

const fs = require('fs');
const stream = fs.createReadStream('nonexistent-file.txt');

stream.on('error', (err) => {
  console.error('Stream error:', err.message);
  // 执行清理操作
});

// 在管道中处理错误
const { pipeline } = require('stream');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('big-file.txt'),
  zlib.createGzip(),
  fs.createWriteStream('big-file.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    }
  }
);

流的性能优化

合理配置流选项可以显著提升性能。highWaterMark选项控制内部缓冲区大小,影响内存使用和吞吐量。

// 创建高性能可写流
const writable = fs.createWriteStream('output.txt', {
  highWaterMark: 1024 * 1024, // 1MB缓冲区
  encoding: 'utf8'
});

// 对象模式流处理JavaScript对象
const { Transform } = require('stream');

const objectTransform = new Transform({
  objectMode: true,
  transform(obj, encoding, callback) {
    this.push(processObject(obj));
    callback();
  }
});

对于高吞吐量场景,可以使用stream.pipeline配合异步生成器:

const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

async function processLargeData() {
  const source = fs.createReadStream('input.csv');
  const transformer = createTransformStream();
  const destination = fs.createWriteStream('output.json');
  
  await pipelineAsync(
    source,
    csvParser(),
    transformer,
    destination
  );
}

我的名片

网名:~川~

岗位:console.log 调试员

坐标:重庆市-九龙坡区

邮箱:cc@qdcc.cn

沙漏人生

站点信息

  • 建站时间:2013/03/16
  • 本站运行
  • 文章数量
  • 总访问量
微信公众号
每次关注
都是向财富自由迈进的一步