您现在的位置是:网站首页 > 发布/订阅模式文章详情

发布/订阅模式

发布/订阅模式的核心概念

发布/订阅模式(Pub/Sub)是一种消息传递范式,消息发送者(发布者)不直接将消息发送给特定接收者(订阅者),而是将消息分类发布到特定频道。订阅者可以订阅一个或多个频道,只接收感兴趣的消息。这种模式实现了发布者和订阅者的完全解耦。

在Node.js中,发布/订阅模式常用于事件处理、消息队列和实时通信等场景。典型的实现包括:

  • EventEmitter类(Node.js内置模块)
  • Redis的Pub/Sub功能
  • MQTT协议实现

Node.js中的EventEmitter

Node.js的events模块提供了EventEmitter类,这是最简单的发布/订阅实现:

const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();

// 订阅事件
myEmitter.on('event', (arg) => {
  console.log('事件触发,参数:', arg);
});

// 发布事件
myEmitter.emit('event', { data: '测试数据' });

EventEmitter的主要方法包括:

  • on(eventName, listener) - 添加订阅
  • once(eventName, listener) - 只订阅一次
  • emit(eventName[, ...args]) - 触发事件
  • removeListener(eventName, listener) - 移除订阅

Redis的发布/订阅实现

Redis提供了完整的Pub/Sub功能,适合分布式系统:

const redis = require('redis');
const publisher = redis.createClient();
const subscriber = redis.createClient();

// 订阅频道
subscriber.on('message', (channel, message) => {
  console.log(`收到 ${channel} 频道的消息: ${message}`);
});
subscriber.subscribe('news');

// 发布消息
publisher.publish('news', '重大新闻内容');
publisher.publish('news', '另一条新闻');

Redis Pub/Sub特点:

  • 支持模式匹配订阅(psubscribe)
  • 消息是即时的,不会持久化
  • 适合广播式通信

高级应用场景

微服务间通信

在微服务架构中,Pub/Sub模式可以实现服务解耦:

// 订单服务(发布者)
const orderCreated = (order) => {
  redisClient.publish('orders.created', JSON.stringify(order));
};

// 支付服务(订阅者)
redisSubscriber.subscribe('orders.created');
redisSubscriber.on('message', (channel, message) => {
  if(channel === 'orders.created') {
    processPayment(JSON.parse(message));
  }
});

实时日志处理

// 日志生产者
const logEmitter = new EventEmitter();

function log(level, message) {
  const logEntry = { timestamp: Date.now(), level, message };
  logEmitter.emit('log', logEntry);
}

// 日志消费者(多个)
logEmitter.on('log', (entry) => {
  if(entry.level === 'error') {
    sendErrorToSlack(entry);
  }
});

logEmitter.on('log', (entry) => {
  writeToFile(entry);
});

性能优化与注意事项

  1. 内存泄漏:忘记移除监听器会导致内存泄漏
// 错误示例
emitter.on('event', () => { /*...*/ });

// 正确做法
const listener = () => { /*...*/ };
emitter.on('event', listener);
emitter.removeListener('event', listener);
  1. 错误处理:EventEmitter默认会抛出错误导致进程退出
myEmitter.on('error', (err) => {
  console.error('发生错误:', err);
});
  1. 性能瓶颈:大量事件可能阻塞事件循环
// 使用setImmediate拆分长时间运行的处理程序
emitter.on('data', (chunk) => {
  setImmediate(() => {
    processChunk(chunk);
  });
});

与其他模式的比较

与观察者模式的区别

虽然相似,但两者有本质区别:

  • 观察者模式:观察者直接订阅目标
  • 发布/订阅模式:通过消息代理/频道间接通信
// 观察者模式示例
class Subject {
  constructor() {
    this.observers = [];
  }
  addObserver(obs) {
    this.observers.push(obs);
  }
  notify(data) {
    this.observers.forEach(obs => obs.update(data));
  }
}

// 发布/订阅模式更松耦合
eventBus.publish('event', data);
eventBus.subscribe('event', handler);

与消息队列的区别

消息队列通常保证消息传递,而Pub/Sub是即时的:

  • 消息队列:消息持久化,消费者主动拉取
  • Pub/Sub:实时推送,无持久化保证

实际项目中的最佳实践

  1. 命名规范:使用清晰的命名空间
// 推荐
publish('user:created', userData);

// 不推荐
publish('newUser', userData);
  1. 中间件模式:在发布前后添加处理逻辑
