您现在的位置是:网站首页 > WebSocket集成方案文章详情

WebSocket集成方案

WebSocket集成方案

WebSocket是一种在单个TCP连接上进行全双工通信的协议,适用于实时应用场景。Express作为流行的Node.js框架,可以通过中间件或独立库实现WebSocket功能,为应用添加实时通信能力。

基本集成方式

在Express中集成WebSocket主要有两种方式:使用ws库直接创建WebSocket服务器,或通过express-ws中间件扩展Express路由。以下是基础实现示例:

// 方式一:使用ws库
const express = require('express');
const WebSocket = require('ws');

const app = express();
const server = app.listen(3000);

const wss = new WebSocket.Server({ server });

wss.on('connection', (ws) => {
  ws.on('message', (message) => {
    console.log('Received:', message);
    ws.send(`Echo: ${message}`);
  });
});

// 方式二:使用express-ws
const expressWs = require('express-ws')(app);

app.ws('/chat', (ws, req) => {
  ws.on('message', (msg) => {
    expressWs.getWss().clients.forEach(client => {
      if (client !== ws && client.readyState === WebSocket.OPEN) {
        client.send(msg);
      }
    });
  });
});

高级功能实现

心跳检测机制

保持连接稳定性需要实现心跳检测,防止死连接:

function setupHeartbeat(ws) {
  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });

  const interval = setInterval(() => {
    if (ws.isAlive === false) return ws.terminate();
    
    ws.isAlive = false;
    ws.ping(null, false, true);
  }, 30000);

  ws.on('close', () => clearInterval(interval));
}

消息协议设计

推荐使用结构化消息格式:

// 消息格式规范
const createMessage = (type, payload) => JSON.stringify({
  timestamp: Date.now(),
  version: '1.0',
  type,
  payload
});

// 消息处理器
function handleMessage(ws, raw) {
  try {
    const msg = JSON.parse(raw);
    switch(msg.type) {
      case 'CHAT':
        broadcast(msg.payload);
        break;
      case 'TYPING':
        updateUserStatus(msg.userId, true);
        break;
    }
  } catch(e) {
    ws.send(createMessage('ERROR', 'Invalid message format'));
  }
}

生产环境注意事项

负载均衡处理

在集群模式下需要特殊处理:

const redis = require('redis');
const pub = redis.createClient();
const sub = redis.createClient();

sub.subscribe('websocket_messages');

wss.on('connection', (ws) => {
  ws.on('message', (msg) => {
    pub.publish('websocket_messages', msg);
  });

  sub.on('message', (channel, msg) => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(msg);
    }
  });
});

安全防护措施

必须实现的安全机制:

app.ws('/secure', (ws, req) => {
  // 1. 验证Token
  if (!validateToken(req.query.token)) {
    return ws.close(4001, 'Unauthorized');
  }

  // 2. 频率限制
  const limiter = new RateLimiter(5, 1000);
  ws.on('message', (msg) => {
    if (!limiter.check()) {
      return ws.close(4002, 'Rate limit exceeded');
    }
    // 处理消息...
  });

  // 3. 消息大小限制
  ws._socket.on('data', (data) => {
    if (data.length > 1024 * 1024) { // 1MB
      ws.close(4003, 'Message too large');
    }
  });
});

性能优化技巧

连接管理策略

高效管理大量连接:

class ConnectionPool {
  constructor() {
    this.connections = new Map();
    this.roomMap = new Map();
  }

  add(userId, ws) {
    this.connections.set(ws.id, { userId, ws });
    this.addToRoom('default', ws);
  }

  addToRoom(roomId, ws) {
    if (!this.roomMap.has(roomId)) {
      this.roomMap.set(roomId, new Set());
    }
    this.roomMap.get(roomId).add(ws.id);
  }

  broadcastToRoom(roomId, message) {
    const room = this.roomMap.get(roomId);
    if (room) {
      room.forEach(wsId => {
        const conn = this.connections.get(wsId);
        if (conn.ws.readyState === WebSocket.OPEN) {
          conn.ws.send(message);
        }
      });
    }
  }
}

二进制数据传输

处理二进制数据更高效:

app.ws('/binary', (ws, req) => {
  ws.on('message', (message) => {
    if (message instanceof Buffer) {
      const compressed = zlib.deflateSync(message);
      ws.send(compressed, { binary: true });
    } else {
      // 文本处理...
    }
  });
});

调试与监控

实时监控实现

const stats = {
  connections: 0,
  messages: 0,
  errors: 0
};

wss.on('connection', (ws) => {
  stats.connections++;
  
  ws.on('message', () => stats.messages++);
  ws.on('error', () => stats.errors++);
  ws.on('close', () => stats.connections--);
});

// 暴露监控端点
app.get('/websocket-stats', (req, res) => {
  res.json({
    ...stats,
    memoryUsage: process.memoryUsage(),
    uptime: process.uptime()
  });
});

日志记录方案

结构化日志记录:

const { createLogger, transports } = require('winston');
const logger = createLogger({
  transports: [new transports.File({ filename: 'websocket.log' })]
});

function logWSEvent(type, meta = {}) {
  logger.log({
    level: 'info',
    message: `WS_${type.toUpperCase()}`,
    timestamp: new Date(),
    ...meta
  });
}

wss.on('connection', (ws, req) => {
  logWSEvent('connect', { ip: req.socket.remoteAddress });
  
  ws.on('close', () => {
    logWSEvent('disconnect', { 
      duration: Date.now() - ws.startTime 
    });
  });
});

客户端实现示例

完整的前端集成方案:

class WebSocketClient {
  constructor(url) {
    this.reconnectAttempts = 0;
    this.maxReconnects = 5;
    this.connect(url);
  }

  connect(url) {
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      this.reconnectAttempts = 0;
      this.heartbeat();
    };

    this.ws.onmessage = (event) => {
      this.handleMessage(event.data);
    };

    this.ws.onclose = () => {
      if (this.reconnectAttempts < this.maxReconnects) {
        setTimeout(() => {
          this.reconnectAttempts++;
          this.connect(url);
        }, 1000 * Math.pow(2, this.reconnectAttempts));
      }
    };
  }

  heartbeat() {
    this.pingInterval = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'PING' }));
      }
    }, 30000);
  }

  handleMessage(data) {
    try {
      const msg = JSON.parse(data);
      switch(msg.type) {
        case 'NOTIFICATION':
          showNotification(msg.payload);
          break;
        case 'DATA_UPDATE':
          updateUI(msg.payload);
          break;
      }
    } catch(e) {
      console.error('Message parse error:', e);
    }
  }

  send(data) {
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data));
    }
  }
}

我的名片

网名:~川~

岗位:console.log 调试员

坐标:重庆市-九龙坡区

邮箱:cc@qdcc.cn

沙漏人生

站点信息

  • 建站时间:2013/03/16
  • 本站运行
  • 文章数量
  • 总访问量
微信公众号
每次关注
都是向财富自由迈进的一步