consumer_headers.py 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. # -*- coding: utf-8 -*-
  2. import pika
  3. import sys
  4. user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
  5. connection = pika.BlockingConnection(
  6. pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
  7. )
  8. channel = connection.channel()
  9. # 1. 声明同一个交换机
  10. channel.exchange_declare(exchange='header_logs', exchange_type='headers')
  11. # 2. 创建临时队列
  12. result = channel.queue_declare(queue='', exclusive=True)
  13. queue_name = result.method.queue
  14. # 3. 绑定:x-match 可以是 'all'(全匹配)或 'any'(任一匹配)
  15. bind_args = {
  16. 'x-match': 'all', # 所有键值都匹配才收
  17. 'x-version': '1',
  18. 'x-env': 'prod'
  19. }
  20. channel.queue_bind(exchange='header_logs', queue=queue_name, arguments=bind_args)
  21. print(' [*] Waiting for messages matching', bind_args)
  22. # 4. 回调
  23. def callback(ch, method, properties, body):
  24. print(' [x] %r' % body.decode())
  25. channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
  26. channel.start_consuming()