const middleware = {
  beforePublish: (event, data) => {
    data.timestamp = Date.now();
    return [event, data];
  },
  afterSubscribe: (handler) => {
    return async (data) => {
      try {
        await handler(data);
      } catch (err) {
        logger.error(err);
      }
    };
  }
};
  1. TypeScript支持:增强类型安全
interface Events {
  'user.registered': { id: string; email: string };
  'order.completed': { orderId: string; amount: number };
}

class TypedEmitter extends EventEmitter {
  emit<T extends keyof Events>(event: T, data: Events[T]): boolean;
  on<T extends keyof Events>(event: T, listener: (data: Events[T]) => void): this;
}

扩展应用:浏览器中的Pub/Sub

虽然Node.js是主要环境,但浏览器中也可以实现:

// 简单的浏览器实现
const bus = {
  events: {},
  subscribe(event, callback) {
    if (!this.events[event]) this.events[event] = [];
    this.events[event].push(callback);
  },
  publish(event, data) {
    (this.events[event] || []).forEach(cb => cb(data));
  }
};

// 使用示例
bus.subscribe('cart.updated', (items) => {
  updateCartUI(items);
});

// 其他模块
bus.publish('cart.updated', currentCartItems);

现代JavaScript中的替代方案

随着语言发展,一些新特性可以替代传统Pub/Sub:

  1. RxJS Observables
import { Subject } from 'rxjs';

const eventBus = new Subject();

// 订阅
const subscription = eventBus.subscribe({
  next: (data) => console.log('收到数据:', data)
});

// 发布
eventBus.next({ type: 'DATA_UPDATE', payload: {} });

// 取消订阅
subscription.unsubscribe();
  1. React Context API
// 创建上下文
const EventContext = createContext();

// 提供者组件
function EventProvider({ children }) {
  const [events, setEvents] = useState({});
  
  const publish = (event, data) => {
    setEvents(prev => ({
      ...prev,
      [event]: [...(prev[event] || []), data]
    }));
  };

  return (
    <EventContext.Provider value={{ events, publish }}>
      {children}
    </EventContext.Provider>
  );
}

测试策略

确保Pub/Sub系统可靠性的测试方法:

describe('Pub/Sub系统', () => {
  let emitter;
  
  beforeEach(() => {
    emitter = new EventEmitter();
  });

  it('应该正确传递事件', (done) => {
    emitter.on('test', (data) => {
      expect(data).toEqual({ key: 'value' });
      done();
    });
    emitter.emit('test', { key: 'value' });
  });

  it('应该支持多个订阅者', () => {
    const mock1 = jest.fn();
    const mock2 = jest.fn();
    
    emitter.on('multi', mock1);
    emitter.on('multi', mock2);
    emitter.emit('multi', {});
    
    expect(mock1).toHaveBeenCalled();
    expect(mock2).toHaveBeenCalled();
  });
});

分布式系统中的挑战

在跨服务场景下,Pub/Sub面临额外挑战:

  1. 消息顺序保证:不同消费者可能以不同顺序接收消息
  2. 至少一次交付:确保消息不丢失
  3. 死信队列:处理无法消费的消息
// 使用RabbitMQ的示例
const amqp = require('amqplib');

async function setup() {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();
  
  // 声明交换器
  await channel.assertExchange('logs', 'fanout', { durable: false });
  
  // 发布消息
  channel.publish('logs', '', Buffer.from('Hello World!'));
  
  // 消费消息
  const { queue } = await channel.assertQueue('', { exclusive: true });
  channel.bindQueue(queue, 'logs', '');
  channel.consume(queue, (msg) => {
    console.log(msg.content.toString());
  }, { noAck: true });
}

性能监控与指标

对于生产环境,需要监控的关键指标:

  1. 事件吞吐量:单位时间内处理的事件数
  2. 订阅者延迟:从发布到处理的时间
  3. 错误率:失败的事件处理比例
// 添加监控装饰器
function monitoredEmitter(emitter) {
  const stats = {
    eventsEmitted: 0,
    handlersExecuted: 0,
    errors: 0
  };

  const originalEmit = emitter.emit;
  emitter.emit = function(event, ...args) {
    stats.eventsEmitted++;
    try {
      originalEmit.call(this, event, ...args);
    } catch (err) {
      stats.errors++;
      throw err;
    }
  };

  emitter.getStats = () => ({ ...stats });
  return emitter;
}

我的名片

网名:~川~

岗位:console.log 调试员

坐标:重庆市-九龙坡区

邮箱:cc@qdcc.cn

沙漏人生

站点信息

  • 建站时间:2013/03/16
  • 本站运行
  • 文章数量
  • 总访问量
微信公众号
每次关注
都是向财富自由迈进的一步