您现在的位置是:网站首页 > WebSocket集成方案文章详情
WebSocket集成方案
陈川
【
Node.js
】
19797人已围观
6108字
WebSocket集成方案
WebSocket是一种在单个TCP连接上进行全双工通信的协议,适用于实时应用场景。Express作为流行的Node.js框架,可以通过中间件或独立库实现WebSocket功能,为应用添加实时通信能力。
基本集成方式
在Express中集成WebSocket主要有两种方式:使用ws
库直接创建WebSocket服务器,或通过express-ws
中间件扩展Express路由。以下是基础实现示例:
// 方式一:使用ws库
const express = require('express');
const WebSocket = require('ws');
const app = express();
const server = app.listen(3000);
const wss = new WebSocket.Server({ server });
wss.on('connection', (ws) => {
ws.on('message', (message) => {
console.log('Received:', message);
ws.send(`Echo: ${message}`);
});
});
// 方式二:使用express-ws
const expressWs = require('express-ws')(app);
app.ws('/chat', (ws, req) => {
ws.on('message', (msg) => {
expressWs.getWss().clients.forEach(client => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(msg);
}
});
});
});
高级功能实现
心跳检测机制
保持连接稳定性需要实现心跳检测,防止死连接:
function setupHeartbeat(ws) {
ws.isAlive = true;
ws.on('pong', () => { ws.isAlive = true; });
const interval = setInterval(() => {
if (ws.isAlive === false) return ws.terminate();
ws.isAlive = false;
ws.ping(null, false, true);
}, 30000);
ws.on('close', () => clearInterval(interval));
}
消息协议设计
推荐使用结构化消息格式:
// 消息格式规范
const createMessage = (type, payload) => JSON.stringify({
timestamp: Date.now(),
version: '1.0',
type,
payload
});
// 消息处理器
function handleMessage(ws, raw) {
try {
const msg = JSON.parse(raw);
switch(msg.type) {
case 'CHAT':
broadcast(msg.payload);
break;
case 'TYPING':
updateUserStatus(msg.userId, true);
break;
}
} catch(e) {
ws.send(createMessage('ERROR', 'Invalid message format'));
}
}
生产环境注意事项
负载均衡处理
在集群模式下需要特殊处理:
const redis = require('redis');
const pub = redis.createClient();
const sub = redis.createClient();
sub.subscribe('websocket_messages');
wss.on('connection', (ws) => {
ws.on('message', (msg) => {
pub.publish('websocket_messages', msg);
});
sub.on('message', (channel, msg) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(msg);
}
});
});
安全防护措施
必须实现的安全机制:
app.ws('/secure', (ws, req) => {
// 1. 验证Token
if (!validateToken(req.query.token)) {
return ws.close(4001, 'Unauthorized');
}
// 2. 频率限制
const limiter = new RateLimiter(5, 1000);
ws.on('message', (msg) => {
if (!limiter.check()) {
return ws.close(4002, 'Rate limit exceeded');
}
// 处理消息...
});
// 3. 消息大小限制
ws._socket.on('data', (data) => {
if (data.length > 1024 * 1024) { // 1MB
ws.close(4003, 'Message too large');
}
});
});
性能优化技巧
连接管理策略
高效管理大量连接:
class ConnectionPool {
constructor() {
this.connections = new Map();
this.roomMap = new Map();
}
add(userId, ws) {
this.connections.set(ws.id, { userId, ws });
this.addToRoom('default', ws);
}
addToRoom(roomId, ws) {
if (!this.roomMap.has(roomId)) {
this.roomMap.set(roomId, new Set());
}
this.roomMap.get(roomId).add(ws.id);
}
broadcastToRoom(roomId, message) {
const room = this.roomMap.get(roomId);
if (room) {
room.forEach(wsId => {
const conn = this.connections.get(wsId);
if (conn.ws.readyState === WebSocket.OPEN) {
conn.ws.send(message);
}
});
}
}
}
二进制数据传输
处理二进制数据更高效:
app.ws('/binary', (ws, req) => {
ws.on('message', (message) => {
if (message instanceof Buffer) {
const compressed = zlib.deflateSync(message);
ws.send(compressed, { binary: true });
} else {
// 文本处理...
}
});
});
调试与监控
实时监控实现
const stats = {
connections: 0,
messages: 0,
errors: 0
};
wss.on('connection', (ws) => {
stats.connections++;
ws.on('message', () => stats.messages++);
ws.on('error', () => stats.errors++);
ws.on('close', () => stats.connections--);
});
// 暴露监控端点
app.get('/websocket-stats', (req, res) => {
res.json({
...stats,
memoryUsage: process.memoryUsage(),
uptime: process.uptime()
});
});
日志记录方案
结构化日志记录:
const { createLogger, transports } = require('winston');
const logger = createLogger({
transports: [new transports.File({ filename: 'websocket.log' })]
});
function logWSEvent(type, meta = {}) {
logger.log({
level: 'info',
message: `WS_${type.toUpperCase()}`,
timestamp: new Date(),
...meta
});
}
wss.on('connection', (ws, req) => {
logWSEvent('connect', { ip: req.socket.remoteAddress });
ws.on('close', () => {
logWSEvent('disconnect', {
duration: Date.now() - ws.startTime
});
});
});
客户端实现示例
完整的前端集成方案:
class WebSocketClient {
constructor(url) {
this.reconnectAttempts = 0;
this.maxReconnects = 5;
this.connect(url);
}
connect(url) {
this.ws = new WebSocket(url);
this.ws.onopen = () => {
this.reconnectAttempts = 0;
this.heartbeat();
};
this.ws.onmessage = (event) => {
this.handleMessage(event.data);
};
this.ws.onclose = () => {
if (this.reconnectAttempts < this.maxReconnects) {
setTimeout(() => {
this.reconnectAttempts++;
this.connect(url);
}, 1000 * Math.pow(2, this.reconnectAttempts));
}
};
}
heartbeat() {
this.pingInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'PING' }));
}
}, 30000);
}
handleMessage(data) {
try {
const msg = JSON.parse(data);
switch(msg.type) {
case 'NOTIFICATION':
showNotification(msg.payload);
break;
case 'DATA_UPDATE':
updateUI(msg.payload);
break;
}
} catch(e) {
console.error('Message parse error:', e);
}
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
}
上一篇: RESTful API开发支持
下一篇: 数据库连接与ORM集成