| 12345678910111213141516171819202122232425262728 |
# -*- coding: utf-8 -*-
"""Consume messages from a RabbitMQ delayed-message exchange and print them.

The script declares the ``delayed_exchange`` exchange (type
``x-delayed-message``), declares the durable ``delayed_queue`` queue, binds
the two with routing key ``delayed`` and then blocks, printing every message
delivered to the queue.

Connection settings may be overridden through the ``RABBITMQ_HOST``,
``RABBITMQ_USER`` and ``RABBITMQ_PASSWORD`` environment variables; when they
are unset the original built-in values are used.
"""
import os

import pika


def callback(ch, method, properties, body):
    """Print the payload of one delivered message.

    Args:
        ch: Channel the message arrived on (unused).
        method: Delivery metadata (unused).
        properties: Message properties (unused).
        body: Raw message payload as bytes.
    """
    # errors='replace' keeps a payload that is not valid UTF-8 from raising
    # inside the consume loop and terminating the consumer.
    print(' [x] Received after delay:', body.decode(errors='replace'))


# NOTE(security): the fallback password is kept only so existing deployments
# keep working; prefer supplying RABBITMQ_PASSWORD and removing the literal.
user_info = pika.PlainCredentials(
    os.environ.get('RABBITMQ_USER', 'user'),
    os.environ.get('RABBITMQ_PASSWORD', 'Un2yzriWm7veSDoh'),
)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        os.environ.get('RABBITMQ_HOST', 'rabbitmq.rabbitmq.svc.cluster.local'),
        5672,
        '/',
        user_info,
    )
)

try:
    channel = connection.channel()

    # 1. Declare the same delayed exchange (presumably matching the publisher
    #    side; the arguments must agree with any existing declaration).
    channel.exchange_declare(
        exchange='delayed_exchange',
        exchange_type='x-delayed-message',
        arguments={'x-delayed-type': 'direct'},
    )

    # 2. Create the queue and bind it to the exchange.
    channel.queue_declare(queue='delayed_queue', durable=True)
    channel.queue_bind(
        exchange='delayed_exchange',
        queue='delayed_queue',
        routing_key='delayed',
    )

    print(' [*] Waiting for delayed messages...')

    # 3. Register the callback and block until interrupted.
    channel.basic_consume(
        queue='delayed_queue',
        on_message_callback=callback,
        auto_ack=True,
    )
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this script; exit the consume
        # loop cleanly instead of surfacing a traceback.
        channel.stop_consuming()
finally:
    # Release the broker connection on every exit path, including failures
    # during declare/bind. The broker may already have closed it on error.
    if connection.is_open:
        connection.close()
|