consumer.py 1.4 KB

123456789101112131415161718192021222324252627282930313233
  1. # coding=utf-8
  2. ### 消费者
  3. import pika
  4. user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
  5. connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))
  6. channel = connection.channel()
  7. # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是
  8. # 这样生产者和消费者就没有必要的先后启动顺序了
  9. channel.queue_declare(queue='hello')
  10. # 回调函数
  11. def callback(ch, method, properties, body):
  12. print('消费者收到:{}'.format(body))
  13. # channel: 包含channel的一切属性和方法
  14. # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
  15. # properties: basic_publish 通过 properties 传入的参数
  16. # body: basic_publish发送的消息
  17. channel.basic_consume(queue='hello', # 接收指定queue的消息
  18. auto_ack=True, # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
  19. on_message_callback=callback # 设置收到消息的回调函数
  20. )
  21. print('Waiting for messages. To exit press CTRL+C')
  22. # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
  23. channel.start_consuming()