# consumer_delay.py (897 B)
  1. # -*- coding: utf-8 -*-
  2. import pika
  3. user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
  4. connection = pika.BlockingConnection(
  5. pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
  6. )
  7. channel = connection.channel()
  8. # 1. 声明同样的延迟交换机
  9. channel.exchange_declare(
  10. exchange='delayed_exchange',
  11. exchange_type='x-delayed-message',
  12. arguments={'x-delayed-type': 'direct'}
  13. )
  14. # 2. 创建队列并绑定
  15. channel.queue_declare(queue='delayed_queue', durable=True)
  16. channel.queue_bind(exchange='delayed_exchange', queue='delayed_queue', routing_key='delayed')
  17. print(' [*] Waiting for delayed messages...')
  18. # 3. 回调
  19. def callback(ch, method, properties, body):
  20. print(' [x] Received after delay:', body.decode())
  21. channel.basic_consume(queue='delayed_queue', on_message_callback=callback, auto_ack=True)
  22. channel.start_consuming()