import httpx import json import asyncio import threading from concurrent.futures import ThreadPoolExecutor, as_completed from os.path import expanduser, join, dirname, abspath from httpx import BasicAuth from time import sleep from datetime import datetime class WorldQuantBrainAPI: def __init__(self, credentials_file='brain_credentials_copy.txt'): """初始化API客户端""" self.credentials_file = expanduser(credentials_file) self.client = None self.brain_api_url = 'https://api.worldquantbrain.com' def login(self): """登录认证""" # Load credentials with open(self.credentials_file) as f: credentials = json.load(f) # Extract username and password from the list username, password = credentials # Create a client with basic authentication self.client = httpx.Client(auth=BasicAuth(username, password)) # Send authentication request response = self.client.post(f'{self.brain_api_url}/authentication') print(f"认证状态: {response.status_code}") if response.status_code == 200: print("登录成功!") return True else: print(f"登录失败: {response.json()}") return False def simulate_alpha(self, alpha_config): """模拟单个Alpha因子""" if self.client is None: raise Exception("请先调用 login() 方法登录") name = alpha_config.get('name', '未命名Alpha') expression = alpha_config['expression'] settings = alpha_config.get('settings', {}) print(f"\n🚀 开始模拟: {name}") print(f"📝 表达式: {expression}") simulation_data = { 'type': 'REGULAR', 'settings': settings, 'regular': expression } try: # 发送模拟数据 sim_resp = self.client.post( f'{self.brain_api_url}/simulations', json=simulation_data, ) print(f"📤 模拟提交状态: {sim_resp.status_code}") if sim_resp.status_code != 200: print(f"❌ {name} 提交失败") return {'name': name, 'success': False, 'error': f'提交失败: {sim_resp.status_code}'} # 获取进度URL并轮询结果 sim_progress_url = sim_resp.headers['location'] print(f"⏳ {name} 进度URL获取成功") while True: sim_progress_resp = self.client.get(sim_progress_url) retry_after_sec = float(sim_progress_resp.headers.get("Retry-After", 0)) if retry_after_sec == 0: # simulation done! break print(f"⏰ {name} 等待 {retry_after_sec} 秒...") sleep(retry_after_sec) # 获取最终的alpha ID result_data = sim_progress_resp.json() alpha_id = result_data["alpha"] print(f"✅ {name} 模拟完成! Alpha ID: {alpha_id}") return { 'name': name, 'expression': expression, 'alpha_id': alpha_id, 'success': True, 'result_data': result_data } except Exception as e: print(f"❌ {name} 模拟过程中出错: {str(e)}") return {'name': name, 'success': False, 'error': str(e)} def batch_simulate_alphas(self, alphas_config_file='alphas.json', max_workers=3): """批量测试多个Alpha因子(使用线程池)""" # 读取Alpha配置 try: with open(alphas_config_file, 'r', encoding='utf-8') as f: alpha_configs = json.load(f) except FileNotFoundError: print(f"❌ 配置文件 {alphas_config_file} 不存在") return [] except json.JSONDecodeError: print(f"❌ 配置文件 {alphas_config_file} JSON格式错误") return [] print(f"📁 读取到 {len(alpha_configs)} 个Alpha配置") # 登录 if not self.login(): return [] results = [] # 使用线程池并发执行 with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务 future_to_alpha = { executor.submit(self.simulate_alpha, config): config for config in alpha_configs } # 收集结果 for future in as_completed(future_to_alpha): config = future_to_alpha[future] try: result = future.result() results.append(result) except Exception as e: print(f"❌ {config.get('name', '未知Alpha')} 执行失败: {str(e)}") results.append({ 'name': config.get('name', '未知Alpha'), 'success': False, 'error': str(e) }) # 打印汇总结果 self._print_summary(results) return results async def batch_simulate_alphas_async(self, alphas_config_file='alphas.json'): """异步批量测试多个Alpha因子""" # 注意:httpx 的异步客户端需要额外处理 # 这里先提供同步版本,异步版本需要重写 print("⚠️ 异步版本暂未实现,使用同步版本") return self.batch_simulate_alphas(alphas_config_file) def _print_summary(self, results): """打印测试结果汇总""" print("\n" + "=" * 50) print("📊 Alpha测试结果汇总") print("=" * 50) success_count = sum(1 for r in results if r.get('success', False)) failed_count = len(results) - success_count print(f"✅ 成功: {success_count} 个") print(f"❌ 失败: {failed_count} 个") print("\n成功详情:") for result in results: if result.get('success'): print(f" ✓ {result['name']}: {result['alpha_id']}") if failed_count > 0: print("\n失败详情:") for result in results: if not result.get('success'): print(f" ✗ {result['name']}: {result.get('error', '未知错误')}") def submit_alpha(self, alpha_id): if self.client is None: raise Exception("请先调用 login() 方法登录") result = self.client.post(f'{self.brain_api_url}/alphas/{alpha_id}/submit') while True: if "retry-after" in result.headers: sleep_time = float(result.headers["retry-after"]) print(f"提交等待 {sleep_time} 秒...") sleep(sleep_time) result = self.client.get(f'{self.brain_api_url}/alphas/{alpha_id}/submit') print('检查提交状态...') else: break success = result.status_code == 200 if success: print(f"Alpha {alpha_id} 提交成功!") else: print(f"Alpha {alpha_id} 提交失败,状态码: {result.status_code}") return success def close(self): """关闭客户端连接""" if self.client: self.client.close() self.client = None # 使用示例 if __name__ == "__main__": # 创建API实例 wq_api = WorldQuantBrainAPI() try: # 批量测试所有Alpha print("🎯 开始批量测试Alpha因子...") start_time = datetime.now() results = wq_api.batch_simulate_alphas( alphas_config_file='alphas.json', max_workers=2 # 并发数量,根据API限制调整 ) end_time = datetime.now() print(f"\n⏱️ 总耗时: {(end_time - start_time).total_seconds():.2f} 秒") finally: # 确保连接关闭 wq_api.close()