consumer_dlq.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. # -*- coding: utf-8 -*-
  2. import pika
  3. USER = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
  4. PARAMS = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', USER)
  5. conn = pika.BlockingConnection(PARAMS)
  6. ch = conn.channel()
  7. # 1. 声明死信交换机 & 死信队列
  8. dlx_name = 'dlx_exchange'
  9. dlq_name = 'dlq'
  10. ch.exchange_declare(exchange=dlx_name, exchange_type='direct')
  11. ch.queue_declare(queue=dlq_name, durable=True)
  12. ch.queue_bind(exchange=dlx_name, queue=dlq_name, routing_key='dead')
  13. # 2. 声明业务队列,并绑定 DLX
  14. biz_queue = 'business_queue'
  15. biz_args = {
  16. 'x-dead-letter-exchange': dlx_name, # DLX 名称
  17. 'x-dead-letter-routing-key': 'dead', # 进入 DLX 时使用的 routing_key
  18. 'x-message-ttl': 10000, # 消息 10 秒过期(演示 TTL 死信)
  19. 'x-max-length': 3 # 队列最大长度 3(演示溢出死信)
  20. }
  21. ch.queue_declare(queue=biz_queue, durable=True, arguments=biz_args)
  22. print(' [*] Waiting business messages, will nack odd numbers. Ctrl+C to quit.')
  23. def callback(ch, method, props, body):
  24. msg = body.decode()
  25. print(f' [>] Received: {msg}')
  26. if int(msg) % 2: # 奇数 -> 拒绝并死信
  27. ch.basic_reject(method.delivery_tag, requeue=False)
  28. print(f' [!] Rejected {msg} -> DLQ')
  29. else:
  30. ch.basic_ack(method.delivery_tag)
  31. print(f' [√] Processed {msg}')
  32. ch.basic_qos(prefetch_count=1)
  33. ch.basic_consume(queue=biz_queue, on_message_callback=callback)
  34. ch.start_consuming()