| 模式/特性 |
交换机类型 |
关键词 |
典型场景 |
| Simple |
默认直连 |
单队列 |
点对点 |
| Work Queue |
默认直连 |
prefetch |
并行任务 |
| Pub/Sub |
fanout |
广播 |
群发通知 |
| Routing |
direct |
routing_key 精确 |
日志分级 |
| Topic |
topic |
通配符 |
多维度路由 |
| Header |
headers |
键值对匹配 |
元数据路由 |
| RPC |
默认直连 |
reply_to+corr_id |
远程调用 |
| Delay |
x-delayed-message |
x-delay |
定时任务 |
| Priority |
任意 |
priority |
插队处理 |
| DLX |
任意 |
x-dead-letter-exchange |
失败转移 |
| Lazy |
任意 |
x-queue-mode=lazy |
海量堆积 |
| Quorum |
任意 |
x-queue-type=quorum |
高可用 |
| Stream |
stream |
offset |
日志流 |
──────────────────
1️⃣ Routing(路由)模式
交换机类型:direct(直连)
只把消息送到 routing_key 精确匹配 的队列。
场景:按业务类型分 queue,比如日志分级(info、warn、error)。
最小 demo 指路:
producer:channel.basic_publish(exchange='direct_logs', routing_key='error', body=…)
consumer:channel.queue_bind(queue=xxx, exchange='direct_logs', routing_key='error')
──────────────────
2️⃣ Topic(主题)模式
交换机类型:topic
routing_key 用 “点分单词” 通配符匹配(* 单层,# 多层)。
场景:日志按模块+级别(order.info、order.error、payment.)或 IM 离线消息多租户。
最小 demo 指路:
producer:routing_key = "order.created.us"
consumer:bind routing_key = "order..us" or "order.#"
──────────────────
3️⃣ Header(头部)模式
交换机类型:headers
路由规则由消息头里的键值对决定,完全忽略 routing_key。
场景:需要根据多个元数据字段(如 x-version:1,x-env:prod)灵活路由,且不想把信息放进 routing_key。
最小 demo 指路:
producer:properties=pika.BasicProperties(headers={'x-version':'1','x-env':'prod'})
consumer:channel.queue_bind(queue=xxx, exchange='header_logs', arguments={'x-match':'all','x-version':'1','x-env':'prod'})
──────────────────
4️⃣ Work Queue(竞争消费 / 任务队列)
本质:简单队列 + 多消费者 + basic_qos=1
把耗时任务均匀分发给多个 worker,一条消息只被一个消费者拿到。
场景:图片压缩、批量邮件、爬虫任务。
你已经学过,就是简单模式里加 prefetch_count 那段。
──────────────────
5️⃣ RPC 模式
交换机类型:默认直连("")
客户端像调本地函数一样,把“函数名 + 参数”塞进队列;服务端算完后把结果塞回客户端的私有回调队列,用 correlation_id 保证“一问一答”不串线。
关键要素
队列:客户端创建独占临时队列当“回执邮箱”;服务端监听固定队列 rpc_queue。
消息属性:reply_to(回执邮箱地址)、correlation_id(请求 ID)。
阻塞/轮询:客户端用 process_data_events() 或 start_consuming() 等结果。
典型场景
微服务之间“同步”调用(下单服务 → 库存服务扣库存并返回剩余数量)。
异构语言互通(Python 调 Java 的算法库)。
──────────────────
6️⃣ 发布订阅 / Fanout 模式
交换机类型:fanout
消息来了就广播给所有绑定到这个交换机的队列,每条消息可被所有消费者各自收到一次。
关键要素
队列:通常由消费者在绑定时自动生成(exclusive=True 或 queue=""),生命周期跟随连接。
绑定:任何队列只要 queue_bind(exchange='logs') 就能收到广播。
消息丢失:交换机没有队列绑定时,消息直接丢弃。
典型场景
群发通知 / 公告(所有在线用户收到同一推送)。
日志汇聚:一个交换器 logs 同时发给“文件落盘队列”“监控大屏队列”“实时告警队列”。
──────────────────
7️⃣ 延迟队列(Delayed Message)
RabbitMQ 官方插件:rabbitmq_delayed_message_exchange
让消息 延迟 N 秒 后才投递。
场景:订单 15 分钟未支付自动关单、定时提醒。
使用:声明交换机 type = 'x-delayed-message',发消息时 header 加 x-delay=15000。
──────────────────
8️⃣ 优先级队列(Priority Queue)
官方插件已内置。
队列里高 priority 的消息先被消费。
场景:VIP 用户任务插队。
使用:queue_declare(arguments={'x-max-priority':10}),publish 时加 priority=5。
──────────────────
9️⃣ 死信队列(DLX / DLQ)
消息“死”了(被拒绝、TTL 到期、队列满)就自动转到另一个交换机 → 死信队列。
场景:失败任务人工审计、防丢消息。
使用:queue_declare(arguments={'x-dead-letter-exchange':'dlx'}),再建 dlx 绑定的队列。
──────────────────
🔟 惰性队列(Lazy Queue)
消息优先落磁盘,百万级堆积 场景下内存更稳。
使用:queue_declare(arguments={'x-queue-mode':'lazy'})
──────────────────
1️⃣1️⃣ 仲裁队列(Quorum Queue)
RabbitMQ 3.8+ 新特性,替代经典镜像队列。
Raft 协议复制,高可用 + 强一致,牺牲少量吞吐。
使用:queue_declare(queue='q1', durable=True, arguments={'x-queue-type':'quorum'})
──────────────────
1️⃣2️⃣ Stream(流式日志)
RabbitMQ 3.11+ 实验特性。
Kafka-like 的 append-only log,支持大量客户端、回溯消费。
使用:stream 声明 + offset tracking。