您现在的位置是:网站首页 > 多核CPU利用文章详情
多核CPU利用
陈川
【
Node.js
】
22819人已围观
6203字
多核CPU的基本概念
现代计算机普遍采用多核CPU架构,单个物理处理器包含多个独立运算核心。Node.js作为单线程运行时环境,默认情况下无法充分利用多核CPU的计算能力。一个四核CPU上运行的Node.js应用,理论上只能利用25%的硬件资源。
// 查看CPU核心数量
const os = require('os');
console.log(`CPU核心数: ${os.cpus().length}`);
Node.js的集群模块
Node.js内置的cluster
模块允许创建共享同一服务器端口的子进程。主进程(master)负责管理工作进程(worker),工作进程处理实际请求。当某个工作进程崩溃时,主进程可以立即重启它。
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`主进程 ${process.pid} 正在运行`);
// 衍生工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 已退出`);
});
} else {
// 工作进程可以共享任何TCP连接
http.createServer((req, res) => {
res.writeHead(200);
res.end('你好世界\n');
}).listen(8000);
console.log(`工作进程 ${process.pid} 已启动`);
}
进程间通信机制
工作进程与主进程之间通过IPC(进程间通信)通道进行通信。消息传递是序列化和反序列化的过程,需要注意传递的数据类型和大小限制。
// 主进程发送消息
cluster.workers[workerId].send({ type: 'notification', data: '重要更新' });
// 工作进程接收消息
process.on('message', (msg) => {
if (msg.type === 'notification') {
console.log(`收到通知: ${msg.data}`);
}
});
负载均衡策略
Node.js集群默认采用轮询(round-robin)算法分配请求。在Linux系统上,内核会直接在工作进程间分配连接,避免主进程成为性能瓶颈。可以通过设置NODE_CLUSTER_SCHED_POLICY
环境变量修改调度策略。
// 设置调度策略(需要Node.js 16+)
process.env.NODE_CLUSTER_SCHED_POLICY = 'rr'; // 轮询
// 或
process.env.NODE_CLUSTER_SCHED_POLICY = 'none'; // 由操作系统决定
共享状态管理
多进程环境下需要特别注意状态共享问题。内存数据库如Redis可以解决进程间状态同步问题。
const redis = require('redis');
const client = redis.createClient();
// 不同进程通过Redis共享计数器
app.get('/counter', (req, res) => {
client.incr('visit_count', (err, count) => {
res.send(`访问次数: ${count}`);
});
});
进程管理工具
生产环境推荐使用PM2等进程管理工具,提供更完善的集群管理功能:
# 启动集群(自动根据CPU核心数创建进程)
pm2 start app.js -i max
# 查看运行状态
pm2 list
# 监控日志
pm2 logs
性能优化实践
CPU密集型任务可以通过任务分片提高并行效率。以下是将大数组分片处理的示例:
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
function parallelMap(array, mapper, concurrency = require('os').cpus().length) {
return new Promise((resolve) => {
const chunkSize = Math.ceil(array.length / concurrency);
const results = [];
let completed = 0;
for (let i = 0; i < concurrency; i++) {
const worker = new Worker(__filename, {
workerData: {
array: array.slice(i * chunkSize, (i + 1) * chunkSize),
mapper: mapper.toString()
}
});
worker.on('message', (result) => {
results.push(...result);
if (++completed === concurrency) {
resolve(results);
}
});
}
});
}
if (!isMainThread) {
const mapper = new Function('return ' + workerData.mapper)();
parentPort.postMessage(workerData.array.map(mapper));
}
错误处理与进程守护
需要特别注意子进程的异常处理,避免整个应用崩溃:
cluster.on('exit', (worker, code, signal) => {
if (code !== 0 && !worker.exitedAfterDisconnect) {
console.log(`工作进程 ${worker.id} 崩溃,正在重启...`);
cluster.fork();
}
});
process.on('uncaughtException', (err) => {
console.error('未捕获异常:', err);
// 记录日志后优雅退出
process.exit(1);
});
实际应用场景分析
视频转码服务是典型的CPU密集型应用,适合使用多核并行处理:
const ffmpeg = require('fluent-ffmpeg');
const { Worker } = require('worker_threads');
function processVideoSegment(input, output, segment) {
return new Promise((resolve) => {
const worker = new Worker('./video-worker.js', {
workerData: { input, output, segment }
});
worker.on('message', resolve);
});
}
async function parallelVideoProcessing(videos) {
const promises = videos.map(video =>
processVideoSegment(video.input, video.output, video.segment)
);
await Promise.all(promises);
// 合并处理后的视频片段
}
微服务架构中的实践
在微服务架构中,可以为每个CPU核心部署独立服务实例:
const express = require('express');
const port = 3000 + (cluster.worker ? cluster.worker.id : 0);
const app = express();
app.get('/service', (req, res) => {
// 每个工作进程处理特定功能
res.json({
pid: process.pid,
service: `service-${cluster.worker.id}`
});
});
app.listen(port, () => {
console.log(`服务 ${cluster.worker.id} 运行在 ${port}`);
});
容器化环境下的考量
在Docker/Kubernetes环境中,需要协调Node.js集群与容器编排的关系:
# Dockerfile示例
FROM node:16
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
CMD ["node", "cluster-app.js"]
Kubernetes部署时通常每个Pod只运行一个Node.js进程,由Kubernetes负责多实例部署和负载均衡。
性能监控与调试
使用内置的perf_hooks
模块监控各个工作进程的性能:
const { performance, PerformanceObserver } = require('perf_hooks');
const obs = new PerformanceObserver((items) => {
items.getEntries().forEach(entry => {
console.log(`${entry.name}: ${entry.duration}ms`);
});
});
obs.observe({ entryTypes: ['measure'] });
performance.mark('start');
// 执行需要测量的代码
performance.mark('end');
performance.measure('任务耗时', 'start', 'end');
现代JavaScript特性的应用
ES模块与Worker Threads的结合使用:
// main.mjs
import { Worker } from 'worker_threads';
import { fileURLToPath } from 'url';
const worker = new Worker(fileURLToPath(new URL('./worker.mjs', import.meta.url)), {
workerData: { task: 'complex-calculation' }
});
worker.on('message', result => {
console.log('计算结果:', result);
});
// worker.mjs
import { parentPort, workerData } from 'worker_threads';
function calculate() {
// 复杂计算
return 42;
}
parentPort.postMessage(calculate());
浏览器环境的多线程
虽然浏览器中无法使用Node.js的集群模块,但Web Worker可以实现类似效果:
// 主线程
const worker = new Worker('worker.js');
worker.postMessage({ data: largeArray });
worker.onmessage = (e) => {
console.log('处理结果:', e.data);
};
// worker.js
onmessage = function(e) {
const result = e.data.map(processItem);
postMessage(result);
};
新兴技术趋势
WebAssembly与多核利用的结合为Node.js带来新的可能性:
const fs = require('fs');
const { WASI } = require('wasi');
const wasi = new WASI();
const importObject = { wasi_snapshot_preview1: wasi.wasiImport };
WebAssembly.instantiate(fs.readFileSync('compute.wasm'), importObject)
.then(obj => {
const result = obj.instance.exports.compute();
console.log('WASM计算结果:', result);
});