# rpc_client.py - blocking RabbitMQ RPC client built on pika.
import json
import time
import uuid

import pika
  4. class RpcClient:
  5. def __init__(self):
  6. user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
  7. self.connection = pika.BlockingConnection(
  8. pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
  9. )
  10. self.channel = self.connection.channel()
  11. # 1. 创建仅供自己使用的回调队列
  12. result = self.channel.queue_declare(queue='', exclusive=True)
  13. self.callback_queue = result.method.queue
  14. # 2. 监听回调队列
  15. self.channel.basic_consume(
  16. queue=self.callback_queue,
  17. on_message_callback=self.on_response,
  18. auto_ack=True
  19. )
  20. self.response = None
  21. self.corr_id = None
  22. def on_response(self, ch, method, props, body):
  23. # 3. 只接收自己的响应
  24. if props.correlation_id == self.corr_id:
  25. self.response = json.loads(body)['result']
  26. def call(self, func, a, b, timeout=5):
  27. self.response = None
  28. self.corr_id = str(uuid.uuid4())
  29. # 4. 发请求
  30. self.channel.basic_publish(
  31. exchange='',
  32. routing_key='rpc_queue',
  33. properties=pika.BasicProperties(
  34. reply_to=self.callback_queue, # 服务端把结果发回这里
  35. correlation_id=self.corr_id # 请求 id
  36. ),
  37. body=json.dumps({'func': func, 'params': {'a': a, 'b': b}})
  38. )
  39. # 5. 阻塞直到拿到结果(或超时)
  40. start = time.time()
  41. while self.response is None and (time.time() - start) < timeout:
  42. self.connection.process_data_events()
  43. if self.response is None:
  44. raise TimeoutError('RPC 调用超时')
  45. return self.response
  46. # ----------------- demo -----------------
  47. if __name__ == '__main__':
  48. import time
  49. rpc = RpcClient()
  50. try:
  51. print('调用 add(10, 5)...')
  52. result = rpc.call('add', 10, 5)
  53. print('服务端返回:', result)
  54. print('调用 sub(20, 8)...')
  55. result = rpc.call('sub', 20, 8)
  56. print('服务端返回:', result)
  57. except Exception as e:
  58. print('错误:', e)