您现在的位置是:网站首页 > Stream的错误处理文章详情

Stream的错误处理

Stream的错误处理

Node.js中的Stream是处理流式数据的核心抽象,错误处理对于构建健壮的流式应用至关重要。流在传输过程中可能遇到各种错误,如文件读取失败、网络中断或数据处理异常,正确的错误处理能防止程序崩溃和数据丢失。

错误事件的基本处理

所有Stream都是EventEmitter的实例,当错误发生时会触发error事件。未处理的error事件会导致Node.js进程崩溃:

const fs = require('fs');

const readStream = fs.createReadStream('nonexistent-file.txt');

// 未捕获的错误会导致进程退出
readStream.on('data', (chunk) => {
  console.log(chunk);
});

正确处理方式应该是监听error事件:

readStream.on('error', (err) => {
  console.error('读取文件时发生错误:', err.message);
  // 可以在这里进行资源清理
});

管道操作中的错误传播

使用pipe()方法时,错误不会自动从一个流传播到另一个流。需要单独处理每个流的错误:

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('input.txt')
  .on('error', (err) => console.error('读取错误:', err))
  .pipe(zlib.createGzip())
  .on('error', (err) => console.error('压缩错误:', err))
  .pipe(fs.createWriteStream('output.txt.gz'))
  .on('error', (err) => console.error('写入错误:', err));

使用pipeline处理错误

Node.js的stream.pipeline方法提供了更好的错误处理机制,它会自动传播错误并清理资源:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('管道操作失败:', err);
    } else {
      console.log('管道操作成功完成');
    }
  }
);

自定义流的错误处理

实现自定义流时,应该通过this.emit('error', error)触发错误:

const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    try {
      const uppercased = chunk.toString().toUpperCase();
      callback(null, uppercased);
    } catch (err) {
      // 触发错误事件
      this.emit('error', err);
      // 也可以直接传递给callback
      // callback(err);
    }
  }
}

const transformer = new UppercaseTransform();
transformer.on('error', (err) => console.error('转换错误:', err));

process.stdin.pipe(transformer).pipe(process.stdout);

错误恢复策略

某些场景下可能需要实现错误恢复机制。例如,当读取网络资源失败时重试:

const { PassThrough } = require('stream');
const axios = require('axios');

function createRetryableStream(url, maxRetries = 3) {
  const stream = new PassThrough();
  let retries = 0;
  
  async function fetchData() {
    try {
      const response = await axios.get(url, { responseType: 'stream' });
      response.data.pipe(stream);
      response.data.on('error', (err) => {
        if (retries < maxRetries) {
          retries++;
          console.log(`第 ${retries} 次重试...`);
          fetchData();
        } else {
          stream.emit('error', err);
        }
      });
    } catch (err) {
      if (retries < maxRetries) {
        retries++;
        console.log(`第 ${retries} 次重试...`);
        fetchData();
      } else {
        stream.emit('error', err);
      }
    }
  }
  
  fetchData();
  return stream;
}

const stream = createRetryableStream('https://example.com/data');
stream.on('data', (chunk) => console.log('收到数据:', chunk.length));
stream.on('error', (err) => console.error('最终失败:', err));

错误边界与资源清理

当处理文件或网络连接等资源时,错误发生后需要确保正确清理:

const fs = require('fs');
const { Writable } = require('stream');

class SafeFileWriter extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fileDescriptor = null;
    this.tempFilename = `${filename}.tmp`;
    
    fs.open(this.tempFilename, 'w', (err, fd) => {
      if (err) return this.emit('error', err);
      this.fileDescriptor = fd;
    });
  }
  
  _write(chunk, encoding, callback) {
    if (!this.fileDescriptor) {
      return callback(new Error('文件未准备好'));
    }
    
    fs.write(this.fileDescriptor, chunk, (err) => {
      if (err) {
        this._cleanup();
        return callback(err);
      }
      callback();
    });
  }
  
  _final(callback) {
    fs.close(this.fileDescriptor, (err) => {
      if (err) {
        this._cleanup();
        return callback(err);
      }
      
      fs.rename(this.tempFilename, this.filename, (err) => {
        if (err) {
          this._cleanup();
          return callback(err);
        }
        callback();
      });
    });
  }
  
  _cleanup() {
    if (this.fileDescriptor) {
      fs.close(this.fileDescriptor, () => {
        fs.unlink(this.tempFilename, () => {});
      });
    } else {
      fs.unlink(this.tempFilename, () => {});
    }
  }
  
  _destroy(err, callback) {
    this._cleanup();
    callback(err);
  }
}

const writer = new SafeFileWriter('output.txt');
writer.on('error', (err) => console.error('写入失败:', err));
writer.write('安全写入数据\n');
writer.end();

错误日志与监控

在生产环境中,应该记录流错误以便后续分析:

const { createServer } = require('http');
const { createWriteStream } = require('fs');
const { promisify } = require('util');
const appendFile = promisify(require('fs').appendFile);

const errorLogStream = createWriteStream('stream_errors.log', { flags: 'a' });

const server = createServer((req, res) => {
  const dataStream = getDataFromSomewhere(); // 假设的流来源
  
  dataStream
    .on('error', async (err) => {
      const logEntry = `${new Date().toISOString()} - ${err.stack}\n\n`;
      await appendFile('stream_errors.log', logEntry).catch(console.error);
      if (!res.headersSent) {
        res.statusCode = 500;
        res.end('Internal Server Error');
      }
    })
    .pipe(res);
});

server.listen(3000);

多流操作的错误聚合

当并行处理多个流时,需要聚合错误处理:

const { PassThrough } = require('stream');
const EventEmitter = require('events');

class StreamAggregator extends EventEmitter {
  constructor(streams) {
    super();
    this.streams = streams;
    this.completed = 0;
    this.errors = [];
    this.output = new PassThrough();
    
    streams.forEach((stream, i) => {
      stream.on('data', (chunk) => this.output.write(chunk));
      stream.on('end', () => this._handleComplete());
      stream.on('error', (err) => this._handleError(err));
    });
  }
  
  _handleComplete() {
    this.completed++;
    if (this.completed + this.errors.length === this.streams.length) {
      if (this.errors.length > 0) {
        this.output.emit('error', new AggregateError(this.errors));
      } else {
        this.output.end();
      }
    }
  }
  
  _handleError(err) {
    this.errors.push(err);
    if (this.completed + this.errors.length === this.streams.length) {
      this.output.emit('error', new AggregateError(this.errors));
    }
  }
}

// 使用示例
const streams = [
  fs.createReadStream('file1.txt'),
  fs.createReadStream('file2.txt'),
  fs.createReadStream('file3.txt')
];

const aggregator = new StreamAggregator(streams);
aggregator.output
  .on('data', (chunk) => console.log('数据:', chunk))
  .on('error', (err) => console.error('聚合错误:', err.errors));

我的名片

网名:~川~

岗位:console.log 调试员

坐标:重庆市-九龙坡区

邮箱:cc@qdcc.cn

沙漏人生

站点信息

  • 建站时间:2013/03/16
  • 本站运行
  • 文章数量
  • 总访问量
微信公众号
每次关注
都是向财富自由迈进的一步