consumer_auto_ack.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. # coding=utf-8
  2. ### 消费者
  3. # 如果在消费者获取到队列里的消息后,在回调函数的处理过程中,
  4. # 消费者突然出错或程序崩溃等异常,那么就会造成这条消息并未被实际正常的处理掉。
  5. # 为了解决这个问题,我们只需在消费者basic_consume(auto_ack=False),
  6. # 并在回调函数中设置手动应答即可ch.basic_ack(delivery_tag=method.delivery_tag)
  7. import pika
  8. import time
  9. user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
  10. connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))
  11. channel = connection.channel()
  12. # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是
  13. # 这样生产者和消费者就没有必要的先后启动顺序了
  14. channel.queue_declare(queue='queue')
  15. # 回调函数
  16. def callback(ch, method, properties, body):
  17. time.sleep(5)
  18. ch.basic_ack(delivery_tag=method.delivery_tag)
  19. print('消费者收到:{}'.format(body.decode('utf-8')))
  20. # channel: 包含channel的一切属性和方法
  21. # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
  22. # properties: basic_publish 通过 properties 传入的参数
  23. # body: basic_publish发送的消息
  24. channel.basic_consume(queue='queue', # 接收指定queue的消息
  25. auto_ack=False, # 指定为False,表示取消自动应答,交由回调函数手动应答
  26. on_message_callback=callback # 设置收到消息的回调函数
  27. )
  28. # 应答的本质是告诉消息队列可以将这条消息销毁了
  29. print('Waiting for messages. To exit press CTRL+C')
  30. # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
  31. channel.start_consuming()