您现在的位置是:网站首页 > cluster模块文章详情
cluster模块
陈川
【
Node.js
】
42272人已围观
5839字
Node.js的`cluster`模块允许开发者充分利用多核CPU系统,通过创建子进程(worker)并行处理任务。它简化了多进程编程的复杂性,特别适合高并发场景。下面从核心概念、使用方法和实际案例展开。
cluster模块的核心概念
`cluster`模块基于`child_process`模块,但提供了更高级的抽象。主进程(master)负责管理工作进程(worker),每个worker都是独立的V8实例。关键特性包括:
- 进程间通信:通过`process.on('message')`和`process.send()`实现
- 负载均衡:默认采用轮询算法(Windows除外)
- 自动重启:worker崩溃时可配置自动重启
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

/**
 * Primary-process role: announce itself, fork `numCPUs` workers, and log
 * every worker exit.
 */
const runPrimary = () => {
  console.log(`主进程 ${process.pid} 正在运行`);

  // Fork the worker processes.
  let forked = 0;
  while (forked < numCPUs) {
    cluster.fork();
    forked += 1;
  }

  cluster.on('exit', (worker) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
};

/**
 * Worker-process role: serve a fixed greeting over HTTP on port 8000.
 * Workers can share any TCP connection, so every worker listens on the
 * same port.
 */
const runWorker = () => {
  const server = http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  });
  server.listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
};

if (cluster.isMaster) {
  runPrimary();
} else {
  runWorker();
}
进程管理策略
主进程可以精细控制worker的生命周期:
// Lifecycle events emitted on `cluster`, each mapped to the logger that
// reports it for the affected worker.
const lifecycleLoggers = {
  fork: (worker) => {
    console.log(`Worker ${worker.id} 被创建`);
  },
  online: (worker) => {
    console.log(`Worker ${worker.id} 上线响应`);
  },
  disconnect: (worker) => {
    console.log(`Worker ${worker.id} 断开连接`);
  },
};

// Register the loggers in declaration order: fork, online, disconnect.
Object.keys(lifecycleLoggers).forEach((eventName) => {
  cluster.on(eventName, lifecycleLoggers[eventName]);
});
高级配置示例:
const cluster = require('cluster');

// Maximum time, in milliseconds, a worker may take to start listening.
const timeout = 5000;

if (cluster.isMaster) {
  // Capture the fork time; the original snippet read `start` without ever
  // defining it, which threw a ReferenceError inside the 'listening' handler.
  const start = Date.now();
  const worker = cluster.fork();

  // Watchdog: kill the worker if it has not started listening in time.
  const timer = setTimeout(() => {
    worker.kill();
  }, timeout);

  worker.on('listening', (address) => {
    clearTimeout(timer);
    console.log(`Worker ready in ${Date.now() - start}ms`);
  });

  // A worker that exits before listening no longer needs the watchdog.
  worker.on('exit', () => {
    clearTimeout(timer);
  });
}
实际应用场景
HTTP服务器负载均衡
const cluster = require('cluster');
const express = require('express');

if (!cluster.isMaster) {
  // Worker role: an express app with one deliberately expensive route,
  // listening on port 3000.
  const app = express();

  // Simulates a CPU-bound task by summing one billion square roots.
  const handleHeavy = (req, res) => {
    let result = 0;
    for (let n = 0; n < 1000000000; n++) {
      result += Math.sqrt(n);
    }
    res.send(`Result: ${result}`);
  };

  app.get('/heavy', handleHeavy);
  app.listen(3000);
} else {
  // Primary role: fork as many workers as os.cpus() reports.
  const cpuCount = require('os').cpus().length;
  let pending = cpuCount;
  while (pending > 0) {
    cluster.fork();
    pending -= 1;
  }
}
零停机重启
实现热更新方案:
// master.js
const cluster = require('cluster');

// Workers currently tracked by the primary. Entries are removed when a worker
// exits or is retired during a restart, so the list never holds stale workers.
const workers = [];

