# consumer_routing.py
# -*- coding: utf-8 -*-
import os
import sys

import pika
  4. # 连接参数
  5. user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
  6. connection = pika.BlockingConnection(
  7. pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local',
  8. 5672,
  9. '/',
  10. user_info)
  11. )
  12. channel = connection.channel()
  13. # 1. 声明同一个 direct 交换机
  14. channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
  15. # 2. 创建临时队列
  16. result = channel.queue_declare(queue='', exclusive=True)
  17. queue_name = result.method.queue
  18. # 3. 从命令行读取要绑定的 routing_key,可多个
  19. severities = sys.argv[1:]
  20. if not severities:
  21. sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
  22. sys.exit(1)
  23. for severity in severities:
  24. channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
  25. print(' [*] Waiting for logs. To exit press CTRL+C')
  26. # 4. 回调
  27. def callback(ch, method, properties, body):
  28. print(f" [x] {method.routing_key}:{body.decode()}")
  29. channel.basic_consume(
  30. queue=queue_name,
  31. on_message_callback=callback,
  32. auto_ack=True
  33. )
  34. channel.start_consuming()