您现在的位置是:网站首页 > cluster模块文章详情

cluster模块

Node.js的cluster模块允许开发者充分利用多核CPU系统,通过创建子进程(worker)并行处理任务。它简化了多进程编程的复杂性,特别适合高并发场景。下面从核心概念、使用方法和实际案例展开。

cluster模块的核心概念

cluster模块基于child_process模块,但提供了更高级的抽象。主进程(master)负责管理工作进程(worker),每个worker都是独立的V8实例。关键特性包括:

  1. 进程间通信:通过process.on('message')process.send()实现
  2. 负载均衡:默认采用轮询算法(Windows除外)
  3. 自动重启:worker崩溃时可配置自动重启
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);
  
  // 衍生工作进程
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
} else {
  // 工作进程可以共享任何TCP连接
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
}

进程管理策略

主进程可以精细控制worker的生命周期:

cluster.on('fork', (worker) => {
  console.log(`Worker ${worker.id} 被创建`);
});

cluster.on('online', (worker) => {
  console.log(`Worker ${worker.id} 上线响应`);
});

cluster.on('disconnect', (worker) => {
  console.log(`Worker ${worker.id} 断开连接`);
});

高级配置示例:

const cluster = require('cluster');
const timeout = 5000;

if (cluster.isMaster) {
  const worker = cluster.fork();
  
  let timer = setTimeout(() => {
    worker.kill();
  }, timeout);

  worker.on('listening', (address) => {
    clearTimeout(timer);
    console.log(`Worker ready in ${Date.now() - start}ms`);
  });
}

实际应用场景

HTTP服务器负载均衡

const cluster = require('cluster');
const express = require('express');

if (cluster.isMaster) {
  const cpuCount = require('os').cpus().length;
  
  for (let i = 0; i < cpuCount; i++) {
    cluster.fork();
  }
} else {
  const app = express();
  
  app.get('/heavy', (req, res) => {
    // 模拟CPU密集型任务
    let result = 0;
    for (let i = 0; i < 1000000000; i++) {
      result += Math.sqrt(i);
    }
    res.send(`Result: ${result}`);
  });

  app.listen(3000);
}

零停机重启

实现热更新方案:

// master.js
const cluster = require('cluster');
const workers = [];

if (cluster.isMaster) {
  const cpuCount = require('os').cpus().length;
  
  const spawn = () => {
    const worker = cluster.fork();
    workers.push(worker);
    return worker;
  };

  // 初始启动
  for (let i = 0; i < cpuCount; i++) {
    spawn();
  }

  // 接收重启信号
  process.on('SIGUSR2', () => {
    console.log('开始零停机重启...');
    
    let remaining = workers.length;
    workers.forEach(worker => {
      worker.once('disconnect', () => {
        spawn().once('listening', () => {
          if (--remaining === 0) {
            console.log('重启完成');
          }
        });
      });
      worker.disconnect();
    });
  });
}

高级配置技巧

自定义负载均衡

const cluster = require('cluster');
const net = require('net');
const sticky = require('sticky-session');

if (!cluster.isMaster) {
  const server = net.createServer((socket) => {
    socket.end('Worker: ' + process.pid);
  }).listen(8000);
  return;
}

const workerCount = require('os').cpus().length;
const workers = [];

for (let i = 0; i < workerCount; i++) {
  const worker = cluster.fork();
  workers.push(worker);
}

// 基于IP的会话保持
const server = net.createServer({ pauseOnConnect: true }, (connection) => {
  const worker = workers[connection.remoteAddress & (workers.length - 1)];
  worker.send('sticky-session:connection', connection);
}).listen(8000);

进程状态监控

cluster.on('message', (worker, message) => {
  if (message.type === 'status') {
    console.log(`Worker ${worker.id} CPU: ${message.cpu}%`);
  }
});

// 在工作进程中
setInterval(() => {
  const startTime = process.hrtime();
  const startUsage = process.cpuUsage();
  
  setTimeout(() => {
    const elapsed = process.hrtime(startTime);
    const endUsage = process.cpuUsage(startUsage);
    
    const elapsedTime = elapsed[0] * 1000 + elapsed[1] / 1000000;
    const cpuPercent = (endUsage.user + endUsage.system) / (elapsedTime * 1000) * 100;
    
    process.send({
      type: 'status',
      cpu: cpuPercent.toFixed(1)
    });
  }, 1000);
}, 5000);

性能优化实践

共享端口与独立端口

默认共享方案:

// 所有worker共享8000端口
http.createServer().listen(8000);

独立端口方案:

// master.js
const basePort = 8000;
for (let i = 0; i < numCPUs; i++) {
  cluster.fork({ WORKER_PORT: basePort + i });
}

// worker.js
const port = process.env.WORKER_PORT || 8000;
http.createServer().listen(port);

内存泄漏检测

const heapdump = require('heapdump');
const path = require('path');

cluster.on('message', (worker, message) => {
  if (message === 'heapdump') {
    const filename = path.join(__dirname, `heapdump-${worker.id}-${Date.now()}.heapsnapshot`);
    heapdump.writeSnapshot(filename, (err) => {
      worker.send(`Heapdump saved to ${filename}`);
    });
  }
});

// 在工作进程中
process.on('message', (msg) => {
  if (msg.startsWith('Heapdump saved')) {
    console.log(msg);
  }
});

// 触发dump
process.send('heapdump');

错误处理机制

未捕获异常处理

process.on('uncaughtException', (err) => {
  console.error(`未捕获异常: ${err.stack}`);
  // 通知主进程准备退出
  process.send({ type: 'error', msg: err.message });
  // 优雅退出
  server.close(() => {
    process.exit(1);
  });
  // 强制超时退出
  setTimeout(() => {
    process.abort();
  }, 5000).unref();
});

进程心跳检测

// master.js
workers.forEach(worker => {
  let missed = 0;
  
  const timer = setInterval(() => {
    worker.send('ping');
    missed++;
    
    if (missed >= 3) {
      clearInterval(timer);
      worker.kill();
    }
  }, 3000);

  worker.on('message', (msg) => {
    if (msg === 'pong') missed = 0;
  });
});

// worker.js
process.on('message', (msg) => {
  if (msg === 'ping') process.send('pong');
});

我的名片

网名:~川~

岗位:console.log 调试员

坐标:重庆市-九龙坡区

邮箱:cc@qdcc.cn

沙漏人生

站点信息

  • 建站时间:2013/03/16
  • 本站运行
  • 文章数量
  • 总访问量
微信公众号
每次关注
都是向财富自由迈进的一步