main.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. import httpx
  2. import json
  3. import asyncio
  4. import threading
  5. from concurrent.futures import ThreadPoolExecutor, as_completed
  6. from os.path import expanduser, join, dirname, abspath
  7. from httpx import BasicAuth
  8. from time import sleep
  9. from datetime import datetime
  10. class WorldQuantBrainAPI:
  11. def __init__(self, credentials_file='brain_credentials_copy.txt'):
  12. """初始化API客户端"""
  13. self.credentials_file = expanduser(credentials_file)
  14. self.client = None
  15. self.brain_api_url = 'https://api.worldquantbrain.com'
  16. def login(self):
  17. """登录认证"""
  18. # Load credentials
  19. with open(self.credentials_file) as f:
  20. credentials = json.load(f)
  21. # Extract username and password from the list
  22. username, password = credentials
  23. # Create a client with basic authentication
  24. self.client = httpx.Client(auth=BasicAuth(username, password))
  25. # Send authentication request
  26. response = self.client.post(f'{self.brain_api_url}/authentication')
  27. print(f"认证状态: {response.status_code}")
  28. if response.status_code == 200:
  29. print("登录成功!")
  30. return True
  31. else:
  32. print(f"登录失败: {response.json()}")
  33. return False
  34. def simulate_alpha(self, alpha_config):
  35. """模拟单个Alpha因子"""
  36. if self.client is None:
  37. raise Exception("请先调用 login() 方法登录")
  38. name = alpha_config.get('name', '未命名Alpha')
  39. expression = alpha_config['expression']
  40. settings = alpha_config.get('settings', {})
  41. print(f"\n🚀 开始模拟: {name}")
  42. print(f"📝 表达式: {expression}")
  43. simulation_data = {
  44. 'type': 'REGULAR',
  45. 'settings': settings,
  46. 'regular': expression
  47. }
  48. try:
  49. # 发送模拟数据
  50. sim_resp = self.client.post(
  51. f'{self.brain_api_url}/simulations',
  52. json=simulation_data,
  53. )
  54. print(f"📤 模拟提交状态: {sim_resp.status_code}")
  55. if sim_resp.status_code != 200:
  56. print(f"❌ {name} 提交失败")
  57. return {'name': name, 'success': False, 'error': f'提交失败: {sim_resp.status_code}'}
  58. # 获取进度URL并轮询结果
  59. sim_progress_url = sim_resp.headers['location']
  60. print(f"⏳ {name} 进度URL获取成功")
  61. while True:
  62. sim_progress_resp = self.client.get(sim_progress_url)
  63. retry_after_sec = float(sim_progress_resp.headers.get("Retry-After", 0))
  64. if retry_after_sec == 0: # simulation done!
  65. break
  66. print(f"⏰ {name} 等待 {retry_after_sec} 秒...")
  67. sleep(retry_after_sec)
  68. # 获取最终的alpha ID
  69. result_data = sim_progress_resp.json()
  70. alpha_id = result_data["alpha"]
  71. print(f"✅ {name} 模拟完成! Alpha ID: {alpha_id}")
  72. return {
  73. 'name': name,
  74. 'expression': expression,
  75. 'alpha_id': alpha_id,
  76. 'success': True,
  77. 'result_data': result_data
  78. }
  79. except Exception as e:
  80. print(f"❌ {name} 模拟过程中出错: {str(e)}")
  81. return {'name': name, 'success': False, 'error': str(e)}
  82. def batch_simulate_alphas(self, alphas_config_file='alphas.json', max_workers=3):
  83. """批量测试多个Alpha因子(使用线程池)"""
  84. # 读取Alpha配置
  85. try:
  86. with open(alphas_config_file, 'r', encoding='utf-8') as f:
  87. alpha_configs = json.load(f)
  88. except FileNotFoundError:
  89. print(f"❌ 配置文件 {alphas_config_file} 不存在")
  90. return []
  91. except json.JSONDecodeError:
  92. print(f"❌ 配置文件 {alphas_config_file} JSON格式错误")
  93. return []
  94. print(f"📁 读取到 {len(alpha_configs)} 个Alpha配置")
  95. # 登录
  96. if not self.login():
  97. return []
  98. results = []
  99. # 使用线程池并发执行
  100. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  101. # 提交所有任务
  102. future_to_alpha = {
  103. executor.submit(self.simulate_alpha, config): config
  104. for config in alpha_configs
  105. }
  106. # 收集结果
  107. for future in as_completed(future_to_alpha):
  108. config = future_to_alpha[future]
  109. try:
  110. result = future.result()
  111. results.append(result)
  112. except Exception as e:
  113. print(f"❌ {config.get('name', '未知Alpha')} 执行失败: {str(e)}")
  114. results.append({
  115. 'name': config.get('name', '未知Alpha'),
  116. 'success': False,
  117. 'error': str(e)
  118. })
  119. # 打印汇总结果
  120. self._print_summary(results)
  121. return results
  122. async def batch_simulate_alphas_async(self, alphas_config_file='alphas.json'):
  123. """异步批量测试多个Alpha因子"""
  124. # 注意:httpx 的异步客户端需要额外处理
  125. # 这里先提供同步版本,异步版本需要重写
  126. print("⚠️ 异步版本暂未实现,使用同步版本")
  127. return self.batch_simulate_alphas(alphas_config_file)
  128. def _print_summary(self, results):
  129. """打印测试结果汇总"""
  130. print("\n" + "=" * 50)
  131. print("📊 Alpha测试结果汇总")
  132. print("=" * 50)
  133. success_count = sum(1 for r in results if r.get('success', False))
  134. failed_count = len(results) - success_count
  135. print(f"✅ 成功: {success_count} 个")
  136. print(f"❌ 失败: {failed_count} 个")
  137. print("\n成功详情:")
  138. for result in results:
  139. if result.get('success'):
  140. print(f" ✓ {result['name']}: {result['alpha_id']}")
  141. if failed_count > 0:
  142. print("\n失败详情:")
  143. for result in results:
  144. if not result.get('success'):
  145. print(f" ✗ {result['name']}: {result.get('error', '未知错误')}")
  146. def submit_alpha(self, alpha_id):
  147. if self.client is None:
  148. raise Exception("请先调用 login() 方法登录")
  149. result = self.client.post(f'{self.brain_api_url}/alphas/{alpha_id}/submit')
  150. while True:
  151. if "retry-after" in result.headers:
  152. sleep_time = float(result.headers["retry-after"])
  153. print(f"提交等待 {sleep_time} 秒...")
  154. sleep(sleep_time)
  155. result = self.client.get(f'{self.brain_api_url}/alphas/{alpha_id}/submit')
  156. print('检查提交状态...')
  157. else:
  158. break
  159. success = result.status_code == 200
  160. if success:
  161. print(f"Alpha {alpha_id} 提交成功!")
  162. else:
  163. print(f"Alpha {alpha_id} 提交失败,状态码: {result.status_code}")
  164. return success
  165. def close(self):
  166. """关闭客户端连接"""
  167. if self.client:
  168. self.client.close()
  169. self.client = None
  170. # 使用示例
  171. if __name__ == "__main__":
  172. # 创建API实例
  173. wq_api = WorldQuantBrainAPI()
  174. try:
  175. # 批量测试所有Alpha
  176. print("🎯 开始批量测试Alpha因子...")
  177. start_time = datetime.now()
  178. results = wq_api.batch_simulate_alphas(
  179. alphas_config_file='alphas.json',
  180. max_workers=2 # 并发数量,根据API限制调整
  181. )
  182. end_time = datetime.now()
  183. print(f"\n⏱️ 总耗时: {(end_time - start_time).total_seconds():.2f} 秒")
  184. finally:
  185. # 确保连接关闭
  186. wq_api.close()