| 1234567891011121314151617181920212223242526272829303132333435363738394041 |
# -*- coding: utf-8 -*-
import os

import pika

# Broker credentials. Both values can be overridden through the environment;
# the defaults reproduce the original hard-coded demo values so the script
# behaves the same when the variables are unset.
# NOTE(review): a password literal in source control is a leak risk - supply
# RABBITMQ_PASSWORD from the environment / a secret store in real deployments.
USER = pika.PlainCredentials(
    os.environ.get('RABBITMQ_USERNAME', 'user'),
    os.environ.get('RABBITMQ_PASSWORD', 'J70e6K7BRrxrU1dO'),
)
PARAMS = pika.ConnectionParameters(
    'rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', USER
)
conn = pika.BlockingConnection(PARAMS)
ch = conn.channel()

# 1. Declare the dead-letter exchange (DLX) and the dead-letter queue (DLQ).
dlx_name = 'dlx_exchange'
dlq_name = 'dlq'
# NOTE(review): the exchange is declared without durable=True while both
# queues are declared durable - confirm whether the DLX is meant to survive
# a broker restart.
ch.exchange_declare(exchange=dlx_name, exchange_type='direct')
ch.queue_declare(queue=dlq_name, durable=True)
ch.queue_bind(exchange=dlx_name, queue=dlq_name, routing_key='dead')

# 2. Declare the business queue and attach the DLX to it.
biz_queue = 'business_queue'
biz_args = {
    # Name of the DLX.
    'x-dead-letter-exchange': dlx_name,
    # Routing key used when a message enters the DLX.
    'x-dead-letter-routing-key': 'dead',
    # Messages expire after 10 seconds (demonstrates TTL dead-lettering).
    'x-message-ttl': 10000,
    # Maximum queue length of 3 (demonstrates overflow dead-lettering).
    'x-max-length': 3,
}
ch.queue_declare(queue=biz_queue, durable=True, arguments=biz_args)

print(' [*] Waiting business messages, will nack odd numbers. Ctrl+C to quit.')
def callback(ch, method, props, body):
    """Handle one delivery: ack even integers, reject everything else.

    The payload is decoded as text and parsed as an integer. Even numbers
    are acknowledged. Odd numbers, and payloads that cannot be decoded or
    parsed, are rejected with ``requeue=False`` so they leave the queue
    (the log lines assume the queue routes rejected messages to a DLQ).

    Args:
        ch: Channel the message arrived on; used to ack/reject.
        method: Delivery metadata; only ``delivery_tag`` is read.
        props: Message properties (unused).
        body: Raw message payload as bytes.
    """
    try:
        msg = body.decode()
        number = int(msg)
    except ValueError:  # also covers UnicodeDecodeError (a ValueError subclass)
        # A malformed payload must not crash the consumer: an unhandled
        # exception would leave the message unacked and it would be
        # redelivered on the next run. Reject it without requeue instead.
        ch.basic_reject(method.delivery_tag, requeue=False)
        print(f' [!] Malformed {body!r} -> DLQ')
        return

    print(f' [>] Received: {msg}')
    if number % 2:  # odd -> reject without requeue
        ch.basic_reject(method.delivery_tag, requeue=False)
        print(f' [!] Rejected {msg} -> DLQ')
    else:
        ch.basic_ack(method.delivery_tag)
        print(f' [√] Processed {msg}')
# Allow at most one unacknowledged message in flight for this consumer.
ch.basic_qos(prefetch_count=1)
ch.basic_consume(queue=biz_queue, on_message_callback=callback)
try:
    ch.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C is the advertised way to quit: stop the consume loop cleanly
    # instead of exiting with a traceback.
    ch.stop_consuming()
finally:
    # Always release the broker connection; skip if it is already closed
    # (e.g. the loop ended because the connection dropped).
    if conn.is_open:
        conn.close()
|