| 1234567891011121314151617181920212223242526272829303132 |
# -*- coding: utf-8 -*-
"""Consume messages from the ``header_logs`` headers exchange.

Declares the exchange, binds a temporary exclusive queue to it with a
headers-match binding, and prints every delivered message body until the
process is interrupted (Ctrl-C).

The broker host, user name and password default to the values embedded
below and can be overridden with the ``RABBITMQ_HOST``, ``RABBITMQ_USER``
and ``RABBITMQ_PASSWORD`` environment variables.
"""
import os
import sys

import pika

# NOTE(review): the password fallback below is a credential committed to
# source. It is kept only so existing deployments keep working; prefer
# supplying RABBITMQ_PASSWORD from the environment / a secret store.
rabbit_host = os.environ.get('RABBITMQ_HOST', 'rabbitmq.rabbitmq.svc.cluster.local')
rabbit_user = os.environ.get('RABBITMQ_USER', 'user')
rabbit_password = os.environ.get('RABBITMQ_PASSWORD', 'Un2yzriWm7veSDoh')

# Binding arguments: 'x-match' may be 'all' (every listed header must match)
# or 'any' (at least one listed header must match).
# NOTE(review): RabbitMQ documents that binding headers whose names start
# with 'x-' are ignored by 'all'/'any' matching, which would make this
# binding match every message. If the broker supports it, 'all-with-x' may
# be what is intended here - confirm against the broker version and the
# publisher before changing.
bind_args = {
    'x-match': 'all',  # deliver only when all key/value pairs match
    'x-version': '1',
    'x-env': 'prod',
}


def callback(ch, method, properties, body):
    """Print the body of a delivered message.

    Args:
        ch: Channel the message was delivered on (unused).
        method: Delivery metadata (unused).
        properties: Message properties (unused).
        body: Raw message payload as bytes.
    """
    # Messages are auto-acked, so an undecodable payload must not crash the
    # consumer: substitute invalid bytes instead of raising.
    text = body.decode('utf-8', errors='replace')
    print(' [x] %r' % (text,))


user_info = pika.PlainCredentials(rabbit_user, rabbit_password)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(rabbit_host, 5672, '/', user_info)
)
try:
    channel = connection.channel()

    # 1. Declare the same exchange the publisher uses.
    channel.exchange_declare(exchange='header_logs', exchange_type='headers')

    # 2. Create a temporary queue; the broker generates its name and the
    #    queue is exclusive to this connection.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # 3. Bind the queue using the header-matching arguments.
    channel.queue_bind(exchange='header_logs', queue=queue_name, arguments=bind_args)
    print(' [*] Waiting for messages matching', bind_args)

    # 4. Register the callback and block, dispatching deliveries to it.
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this script; cancel the
        # consumer cleanly instead of exiting with a traceback.
        channel.stop_consuming()
finally:
    # Always release the broker connection, including on setup failures.
    # Skip if it is already closed so a broker-side drop is not masked by a
    # second error raised from close().
    if connection.is_open:
        connection.close()
|