if (cluster.isMaster) {
  const cpuCount = require('os').cpus().length;

  // Fork a worker, start tracking it, and return it.
  const spawn = () => {
    const worker = cluster.fork();
    workers.push(worker);
    return worker;
  };

  // Stop tracking a worker; does nothing when the worker is not in the list.
  const forget = (worker) => {
    const index = workers.indexOf(worker);
    if (index !== -1) {
      workers.splice(index, 1);
    }
  };

  // Keep the list in sync with reality whenever any worker exits.
  cluster.on('exit', (worker) => {
    forget(worker);
  });

  // Initial startup.
  for (let i = 0; i < cpuCount; i++) {
    spawn();
  }

  // Guards against overlapping restarts when SIGUSR2 arrives repeatedly.
  let restarting = false;

  // Restart signal: replace workers one at a time. The replacement must be
  // listening BEFORE the old worker is disconnected; the original version
  // disconnected every worker first and forked afterwards, leaving a window
  // with no worker accepting connections.
  process.on('SIGUSR2', () => {
    if (restarting) {
      return;
    }
    restarting = true;
    console.log('开始零停机重启...');

    // Snapshot the workers to retire: spawn() appends to `workers` while the
    // restart is in progress, and those new workers must not be retired.
    const outgoing = workers.slice();

    const replaceNext = () => {
      const old = outgoing.shift();
      if (old === undefined) {
        restarting = false;
        console.log('重启完成');
        return;
      }

      const replacement = spawn();

      // If the replacement dies before it starts listening, abort the restart
      // and leave the remaining old workers serving.
      const onEarlyExit = () => {
        restarting = false;
        console.error(`工作进程 ${replacement.process.pid} 启动失败,重启已中止`);
      };
      replacement.once('exit', onEarlyExit);

      replacement.once('listening', () => {
        replacement.removeListener('exit', onEarlyExit);
        forget(old);
        // The old worker may already have gone away on its own.
        if (old.isConnected()) {
          old.disconnect();
        }
        replaceNext();
      });
    };

    replaceNext();
  });
}
高级配置技巧
自定义负载均衡
const cluster = require('cluster');
const net = require('net');
const sticky = require('sticky-session');

// Message tag used when the primary hands a socket to a worker.
const CONNECTION_MESSAGE = 'sticky-session:connection';

/**
 * Hash a remote address string to a non-negative 32-bit integer.
 *
 * The original code applied `&` directly to `connection.remoteAddress`; that
 * value is a string such as "127.0.0.1", which coerces to NaN and then 0, so
 * every connection was routed to the first worker.
 *
 * @param {string|undefined} address - Remote address of the connection.
 * @returns {number} Unsigned 32-bit hash; 0 for an empty or missing address.
 */
const hashAddress = (address) => {
  let hash = 0;
  for (const ch of String(address || '')) {
    hash = (hash * 31 + ch.codePointAt(0)) >>> 0;
  }
  return hash;
};

if (!cluster.isMaster) {
  // Worker role: do NOT listen on the port here, because the primary owns
  // port 8000 and forwards accepted sockets over IPC.
  const server = net.createServer((socket) => {
    socket.end('Worker: ' + process.pid);
  });

  process.on('message', (message, connection) => {
    if (message !== CONNECTION_MESSAGE || !connection) {
      return;
    }
    // Feed the forwarded socket to the server as if it had accepted it, then
    // resume it: the primary created it with pauseOnConnect.
    server.emit('connection', connection);
    connection.resume();
  });
} else {
  const workerCount = require('os').cpus().length;
  const workers = [];
  for (let i = 0; i < workerCount; i++) {
    const worker = cluster.fork();
    workers.push(worker);
  }

  // IP-based session affinity: the same remote address always maps to the
  // same worker index. Modulo works for any worker count, whereas the
  // original `& (length - 1)` mask assumed a power of two.
  const server = net.createServer({ pauseOnConnect: true }, (connection) => {
    const worker = workers[hashAddress(connection.remoteAddress) % workers.length];
    worker.send(CONNECTION_MESSAGE, connection);
  }).listen(8000);
}
进程状态监控
// Primary side: print the CPU figure carried by every 'status' message.
cluster.on('message', (worker, message) => {
  if (message.type !== 'status') {
    return;
  }
  console.log(`Worker ${worker.id} CPU: ${message.cpu}%`);
});

// Worker side.

/**
 * Measure CPU use since the given starting readings and send it to the
 * primary as a 'status' message.
 *
 * @param {[number, number]} startTime - process.hrtime() reading at window start.
 * @param {{user: number, system: number}} startUsage - process.cpuUsage() reading at window start.
 */
const reportCpuUsage = (startTime, startUsage) => {
  const [seconds, nanoseconds] = process.hrtime(startTime);
  const { user, system } = process.cpuUsage(startUsage);
  // Elapsed wall time in milliseconds.
  const elapsedTime = seconds * 1000 + nanoseconds / 1000000;
  // cpuUsage() reports microseconds, so the wall time is scaled to match.
  const cpuPercent = (user + system) / (elapsedTime * 1000) * 100;
  process.send({
    type: 'status',
    cpu: cpuPercent.toFixed(1),
  });
};

