import pika import uuid import json class RpcClient: def __init__(self): user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO') self.connection = pika.BlockingConnection( pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info) ) self.channel = self.connection.channel() # 1. 创建仅供自己使用的回调队列 result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue # 2. 监听回调队列 self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True ) self.response = None self.corr_id = None def on_response(self, ch, method, props, body): # 3. 只接收自己的响应 if props.correlation_id == self.corr_id: self.response = json.loads(body)['result'] def call(self, func, a, b, timeout=5): self.response = None self.corr_id = str(uuid.uuid4()) # 4. 发请求 self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, # 服务端把结果发回这里 correlation_id=self.corr_id # 请求 id ), body=json.dumps({'func': func, 'params': {'a': a, 'b': b}}) ) # 5. 阻塞直到拿到结果(或超时) start = time.time() while self.response is None and (time.time() - start) < timeout: self.connection.process_data_events() if self.response is None: raise TimeoutError('RPC 调用超时') return self.response # ----------------- demo ----------------- if __name__ == '__main__': import time rpc = RpcClient() try: print('调用 add(10, 5)...') result = rpc.call('add', 10, 5) print('服务端返回:', result) print('调用 sub(20, 8)...') result = rpc.call('sub', 20, 8) print('服务端返回:', result) except Exception as e: print('错误:', e)