您现在的位置是:网站首页 > 多核CPU利用文章详情

多核CPU利用

多核CPU的基本概念

现代计算机普遍采用多核CPU架构,单个物理处理器包含多个独立运算核心。Node.js作为单线程运行时环境,默认情况下无法充分利用多核CPU的计算能力。一个四核CPU上运行的Node.js应用,理论上只能利用25%的硬件资源。

// 查看CPU核心数量
const os = require('os');
console.log(`CPU核心数: ${os.cpus().length}`);

Node.js的集群模块

Node.js内置的cluster模块允许创建共享同一服务器端口的子进程。主进程(master)负责管理工作进程(worker),工作进程处理实际请求。当某个工作进程崩溃时,主进程可以立即重启它。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);
  
  // 衍生工作进程
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
} else {
  // 工作进程可以共享任何TCP连接
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
}

进程间通信机制

工作进程与主进程之间通过IPC(进程间通信)通道进行通信。消息传递是序列化和反序列化的过程,需要注意传递的数据类型和大小限制。

// 主进程发送消息
cluster.workers[workerId].send({ type: 'notification', data: '重要更新' });

// 工作进程接收消息
process.on('message', (msg) => {
  if (msg.type === 'notification') {
    console.log(`收到通知: ${msg.data}`);
  }
});

负载均衡策略

Node.js集群默认采用轮询(round-robin)算法分配请求。在Linux系统上,内核会直接在工作进程间分配连接,避免主进程成为性能瓶颈。可以通过设置NODE_CLUSTER_SCHED_POLICY环境变量修改调度策略。

// 设置调度策略(需要Node.js 16+)
process.env.NODE_CLUSTER_SCHED_POLICY = 'rr'; // 轮询
// 或
process.env.NODE_CLUSTER_SCHED_POLICY = 'none'; // 由操作系统决定

共享状态管理

多进程环境下需要特别注意状态共享问题。内存数据库如Redis可以解决进程间状态同步问题。

const redis = require('redis');
const client = redis.createClient();

// 不同进程通过Redis共享计数器
app.get('/counter', (req, res) => {
  client.incr('visit_count', (err, count) => {
    res.send(`访问次数: ${count}`);
  });
});

进程管理工具

生产环境推荐使用PM2等进程管理工具,提供更完善的集群管理功能:

# 启动集群(自动根据CPU核心数创建进程)
pm2 start app.js -i max

# 查看运行状态
pm2 list

# 监控日志
pm2 logs

性能优化实践

CPU密集型任务可以通过任务分片提高并行效率。以下是将大数组分片处理的示例:

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

function parallelMap(array, mapper, concurrency = require('os').cpus().length) {
  return new Promise((resolve) => {
    const chunkSize = Math.ceil(array.length / concurrency);
    const results = [];
    let completed = 0;

    for (let i = 0; i < concurrency; i++) {
      const worker = new Worker(__filename, {
        workerData: {
          array: array.slice(i * chunkSize, (i + 1) * chunkSize),
          mapper: mapper.toString()
        }
      });

      worker.on('message', (result) => {
        results.push(...result);
        if (++completed === concurrency) {
          resolve(results);
        }
      });
    }
  });
}

if (!isMainThread) {
  const mapper = new Function('return ' + workerData.mapper)();
  parentPort.postMessage(workerData.array.map(mapper));
}

错误处理与进程守护

需要特别注意子进程的异常处理,避免整个应用崩溃:

cluster.on('exit', (worker, code, signal) => {
  if (code !== 0 && !worker.exitedAfterDisconnect) {
    console.log(`工作进程 ${worker.id} 崩溃,正在重启...`);
    cluster.fork();
  }
});

process.on('uncaughtException', (err) => {
  console.error('未捕获异常:', err);
  // 记录日志后优雅退出
  process.exit(1);
});

实际应用场景分析

视频转码服务是典型的CPU密集型应用,适合使用多核并行处理:

const ffmpeg = require('fluent-ffmpeg');
const { Worker } = require('worker_threads');

function processVideoSegment(input, output, segment) {
  return new Promise((resolve) => {
    const worker = new Worker('./video-worker.js', {
      workerData: { input, output, segment }
    });
    worker.on('message', resolve);
  });
}

async function parallelVideoProcessing(videos) {
  const promises = videos.map(video => 
    processVideoSegment(video.input, video.output, video.segment)
  );
  await Promise.all(promises);
  // 合并处理后的视频片段
}

微服务架构中的实践

在微服务架构中,可以为每个CPU核心部署独立服务实例:

const express = require('express');
const port = 3000 + (cluster.worker ? cluster.worker.id : 0);

const app = express();
app.get('/service', (req, res) => {
  // 每个工作进程处理特定功能
  res.json({ 
    pid: process.pid,
    service: `service-${cluster.worker.id}` 
  });
});

app.listen(port, () => {
  console.log(`服务 ${cluster.worker.id} 运行在 ${port}`);
});

容器化环境下的考量

在Docker/Kubernetes环境中,需要协调Node.js集群与容器编排的关系:

# Dockerfile示例
FROM node:16
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
CMD ["node", "cluster-app.js"]

Kubernetes部署时通常每个Pod只运行一个Node.js进程,由Kubernetes负责多实例部署和负载均衡。

性能监控与调试

使用内置的perf_hooks模块监控各个工作进程的性能:

const { performance, PerformanceObserver } = require('perf_hooks');

const obs = new PerformanceObserver((items) => {
  items.getEntries().forEach(entry => {
    console.log(`${entry.name}: ${entry.duration}ms`);
  });
});
obs.observe({ entryTypes: ['measure'] });

performance.mark('start');
// 执行需要测量的代码
performance.mark('end');
performance.measure('任务耗时', 'start', 'end');

现代JavaScript特性的应用

ES模块与Worker Threads的结合使用:

// main.mjs
import { Worker } from 'worker_threads';
import { fileURLToPath } from 'url';

const worker = new Worker(fileURLToPath(new URL('./worker.mjs', import.meta.url)), {
  workerData: { task: 'complex-calculation' }
});

worker.on('message', result => {
  console.log('计算结果:', result);
});

// worker.mjs
import { parentPort, workerData } from 'worker_threads';

function calculate() {
  // 复杂计算
  return 42;
}

parentPort.postMessage(calculate());

浏览器环境的多线程

虽然浏览器中无法使用Node.js的集群模块,但Web Worker可以实现类似效果:

// 主线程
const worker = new Worker('worker.js');
worker.postMessage({ data: largeArray });
worker.onmessage = (e) => {
  console.log('处理结果:', e.data);
};

// worker.js
onmessage = function(e) {
  const result = e.data.map(processItem);
  postMessage(result);
};

新兴技术趋势

WebAssembly与多核利用的结合为Node.js带来新的可能性:

const fs = require('fs');
const { WASI } = require('wasi');

const wasi = new WASI();
const importObject = { wasi_snapshot_preview1: wasi.wasiImport };

WebAssembly.instantiate(fs.readFileSync('compute.wasm'), importObject)
  .then(obj => {
    const result = obj.instance.exports.compute();
    console.log('WASM计算结果:', result);
  });

上一篇: 进程管理工具

下一篇: 进程守护

我的名片

网名:~川~

岗位:console.log 调试员

坐标:重庆市-九龙坡区

邮箱:cc@qdcc.cn

沙漏人生

站点信息

  • 建站时间:2013/03/16
  • 本站运行
  • 文章数量
  • 总访问量
微信公众号
每次关注
都是向财富自由迈进的一步