| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- import pika
- import uuid
- import json
- class RpcClient:
- def __init__(self):
- user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
- self.connection = pika.BlockingConnection(
- pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
- )
- self.channel = self.connection.channel()
- # 1. 创建仅供自己使用的回调队列
- result = self.channel.queue_declare(queue='', exclusive=True)
- self.callback_queue = result.method.queue
- # 2. 监听回调队列
- self.channel.basic_consume(
- queue=self.callback_queue,
- on_message_callback=self.on_response,
- auto_ack=True
- )
- self.response = None
- self.corr_id = None
- def on_response(self, ch, method, props, body):
- # 3. 只接收自己的响应
- if props.correlation_id == self.corr_id:
- self.response = json.loads(body)['result']
- def call(self, func, a, b, timeout=5):
- self.response = None
- self.corr_id = str(uuid.uuid4())
- # 4. 发请求
- self.channel.basic_publish(
- exchange='',
- routing_key='rpc_queue',
- properties=pika.BasicProperties(
- reply_to=self.callback_queue, # 服务端把结果发回这里
- correlation_id=self.corr_id # 请求 id
- ),
- body=json.dumps({'func': func, 'params': {'a': a, 'b': b}})
- )
- # 5. 阻塞直到拿到结果(或超时)
- start = time.time()
- while self.response is None and (time.time() - start) < timeout:
- self.connection.process_data_events()
- if self.response is None:
- raise TimeoutError('RPC 调用超时')
- return self.response
- # ----------------- demo -----------------
- if __name__ == '__main__':
- import time
- rpc = RpcClient()
- try:
- print('调用 add(10, 5)...')
- result = rpc.call('add', 10, 5)
- print('服务端返回:', result)
- print('调用 sub(20, 8)...')
- result = rpc.call('sub', 20, 8)
- print('服务端返回:', result)
- except Exception as e:
- print('错误:', e)
|