consumer_topic.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435
  1. # -*- coding: utf-8 -*-
  2. import pika
  3. import sys
  4. user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
  5. connection = pika.BlockingConnection(
  6. pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
  7. )
  8. channel = connection.channel()
  9. # 1. 声明同一个 topic 交换机
  10. channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
  11. # 2. 创建临时队列
  12. result = channel.queue_declare(queue='', exclusive=True)
  13. queue_name = result.method.queue
  14. # 3. 从命令行读取要监听的 pattern(可多个)
  15. binding_keys = sys.argv[1:]
  16. if not binding_keys:
  17. sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
  18. sys.exit(1)
  19. for binding_key in binding_keys:
  20. channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
  21. print(f" [*] Queue bound with pattern: {binding_key}")
  22. print(' [*] Waiting for logs. To exit press CTRL+C')
  23. # 4. 回调
  24. def callback(ch, method, properties, body):
  25. print(f" [x] {method.routing_key}:{body.decode()}")
  26. channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
  27. channel.start_consuming()