# -*- coding: utf-8 -*-
"""RabbitMQ dead-letter demo consumer.

Declares a dead-letter exchange (DLX) with a bound dead-letter queue (DLQ),
then a business queue wired to that DLX, and consumes from the business
queue: even-numbered messages are acknowledged, odd-numbered (and
unparsable) messages are rejected without requeue so the broker
dead-letters them.
"""
import pika

# NOTE(review): credentials and broker host are hard-coded in source; consider
# loading them from the environment or a secret store before real use.
USER = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
PARAMS = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', USER)

conn = pika.BlockingConnection(PARAMS)
ch = conn.channel()

# 1. Declare the dead-letter exchange and the dead-letter queue.
dlx_name = 'dlx_exchange'
dlq_name = 'dlq'
ch.exchange_declare(exchange=dlx_name, exchange_type='direct')
ch.queue_declare(queue=dlq_name, durable=True)
ch.queue_bind(exchange=dlx_name, queue=dlq_name, routing_key='dead')

# 2. Declare the business queue and attach it to the DLX.
biz_queue = 'business_queue'
biz_args = {
    'x-dead-letter-exchange': dlx_name,    # name of the DLX
    'x-dead-letter-routing-key': 'dead',   # routing key used when entering the DLX
    'x-message-ttl': 10000,                # messages expire after 10 s (TTL dead-letter demo)
    'x-max-length': 3,                     # max queue length 3 (overflow dead-letter demo)
}
ch.queue_declare(queue=biz_queue, durable=True, arguments=biz_args)

print(' [*] Waiting business messages, will nack odd numbers. Ctrl+C to quit.')


def callback(ch, method, props, body):
    """Handle one delivery from the business queue.

    Args:
        ch: Channel the message was delivered on; used to ack/reject.
        method: Delivery metadata; ``method.delivery_tag`` identifies the message.
        props: Message properties (unused).
        body: Raw message payload as bytes, expected to be a decimal integer.

    Even integers are acknowledged. Odd integers, and payloads that are not
    valid text or not an integer, are rejected with ``requeue=False``.
    """
    try:
        msg = body.decode()
    except UnicodeDecodeError:
        # An undecodable payload can never succeed; reject it instead of
        # letting the exception kill the consumer with the message unacked.
        ch.basic_reject(method.delivery_tag, requeue=False)
        print(f' [!] Rejected undecodable payload {body!r} -> DLQ')
        return

    print(f' [>] Received: {msg}')

    try:
        is_odd = int(msg) % 2
    except ValueError:
        # Non-numeric payload: same reasoning as above, reject without requeue.
        ch.basic_reject(method.delivery_tag, requeue=False)
        print(f' [!] Rejected non-integer {msg!r} -> DLQ')
        return

    if is_odd:  # odd -> reject so the message is dead-lettered
        ch.basic_reject(method.delivery_tag, requeue=False)
        print(f' [!] Rejected {msg} -> DLQ')
    else:
        ch.basic_ack(method.delivery_tag)
        print(f' [√] Processed {msg}')


# Deliver one unacknowledged message at a time to this consumer.
ch.basic_qos(prefetch_count=1)
ch.basic_consume(queue=biz_queue, on_message_callback=callback)

try:
    ch.start_consuming()
except KeyboardInterrupt:
    # The banner advertises Ctrl+C as the way to quit; stop cleanly rather
    # than exiting with a traceback.
    ch.stop_consuming()
finally:
    conn.close()