jack 5a5b93bf26 update 3 meses atrás
..
01-路由模式(Routing) ea166eb170 update 4 meses atrás
02-主题模式(Topic) ea166eb170 update 4 meses atrás
03-头部模式(Header) ea166eb170 update 4 meses atrás
04-任务队列(Work Queue) ea166eb170 update 4 meses atrás
05-RPC模式 ea166eb170 update 4 meses atrás
06-发布订阅模式(fanout) ea166eb170 update 4 meses atrás
07-延迟队列(Delayed Message) ea166eb170 update 4 meses atrás
08-优先级队列(Priority Queue) ea166eb170 update 4 meses atrás
09-死信队列(DLX DLQ) 5a5b93bf26 update 3 meses atrás
10-惰性队列(Lazy Queue) ea166eb170 update 4 meses atrás
11-仲裁队列(Quorum Queue) ea166eb170 update 4 meses atrás
12-流式日志(Stream) ea166eb170 update 4 meses atrás
Readme.md e4e258cbbf update 4 meses atrás

Readme.md

模式/特性 交换机类型 关键词 典型场景
Simple 默认直连 单队列 点对点
Work Queue 默认直连 prefetch 并行任务
Pub/Sub fanout 广播 群发通知
Routing direct routing_key 精确 日志分级
Topic topic 通配符 多维度路由
Header headers 键值对匹配 元数据路由
RPC 默认直连 reply_to+corr_id 远程调用
Delay x-delayed-message x-delay 定时任务
Priority 任意 priority 插队处理
DLX 任意 x-dead-letter-exchange 失败转移
Lazy 任意 x-queue-mode=lazy 海量堆积
Quorum 任意 x-queue-type=quorum 高可用
Stream stream offset 日志流

────────────────── 1️⃣ Routing(路由)模式 交换机类型:direct(直连) 只把消息送到 routing_key 精确匹配 的队列。 场景:按业务类型分 queue,比如日志分级(info、warn、error)。 最小 demo 指路: producer:channel.basic_publish(exchange='direct_logs', routing_key='error', body=…) consumer:channel.queue_bind(queue=xxx, exchange='direct_logs', routing_key='error') ────────────────── 2️⃣ Topic(主题)模式 交换机类型:topic routing_key 用 “点分单词” 通配符匹配(* 单层,# 多层)。 场景:日志按模块+级别(order.info、order.error、payment.)或 IM 离线消息多租户。 最小 demo 指路: producer:routing_key = "order.created.us" consumer:bind routing_key = "order..us" or "order.#" ────────────────── 3️⃣ Header(头部)模式 交换机类型:headers 路由规则由消息头里的键值对决定,完全忽略 routing_key。 场景:需要根据多个元数据字段(如 x-version:1,x-env:prod)灵活路由,且不想把信息放进 routing_key。 最小 demo 指路: producer:properties=pika.BasicProperties(headers={'x-version':'1','x-env':'prod'}) consumer:channel.queue_bind(queue=xxx, exchange='header_logs', arguments={'x-match':'all','x-version':'1','x-env':'prod'}) ────────────────── 4️⃣ Work Queue(竞争消费 / 任务队列) 本质:简单队列 + 多消费者 + basic_qos=1 把耗时任务均匀分发给多个 worker,一条消息只被一个消费者拿到。 场景:图片压缩、批量邮件、爬虫任务。 你已经学过,就是简单模式里加 prefetch_count 那段。 ────────────────── 5️⃣ RPC 模式 交换机类型:默认直连("") 客户端像调本地函数一样,把“函数名 + 参数”塞进队列;服务端算完后把结果塞回客户端的私有回调队列,用 correlation_id 保证“一问一答”不串线。 关键要素 队列:客户端创建独占临时队列当“回执邮箱”;服务端监听固定队列 rpc_queue。 消息属性:reply_to(回执邮箱地址)、correlation_id(请求 ID)。 阻塞/轮询:客户端用 process_data_events() 或 start_consuming() 等结果。 典型场景 微服务之间“同步”调用(下单服务 → 库存服务扣库存并返回剩余数量)。 异构语言互通(Python 调 Java 的算法库)。 ────────────────── 6️⃣ 发布订阅 / Fanout 模式 交换机类型:fanout 消息来了就广播给所有绑定到这个交换机的队列,每条消息可被所有消费者各自收到一次。 关键要素 队列:通常由消费者在绑定时自动生成(exclusive=True 或 queue=""),生命周期跟随连接。 绑定:任何队列只要 queue_bind(exchange='logs') 就能收到广播。 消息丢失:交换机没有队列绑定时,消息直接丢弃。 典型场景 群发通知 / 公告(所有在线用户收到同一推送)。 日志汇聚:一个交换器 logs 同时发给“文件落盘队列”“监控大屏队列”“实时告警队列”。 ────────────────── 7️⃣ 延迟队列(Delayed Message) RabbitMQ 官方插件:rabbitmq_delayed_message_exchange 让消息 延迟 N 秒 后才投递。 场景:订单 15 分钟未支付自动关单、定时提醒。 使用:声明交换机 type = 'x-delayed-message',发消息时 header 加 x-delay=15000。 ────────────────── 8️⃣ 优先级队列(Priority Queue) 官方插件已内置。 队列里高 priority 的消息先被消费。 场景:VIP 用户任务插队。 使用:queue_declare(arguments={'x-max-priority':10}),publish 时加 priority=5。 ────────────────── 9️⃣ 死信队列(DLX / DLQ) 消息“死”了(被拒绝、TTL 到期、队列满)就自动转到另一个交换机 → 死信队列。 场景:失败任务人工审计、防丢消息。 使用:queue_declare(arguments={'x-dead-letter-exchange':'dlx'}),再建 dlx 绑定的队列。 ────────────────── 🔟 惰性队列(Lazy Queue) 消息优先落磁盘,百万级堆积 场景下内存更稳。 使用:queue_declare(arguments={'x-queue-mode':'lazy'}) ────────────────── 1️⃣1️⃣ 仲裁队列(Quorum Queue) RabbitMQ 3.8+ 新特性,替代经典镜像队列。 Raft 协议复制,高可用 + 强一致,牺牲少量吞吐。 使用:queue_declare(queue='q1', durable=True, arguments={'x-queue-type':'quorum'}) ────────────────── 1️⃣2️⃣ Stream(流式日志) RabbitMQ 3.11+ 实验特性。 Kafka-like 的 append-only log,支持大量客户端、回溯消费。 使用:stream 声明 + offset tracking。