import json

import pika

# Connection settings.
# NOTE(review): the broker credentials are hard-coded in source; consider
# loading them from the environment or a secret store instead.
user_info = pika.PlainCredentials('user', 'J70e6K7BRrxrU1dO')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
)
channel = connection.channel()

# 1. Declare the fixed queue that RPC requests are consumed from.
channel.queue_declare(queue='rpc_queue')


# 2. Business functions.
def add(a, b):
    """Return ``a + b``."""
    return a + b


def sub(a, b):
    """Return ``a - b``."""
    return a - b


# Maps the ``func`` field of a request to the callable that implements it.
func_map = {'add': add, 'sub': sub}


def _handle_request(body):
    """Parse one request body, run the requested function, build the reply.

    The body is expected to be JSON of the form
    ``{"func": <name in func_map>, "params": {"a": ..., "b": ...}}``.

    Returns ``{'result': value}`` on success. For malformed JSON, missing
    fields, an unknown function name, or operands the function cannot
    handle, returns ``{'error': message}`` instead of raising, so a single
    bad message cannot take the consumer down.
    """
    try:
        # 3. Parse the request.
        req = json.loads(body)
        func_name = req['func']
        params = req['params']
        a, b = params['a'], params['b']

        # 4. Compute locally.
        result = func_map[func_name](a, b)
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError covers json.JSONDecodeError and undecodable bytes;
        # KeyError covers missing fields and unknown function names;
        # TypeError covers non-dict payloads and incompatible operands.
        print(f'请求无效: {exc!r}')
        return {'error': f'{type(exc).__name__}: {exc}'}

    print(f'收到 {func_name}({a},{b}) -> {result}')
    return {'result': result}


def on_request(ch, method, props, body):
    """Consumer callback: answer one RPC request and acknowledge it.

    Args:
        ch: Channel the message was delivered on; used to publish the reply
            and to ack the delivery.
        method: Delivery metadata; ``method.delivery_tag`` is acked.
        props: Message properties; ``reply_to`` names the reply queue and
            ``correlation_id`` is echoed back unchanged.
        body: Raw request payload (JSON).
    """
    response = _handle_request(body)

    # 5. Send the result back.
    if props.reply_to:
        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(
                correlation_id=props.correlation_id  # echoed back unchanged
            ),
            body=json.dumps(response),
        )
    else:
        # Without a reply queue there is nowhere to send the response;
        # publishing with routing_key=None would raise and kill the consumer.
        print('请求缺少 reply_to,无法回复')

    # Always ack, including for invalid requests: an unacked bad message
    # would be redelivered over and over.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 6. Fair dispatch: hand this worker at most one unacked message at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print('服务端已启动,等待 RPC 请求...')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: leave the consume loop cleanly instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Only close a connection that is still open, so a broker-side failure
    # is not masked by a second error raised from close().
    if connection.is_open:
        connection.close()