| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- import pika
- import json
- # 连接信息
- user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
- connection = pika.BlockingConnection(
- pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
- )
- channel = connection.channel()
- # 1. 声明固定队列
- channel.queue_declare(queue='rpc_queue')
- # 2. 业务函数
- def add(a, b):
- return a + b
- def sub(a, b):
- return a - b
- func_map = {'add': add, 'sub': sub}
- def on_request(ch, method, props, body):
- # 3. 解析请求
- req = json.loads(body)
- func_name = req['func']
- params = req['params']
- a, b = params['a'], params['b']
- # 4. 本地计算
- result = func_map[func_name](a, b)
- print(f'收到 {func_name}({a},{b}) -> {result}')
- # 5. 回结果
- ch.basic_publish(
- exchange='',
- routing_key=props.reply_to,
- properties=pika.BasicProperties(
- correlation_id=props.correlation_id # 原样返回
- ),
- body=json.dumps({'result': result})
- )
- ch.basic_ack(delivery_tag=method.delivery_tag)
- # 6. 公平分发
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
- print('服务端已启动,等待 RPC 请求...')
- channel.start_consuming()
|