// Every 5 seconds, open a 1-second sampling window and report at its end.
setInterval(() => {
  const startTime = process.hrtime();
  const startUsage = process.cpuUsage();
  setTimeout(reportCpuUsage, 1000, startTime, startUsage);
}, 5000);
性能优化实践
共享端口与独立端口
默认共享方案:
// All workers share port 8000: each one creates a server and listens on it.
const sharedServer = http.createServer();
sharedServer.listen(8000);
独立端口方案:
// master.js
// Fork one worker per CPU, passing each its own port through the WORKER_PORT
// environment variable, counting up from basePort.
const basePort = 8000;
Array.from({ length: numCPUs }, (unused, offset) => basePort + offset)
  .forEach((assignedPort) => {
    cluster.fork({ WORKER_PORT: assignedPort });
  });

// worker.js
// Listen on the assigned port, falling back to 8000 when WORKER_PORT is
// unset or empty.
const port = process.env.WORKER_PORT || 8000;
const workerServer = http.createServer();
workerServer.listen(port);
内存泄漏检测
const heapdump = require('heapdump');
const path = require('path');

// Primary side: write a heap snapshot when a worker sends 'heapdump', then
// tell that worker where the file was saved.
// NOTE(review): writeSnapshot() is called in this handler, i.e. in the primary
// process, so the snapshot presumably captures the primary's heap rather than
// the requesting worker's. Confirm this is intended.
cluster.on('message', (worker, message) => {
  if (message !== 'heapdump') {
    return;
  }

  const filename = path.join(__dirname, `heapdump-${worker.id}-${Date.now()}.heapsnapshot`);
  heapdump.writeSnapshot(filename, (err) => {
    // The original ignored `err` and reported success even when the write
    // failed.
    if (err) {
      console.error(`Heapdump for worker ${worker.id} failed: ${err.message}`);
      return;
    }
    // The worker may have gone away while the snapshot was being written.
    if (worker.isConnected()) {
      worker.send(`Heapdump saved to ${filename}`);
    }
  });
});

// In the worker process.
process.on('message', (msg) => {
  // Messages may be objects (other message types); only strings have
  // startsWith().
  if (typeof msg === 'string' && msg.startsWith('Heapdump saved')) {
    console.log(msg);
  }
});

// Trigger a dump.
process.send('heapdump');
错误处理机制
未捕获异常处理
/**
 * Last-resort handler for uncaught exceptions: log the stack, notify the
 * primary, close the server gracefully, and abort if closing takes longer
 * than 5 seconds.
 *
 * @param {Error} err - The uncaught exception.
 */
function handleUncaughtException(err) {
  console.error(`未捕获异常: ${err.stack}`);

  // Tell the primary that this process is about to exit.
  const notice = { type: 'error', msg: err.message };
  process.send(notice);

  // Graceful exit: leave with status 1 once the server has closed.
  const exitWithFailure = () => {
    process.exit(1);
  };
  server.close(exitWithFailure);

  // Forced exit after 5 seconds; unref() so the timer does not keep the
  // process alive by itself.
  const forceExitTimer = setTimeout(() => {
    process.abort();
  }, 5000);
  forceExitTimer.unref();
}

process.on('uncaughtException', handleUncaughtException);
进程心跳检测
// master.js
// Heartbeat: ping every worker every 3 seconds and kill a worker once three
// consecutive pings have gone unanswered.
workers.forEach((worker) => {
  // Number of pings sent since the last 'pong'.
  let missed = 0;

  const timer = setInterval(() => {
    // A disconnected worker cannot receive pings; stop instead of sending to
    // a closed channel.
    if (!worker.isConnected()) {
      clearInterval(timer);
      return;
    }

    // Check BEFORE sending the next ping. The original incremented and tested
    // straight after sending, so a ping counted as missed the moment it was
    // sent and the worker was killed after only two unanswered pings.
    if (missed >= 3) {
      clearInterval(timer);
      worker.kill();
      return;
    }

    worker.send('ping');
    missed++;
  }, 3000);

  worker.on('message', (msg) => {
    if (msg === 'pong') missed = 0;
  });

  // The original never cleared the interval when the worker exited, so it
  // kept firing against a dead worker.
  worker.once('exit', () => {
    clearInterval(timer);
  });
});

// worker.js
// Answer every 'ping' from the primary with a 'pong'.
process.on('message', (msg) => {
  if (msg === 'ping') process.send('pong');
});
上一篇: child_process模块
下一篇: 进程间通信