| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- import httpx
- import json
- import asyncio
- import threading
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from os.path import expanduser, join, dirname, abspath
- from httpx import BasicAuth
- from time import sleep
- from datetime import datetime
class WorldQuantBrainAPI:
    """Small synchronous client for the WorldQuant BRAIN REST API.

    The client authenticates with HTTP basic auth, submits alpha
    simulations, polls them until they finish, and can submit a finished
    alpha. All progress is reported with ``print``.

    Typical use: create an instance, call :meth:`batch_simulate_alphas`
    (which logs in itself) or :meth:`login` followed by
    :meth:`simulate_alpha` / :meth:`submit_alpha`, then :meth:`close`.
    """

    def __init__(self, credentials_file='brain_credentials_copy.txt'):
        """Store configuration; no file or network access happens here.

        Args:
            credentials_file: Path to the credentials file. ``~`` is
                expanded. The file is read by :meth:`login`.
        """
        self.credentials_file = expanduser(credentials_file)
        # Set by login(); None means "not logged in".
        self.client = None
        self.brain_api_url = 'https://api.worldquantbrain.com'

    @staticmethod
    def _is_success(status_code):
        """Return True for any 2xx HTTP status code."""
        return 200 <= status_code < 300

    @staticmethod
    def _describe_body(response):
        """Return the decoded JSON body of *response*, or its raw text."""
        try:
            return response.json()
        except ValueError:
            # Error pages are not guaranteed to be JSON; json decoding
            # errors are ValueError subclasses.
            return response.text

    def _require_client(self):
        """Return the logged-in client or raise if login() was not called."""
        if self.client is None:
            raise RuntimeError("请先调用 login() 方法登录")
        return self.client

    def login(self):
        """Authenticate against the API and keep the session on success.

        The credentials file is parsed as JSON and unpacked into exactly
        two values, username then password.

        Returns:
            True when the server answers with a 2xx status, else False.
            After a failed login ``self.client`` is None.

        Raises:
            OSError: The credentials file cannot be opened.
            ValueError: The file is not valid JSON or does not unpack
                into two values.
        """
        with open(self.credentials_file) as f:
            credentials = json.load(f)
        username, password = credentials

        # Release a client left over from an earlier login so its
        # connections are not leaked when it is replaced.
        self.close()

        client = httpx.Client(auth=BasicAuth(username, password))
        try:
            response = client.post(f'{self.brain_api_url}/authentication')
        except Exception:
            client.close()
            raise

        print(f"认证状态: {response.status_code}")
        # Any 2xx counts as success: a POST that creates a session may
        # answer 201 Created rather than 200.
        # NOTE(review): confirm the exact code against the BRAIN API docs.
        if self._is_success(response.status_code):
            self.client = client
            print("登录成功!")
            return True

        print(f"登录失败: {self._describe_body(response)}")
        # Do not keep a rejected session around as if it were logged in.
        client.close()
        return False

    def simulate_alpha(self, alpha_config):
        """Run one alpha simulation and wait for it to finish.

        Args:
            alpha_config: Mapping with a required ``'expression'`` and
                optional ``'name'`` and ``'settings'`` entries.

        Returns:
            A dict that always has ``'name'`` and ``'success'``. On
            success it also has ``'expression'``, ``'alpha_id'`` and
            ``'result_data'``; on failure it has ``'error'``.

        Raises:
            RuntimeError: :meth:`login` has not succeeded yet.
            KeyError: ``alpha_config`` has no ``'expression'``.
        """
        client = self._require_client()

        name = alpha_config.get('name', '未命名Alpha')
        expression = alpha_config['expression']
        settings = alpha_config.get('settings', {})

        print(f"\n🚀 开始模拟: {name}")
        print(f"📝 表达式: {expression}")

        simulation_data = {
            'type': 'REGULAR',
            'settings': settings,
            'regular': expression,
        }

        try:
            sim_resp = client.post(
                f'{self.brain_api_url}/simulations',
                json=simulation_data,
            )
            print(f"📤 模拟提交状态: {sim_resp.status_code}")

            # Accept any 2xx: creating a simulation may answer 201 Created.
            if not self._is_success(sim_resp.status_code):
                print(f"❌ {name} 提交失败")
                return {
                    'name': name,
                    'success': False,
                    'error': f'提交失败: {sim_resp.status_code}',
                }

            # The progress URL comes back in the Location header.
            sim_progress_url = sim_resp.headers['location']
            print(f"⏳ {name} 进度URL获取成功")

            # Poll until the server stops sending a Retry-After header.
            while True:
                sim_progress_resp = client.get(sim_progress_url)
                retry_after_sec = float(
                    sim_progress_resp.headers.get("Retry-After", 0)
                )
                if retry_after_sec == 0:
                    break
                print(f"⏰ {name} 等待 {retry_after_sec} 秒...")
                sleep(retry_after_sec)

            result_data = sim_progress_resp.json()
            alpha_id = None
            if isinstance(result_data, dict):
                alpha_id = result_data.get("alpha")

            if not alpha_id:
                # Polling ended without an alpha id, so the simulation
                # did not produce an alpha. Report the server's reply
                # instead of a bare KeyError text.
                error = (
                    f'模拟未返回Alpha ID '
                    f'(状态码 {sim_progress_resp.status_code}): {result_data}'
                )
                print(f"❌ {name} 模拟过程中出错: {error}")
                return {
                    'name': name,
                    'success': False,
                    'error': error,
                    'result_data': result_data,
                }

            print(f"✅ {name} 模拟完成! Alpha ID: {alpha_id}")
            return {
                'name': name,
                'expression': expression,
                'alpha_id': alpha_id,
                'success': True,
                'result_data': result_data,
            }
        except Exception as e:
            # Best-effort boundary: one failing alpha must not abort a
            # whole batch, so the error is reported in the result dict.
            print(f"❌ {name} 模拟过程中出错: {str(e)}")
            return {'name': name, 'success': False, 'error': str(e)}

    def batch_simulate_alphas(self, alphas_config_file='alphas.json', max_workers=3):
        """Simulate every alpha listed in a JSON file using a thread pool.

        Logs in first, then runs :meth:`simulate_alpha` for each entry
        and prints a summary. All worker threads share ``self.client``.

        Args:
            alphas_config_file: Path to a UTF-8 JSON file holding the
                alpha configs to iterate over.
            max_workers: Size of the thread pool.

        Returns:
            The result dicts in completion order, or ``[]`` when the
            config file is missing or invalid or when login fails.
        """
        try:
            with open(alphas_config_file, 'r', encoding='utf-8') as f:
                alpha_configs = json.load(f)
        except FileNotFoundError:
            print(f"❌ 配置文件 {alphas_config_file} 不存在")
            return []
        except json.JSONDecodeError:
            print(f"❌ 配置文件 {alphas_config_file} JSON格式错误")
            return []

        print(f"📁 读取到 {len(alpha_configs)} 个Alpha配置")

        if not self.login():
            return []

        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_alpha = {
                executor.submit(self.simulate_alpha, config): config
                for config in alpha_configs
            }
            for future in as_completed(future_to_alpha):
                config = future_to_alpha[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    # Raised before simulate_alpha's own try block, e.g.
                    # a config without 'expression'.
                    failed_name = config.get('name', '未知Alpha')
                    print(f"❌ {failed_name} 执行失败: {str(e)}")
                    results.append({
                        'name': failed_name,
                        'success': False,
                        'error': str(e),
                    })

        self._print_summary(results)
        return results

    async def batch_simulate_alphas_async(self, alphas_config_file='alphas.json', max_workers=3):
        """Awaitable wrapper around :meth:`batch_simulate_alphas`.

        There is no native async implementation; the synchronous batch
        runs in the event loop's default executor so the loop is not
        blocked while it works.

        Args:
            alphas_config_file: Passed through to the synchronous batch.
            max_workers: Passed through to the synchronous batch.

        Returns:
            Whatever :meth:`batch_simulate_alphas` returns.
        """
        print("⚠️ 异步版本暂未实现,使用同步版本")
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.batch_simulate_alphas, alphas_config_file, max_workers
        )

    def _print_summary(self, results):
        """Print success/failure counts and a per-alpha breakdown."""
        succeeded = [r for r in results if r.get('success', False)]
        failed = [r for r in results if not r.get('success', False)]

        print("\n" + "=" * 50)
        print("📊 Alpha测试结果汇总")
        print("=" * 50)
        print(f"✅ 成功: {len(succeeded)} 个")
        print(f"❌ 失败: {len(failed)} 个")

        print("\n成功详情:")
        for result in succeeded:
            print(f" ✓ {result['name']}: {result['alpha_id']}")

        if failed:
            print("\n失败详情:")
            for result in failed:
                print(f" ✗ {result['name']}: {result.get('error', '未知错误')}")

    def submit_alpha(self, alpha_id):
        """Submit an alpha and wait until the server stops asking to retry.

        Args:
            alpha_id: Identifier placed in the ``/alphas/{id}/submit`` URL.

        Returns:
            True when the final response has status 200, else False.

        Raises:
            RuntimeError: :meth:`login` has not succeeded yet.
        """
        client = self._require_client()
        submit_url = f'{self.brain_api_url}/alphas/{alpha_id}/submit'

        result = client.post(submit_url)
        # While a retry-after header is present, wait that long and
        # re-check the same URL with GET.
        while "retry-after" in result.headers:
            sleep_time = float(result.headers["retry-after"])
            print(f"提交等待 {sleep_time} 秒...")
            sleep(sleep_time)
            result = client.get(submit_url)
            print('检查提交状态...')

        success = result.status_code == 200
        if success:
            print(f"Alpha {alpha_id} 提交成功!")
        else:
            print(f"Alpha {alpha_id} 提交失败,状态码: {result.status_code}")
        return success

    def close(self):
        """Close the HTTP client, if any, and mark the instance logged out."""
        if self.client:
            self.client.close()
            self.client = None
# Usage example: simulate every alpha listed in alphas.json and report
# how long the whole batch took.
if __name__ == "__main__":
    api = WorldQuantBrainAPI()
    try:
        print("🎯 开始批量测试Alpha因子...")
        started_at = datetime.now()

        # max_workers is the concurrency level; tune it to the API's limits.
        results = api.batch_simulate_alphas(
            alphas_config_file='alphas.json',
            max_workers=2,
        )

        elapsed = datetime.now() - started_at
        print(f"\n⏱️ 总耗时: {elapsed.total_seconds():.2f} 秒")
    finally:
        # Always release the HTTP connection, even if the batch raised.
        api.close()
|