producer_delay.py 981 B

12345678910111213141516171819202122232425262728293031
  1. # -*- coding: utf-8 -*-
  2. import pika
  3. import sys
  4. user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
  5. connection = pika.BlockingConnection(
  6. pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
  7. )
  8. channel = connection.channel()
  9. # 1. 声明延迟交换机(插件必须启用)
  10. channel.exchange_declare(
  11. exchange='delayed_exchange',
  12. exchange_type='x-delayed-message',
  13. arguments={'x-delayed-type': 'direct'} # 交换机内部用直连
  14. )
  15. # 2. 从命令行读取延迟时间(毫秒)和消息
  16. delay_ms = int(sys.argv[1]) if len(sys.argv) > 1 else 5000
  17. message = ' '.join(sys.argv[2:]) or f'Hello after {delay_ms} ms'
  18. channel.basic_publish(
  19. exchange='delayed_exchange',
  20. routing_key='delayed',
  21. body=message,
  22. properties=pika.BasicProperties(
  23. headers={'x-delay': delay_ms} # 关键:延迟投递时间
  24. )
  25. )
  26. print(f" [x] Sent message to be delayed {delay_ms} ms: {message}")
  27. connection.close()