# rpc_server.py (1.2 KB)
import json
import os

import pika
  3. # 连接信息
  4. user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
  5. connection = pika.BlockingConnection(
  6. pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
  7. )
  8. channel = connection.channel()
  9. # 1. 声明固定队列
  10. channel.queue_declare(queue='rpc_queue')
# 2. Business functions exposed over RPC.
  12. def add(a, b):
  13. return a + b
  14. def sub(a, b):
  15. return a - b
  16. func_map = {'add': add, 'sub': sub}
  17. def on_request(ch, method, props, body):
  18. # 3. 解析请求
  19. req = json.loads(body)
  20. func_name = req['func']
  21. params = req['params']
  22. a, b = params['a'], params['b']
  23. # 4. 本地计算
  24. result = func_map[func_name](a, b)
  25. print(f'收到 {func_name}({a},{b}) -> {result}')
  26. # 5. 回结果
  27. ch.basic_publish(
  28. exchange='',
  29. routing_key=props.reply_to,
  30. properties=pika.BasicProperties(
  31. correlation_id=props.correlation_id # 原样返回
  32. ),
  33. body=json.dumps({'result': result})
  34. )
  35. ch.basic_ack(delivery_tag=method.delivery_tag)
  36. # 6. 公平分发
  37. channel.basic_qos(prefetch_count=1)
  38. channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
  39. print('服务端已启动,等待 RPC 请求...')
  40. channel.start_consuming()