| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583 |
- """
- Enhanced Alpha Template Generator Script
- This script generates alpha templates with interactive user input for:
- - JSON file path selection
- - User authentication
- - Simulation parameters
- - Multi-simulation mode support
- - Real-time log monitoring
- """
- import asyncio
- import wqb
- import json
- import os
- import getpass
- import threading
- import time
- import sys
- import msvcrt # For Windows password input with asterisks
- from pathlib import Path
- def get_password_with_asterisks(prompt):
- """Get password input with asterisks shown for each character"""
- print(prompt, end='', flush=True)
- password = ""
-
- while True:
- char = msvcrt.getch()
-
- # Handle Enter key (carriage return)
- if char == b'\r':
- print() # New line
- break
- # Handle Backspace
- elif char == b'\x08':
- if len(password) > 0:
- password = password[:-1]
- # Move cursor back, print space, move cursor back again
- print('\b \b', end='', flush=True)
- # Handle Ctrl+C
- elif char == b'\x03':
- print()
- raise KeyboardInterrupt
- # Handle regular characters
- else:
- try:
- # Convert bytes to string
- char_str = char.decode('utf-8')
- if char_str.isprintable():
- password += char_str
- print('*', end='', flush=True)
- except UnicodeDecodeError:
- pass # Ignore non-printable characters
-
- return password
- def get_json_filepath():
- """Ask user to input the directory/filepath of expressions_with_settings.json"""
- while True:
- print("\n" + "="*60)
- print("JSON 文件配置")
- print("="*60)
- filepath = input("请复制粘贴 expressions_with_settings.json (即以json格式储存的带有setting的表达式列表)的目录或完整路径: ").strip()
-
- # Remove quotes if user copied with quotes
- filepath = filepath.strip('"').strip("'")
-
- # Check if it's a directory and try to find the file
- if os.path.isdir(filepath):
- json_path = os.path.join(filepath, "expressions_with_settings.json")
- else:
- json_path = filepath
-
- # Verify file exists
- if os.path.exists(json_path):
- try:
- with open(json_path, 'r') as f:
- data = json.load(f)
- print(f"✓ 成功加载 JSON 文件: {json_path}")
- return json_path, data
- except json.JSONDecodeError:
- print("❌ 错误: JSON 文件格式无效,请检查文件。")
- except Exception as e:
- print(f"❌ 读取文件错误: {e}")
- else:
- print("❌ 错误: 文件未找到,请检查路径后重试。")
- def get_user_credentials():
- """Ask user for brain username and password with asterisk password input"""
- print("\n" + "="*60)
- print("BRAIN 身份验证")
- print("="*60)
-
- username = input("请输入您的 BRAIN 用户名: ").strip()
- password = get_password_with_asterisks("请输入您的 BRAIN 密码 (显示为 *): ")
-
- return username, password
- def test_authentication(username, password):
- """Test authentication and return session if successful"""
- print("\n" + "="*60)
- print("API连通验证")
- print("="*60)
-
- try:
- logger = wqb.wqb_logger()
- wqbs = wqb.WQBSession((username, password), logger=logger)
-
- # Test connection
- resp = wqbs.locate_field('open')
- print(f"连接测试结果: resp.ok = {resp.ok}")
-
- if resp.ok:
- print("✓ 身份验证成功!")
- return wqbs, logger
- else:
- print("❌ 身份验证失败,请检查您的用户名和密码。")
- return None, None
-
- except Exception as e:
- print(f"❌ 身份验证错误: {e}")
- return None, None
- def get_simulation_parameters(expressions_count, json_path):
- """Get simulation parameters from user with validation"""
- print("\n" + "="*60)
- print("回测参数设置")
- print("="*60)
- print(f"JSON 中的表达式总数: {expressions_count}")
-
- # Get starting position
- while True:
- try:
- where_to_start = int(input(f"从列表中第几个表达式开始 (0 到 {expressions_count-1}): "))
- if 0 <= where_to_start < expressions_count:
- if where_to_start > 0:
- print(f"\n⚠️ 警告: 原始 JSON 文件将被直接覆盖!")
- print(f"📝 原始文件: {expressions_count} 个表达式")
- print(f"🔪 切割后: {expressions_count - where_to_start} 个表达式")
- print(f"📂 文件位置: {json_path}")
- print(f"\n🚨 重要提示: 如果您不希望覆盖原始文件,请立即关闭终端并手动备份文件!")
- print(f"⏰ 5秒后将继续执行覆盖操作...")
-
- # Give user 5 seconds to think/close terminal
- import time
- for i in range(5, 0, -1):
- print(f"倒计时: {i} 秒...", end='\r')
- time.sleep(1)
- print(" ") # Clear countdown line
-
- confirm = input("(继续程序,开始回测y/返回并重选列表起始位置n): ").lower().strip()
- if confirm != 'y':
- print("请重新选择表达式列表起始位置。")
- continue
- break
- else:
- print(f"❌ 起始位置无效,必须在 0 到 {expressions_count-1} 之间")
- except ValueError:
- print("❌ 请输入有效数字。")
-
- # Get concurrent count
- while True:
- try:
- concurrent_count = int(input("请输入并发回测数量 (最小值 1): "))
- if concurrent_count >= 1:
- break
- else:
- print("❌ 并发数量必须大于等于 1。")
- except ValueError:
- print("❌ 请输入有效数字。")
-
- return where_to_start, concurrent_count
- def cut_json_file(json_path, expressions_with_settings, where_to_start):
- """Cut the JSON file from the starting point and overwrite the original file"""
- if where_to_start == 0:
- return expressions_with_settings # No cutting needed
-
- # Cut the expressions list
- cut_expressions = expressions_with_settings[where_to_start:]
-
- # Overwrite the original JSON file
- try:
- with open(json_path, 'w', encoding='utf-8') as f:
- json.dump(cut_expressions, f, ensure_ascii=False, indent=2)
- print(f"✓ 原始 JSON 文件已被覆盖")
- print(f"📊 新文件包含 {len(cut_expressions)} 个表达式")
- return cut_expressions
- except Exception as e:
- print(f"❌ 覆盖 JSON 文件失败: {e}")
- print(f"⚠️ 将使用原始数据继续运行")
- return expressions_with_settings
- def shuffle_json_file(json_path, expressions_with_settings):
- """Randomly shuffle the JSON elements and overwrite the file"""
- import random
-
- # Create a copy and shuffle it
- shuffled_expressions = expressions_with_settings.copy()
- random.shuffle(shuffled_expressions)
-
- # Overwrite the JSON file with shuffled data
- try:
- with open(json_path, 'w', encoding='utf-8') as f:
- json.dump(shuffled_expressions, f, ensure_ascii=False, indent=2)
- print(f"✓ JSON 文件已随机打乱并覆盖")
- print(f"🔀 已打乱 {len(shuffled_expressions)} 个表达式的顺序")
- return shuffled_expressions
- except Exception as e:
- print(f"❌ 打乱 JSON 文件失败: {e}")
- print(f"⚠️ 将使用原始顺序继续运行")
- return expressions_with_settings
- def get_random_shuffle_choice():
- """Ask user if they want to randomly shuffle the expressions"""
- print("\n" + "="*60)
- print("随机模式选择")
- print("="*60)
- print("是否要随机打乱表达式顺序?")
- print("💡 这将改变表达式在文件中的排列顺序,以达到随机回测的目的")
-
- while True:
- choice = input("选择随机模式? (y/n): ").lower().strip()
- if choice in ['y', 'n']:
- return choice == 'y'
- else:
- print("❌ 请输入 y 或 n")
- def get_multi_simulation_choice():
- """Ask user if they want to use multi-simulation mode"""
- print("\n" + "="*60)
- print("多重回测(multi-simulatioin)模式选择")
- print("="*60)
- print("是否要使用多重回测(multi-simulatioin)模式?")
- print("💡 多重回测(multi-simulatioin)可以将多个alpha组合在一个回测槽中运行")
-
- while True:
- choice = input("使用多重回测(multi-simulatioin)模式? (y/n): ").lower().strip()
- if choice in ['y', 'n']:
- return choice == 'y'
- else:
- print("❌ 请输入 y 或 n")
- def get_alpha_count_per_slot():
- """Ask user how many alphas to put in one multi-simulation slot"""
- print("\n" + "="*60)
- print("多重回测(multi-simulatioin)槽配置")
- print("="*60)
- print("每个多重回测(multi-simulatioin)槽中放置多少个alpha?")
- print("💡 范围: 2-10 个alpha")
-
- while True:
- try:
- alpha_count = int(input("每个槽的alpha数量 (2-10): "))
- if 2 <= alpha_count <= 10:
- return alpha_count
- else:
- print("❌ 数量必须在 2 到 10 之间")
- except ValueError:
- print("❌ 请输入有效数字。")
- def monitor_log_file(logger, stop_event, use_multi_sim=False, alpha_count_per_slot=None):
- """Monitor log file and print new lines in real-time"""
- print("\n📊 开始监控日志文件...")
-
- # Get current directory to look for log files
- current_dir = os.getcwd()
- log_file_path = None
-
- # First, try to find any existing wqb log files (including older ones)
- print("🔍 查找 WQB 日志文件...")
-
- # Look for any wqb*.log files in current directory
- wqb_files = []
- try:
- for file in os.listdir(current_dir):
- if file.startswith('wqb') and file.endswith('.log'):
- file_path = os.path.join(current_dir, file)
- wqb_files.append((file_path, os.path.getmtime(file_path)))
- except Exception as e:
- print(f"⚠️ 扫描目录失败: {e}")
- return
-
- if wqb_files:
- # Sort by modification time, get the newest one
- log_file_path = sorted(wqb_files, key=lambda x: x[1], reverse=True)[0][0]
- print(f"✓ 监控已找到的最新日志文件: {log_file_path}")
- else:
- # Wait for new log file to be created
- print("等待新的 WQB 日志文件创建...")
- start_time = time.time()
-
- while not stop_event.is_set() and (time.time() - start_time) < 30: # Wait max 30 seconds
- try:
- for file in os.listdir(current_dir):
- if file.startswith('wqb') and file.endswith('.log'):
- file_path = os.path.join(current_dir, file)
- # Check if file was created recently (within last 120 seconds)
- if os.path.getctime(file_path) > (time.time() - 120):
- log_file_path = file_path
- break
- except Exception:
- pass
-
- if log_file_path:
- break
- time.sleep(1)
-
- if not log_file_path:
- print("⚠️ 未找到 WQB 日志文件,日志监控已禁用。")
- print("💡 提示: 日志文件通常在开始回测后才会创建")
- return
- else:
- print(f"✓ 找到新日志文件: {log_file_path}")
-
- if stop_event.is_set():
- return
-
- print("="*60)
-
- # Display multi-simulation information if applicable
- if use_multi_sim and alpha_count_per_slot:
- print("📌 重要提示:")
- print(f"以下是multi simulation的记录,你的设计是1个multi simulation中有{alpha_count_per_slot}个alpha,")
- print(f"因此需将实际回测数乘以该乘数,才得到实际已完成的Alpha个数。")
- print("="*60)
-
- try:
- # Start monitoring from current end of file
- with open(log_file_path, 'r', encoding='utf-8') as f:
- # Go to end of file
- f.seek(0, 2)
-
- while not stop_event.is_set():
- line = f.readline()
- if line:
- # Clean up the log line and print it
- clean_line = line.rstrip()
- if clean_line: # Only print non-empty lines
- print(f"[日志] {clean_line}")
- else:
- time.sleep(0.2)
- except Exception as e:
- print(f"⚠️ 监控日志文件时出错: {e}")
- async def automated_main(json_file_content, username, password, start_position=0, concurrent_count=3,
- random_shuffle=False, use_multi_sim=False, alpha_count_per_slot=3):
- """Automated main function for web interface - takes all parameters at once"""
- try:
- print("🧠 BRAIN Alpha 模板生成器 (自动模式)")
- print("="*60)
-
- # Parse JSON content directly
- import json
- expressions_with_settings = json.loads(json_file_content)
- expressions_count = len(expressions_with_settings)
-
- print(f"📊 已加载 {expressions_count} 个 alpha 配置")
-
- # Setup logger and session
- logger = wqb.wqb_logger()
- wqbs = wqb.WQBSession((username, password), logger=logger)
-
- # Test connection
- resp = wqbs.locate_field('open')
- print(f"连接测试结果: resp.ok = {resp.ok}")
-
- if not resp.ok:
- print("❌ 身份验证失败")
- return {"success": False, "error": "Authentication failed"}
-
- print("✅ 身份验证成功!")
-
- # Process expressions based on parameters
- if start_position > 0:
- expressions_with_settings = expressions_with_settings[start_position:]
- print(f"🔪 已从位置 {start_position} 开始切割,剩余 {len(expressions_with_settings)} 个表达式")
-
- if random_shuffle:
- import random
- random.shuffle(expressions_with_settings)
- print(f"🔀 已随机打乱 {len(expressions_with_settings)} 个表达式的顺序")
-
- if use_multi_sim:
- # Convert to multi-alphas format
- original_count = len(expressions_with_settings)
- expressions_with_settings = wqb.to_multi_alphas(expressions_with_settings, alpha_count_per_slot)
- print(f"✓ 已转换为多重回测(multi-simulatioin)格式")
- print(f"📊 原始表达式数: {original_count}")
- print(f"🎯 每槽alpha数: {alpha_count_per_slot}")
-
-
- # Write multi-simulation info to log
- multi_sim_msg = (f"[MULTI-SIMULATION MODE] 以下是multi simulation的记录,"
- f"你的设计是1个multi simulation中有{alpha_count_per_slot}个alpha,"
- f"因此需将实际回测数乘以该乘数,才得到实际已完成的Alpha个数。")
- logger.info("="*80)
- logger.info(multi_sim_msg)
- logger.info("="*80)
-
- print(f"🔄 使用 {concurrent_count} 个并发回测")
- print("\n" + "="*60)
- print("运行回测")
- print("="*60)
-
- if use_multi_sim:
- print("开始多重回测(multi-simulatioin)并发回测...")
- else:
- print("开始并发回测...")
-
- # Run simulations
- resps = await wqbs.concurrent_simulate(
- expressions_with_settings,
- concurrent_count,
- log_gap=10
- )
-
- # Collect results
- alpha_ids = []
- successful_count = 0
- failed_count = 0
-
- print("\n" + "="*60)
- print("回测结果")
- print("="*60)
-
- if use_multi_sim:
- print(f"成功完成 {len(resps)} 个多重回测(multi-simulatioin)槽的回测")
- else:
- print(f"成功完成 {len(resps)} 个回测")
-
- print("\nAlpha IDs:")
- for i, resp in enumerate(resps):
- try:
- alpha_id = resp.json()['alpha']
- alpha_ids.append(alpha_id)
- successful_count += 1
- print(f" {i+1:4d}. {alpha_id}")
- except Exception as e:
- failed_count += 1
- print(f" {i+1:4d}. 错误: {e}")
-
- print("\n✅ 处理完成!")
-
- return {
- "success": True,
- "results": {
- "total": len(resps),
- "successful": successful_count,
- "failed": failed_count,
- "alphaIds": alpha_ids,
- "use_multi_sim": use_multi_sim,
- "alpha_count_per_slot": alpha_count_per_slot if use_multi_sim else None
- }
- }
-
- except Exception as e:
- print(f"\n❌ 错误: {e}")
- return {"success": False, "error": str(e)}
- async def main():
- """Main function with interactive workflow"""
- print("🧠 BRAIN Alpha 模板生成器")
- print("="*60)
-
- # Step 1: Get JSON file and load expressions
- json_path, expressions_with_settings = get_json_filepath()
- expressions_count = len(expressions_with_settings)
-
- print(f"\n📊 已从以下位置加载 {expressions_count} 个 alpha 配置:")
- print(f" {json_path}")
-
- # Step 2: Get credentials and test authentication
- wqbs = None
- logger = None
-
- while wqbs is None:
- username, password = get_user_credentials()
- wqbs, logger = test_authentication(username, password)
-
- if wqbs is None:
- retry = input("\n是否要重试? (y/n): ").lower().strip()
- if retry != 'y':
- print("正在退出...")
- return
-
- # Step 3: Get simulation parameters
- where_to_start, concurrent_count = get_simulation_parameters(expressions_count, json_path)
-
- # Step 3.5: Cut JSON file if needed
- if where_to_start > 0:
- print(f"\n🔪 正在切割 JSON 文件...")
- expressions_with_settings = cut_json_file(json_path, expressions_with_settings, where_to_start)
- where_to_start = 0 # Reset to 0 since we cut the file
-
- # Step 3.6: Ask for random shuffle option
- if get_random_shuffle_choice():
- print(f"\n🔀 正在随机打乱表达式顺序...")
- expressions_with_settings = shuffle_json_file(json_path, expressions_with_settings)
-
- # Step 3.7: Ask for multi-simulation mode
- use_multi_sim = get_multi_simulation_choice()
- alpha_count_per_slot = None
-
- if use_multi_sim:
- alpha_count_per_slot = get_alpha_count_per_slot()
- # Convert to multi-alphas format
- original_count = len(expressions_with_settings)
- expressions_with_settings = wqb.to_multi_alphas(expressions_with_settings, alpha_count_per_slot)
- print(f"\n✓ 已转换为多重回测(multi-simulatioin)格式")
- print(f"📊 原始表达式数: {original_count}")
- print(f"🎯 每槽alpha数: {alpha_count_per_slot}")
-
- # Calculate how many expressions will be processed
- print(f"🔄 使用 {concurrent_count} 个并发回测")
-
- # Step 4: Write multi-simulation info to log if applicable
- if use_multi_sim and alpha_count_per_slot and logger:
- multi_sim_msg = (f"[MULTI-SIMULATION MODE] 以下是multi simulation的记录,"
- f"你的设计是1个multi simulation中有{alpha_count_per_slot}个alpha,"
- f"因此需将实际回测数乘以该乘数,才得到实际已完成的Alpha个数。")
- logger.info("="*80)
- logger.info(multi_sim_msg)
- logger.info("="*80)
-
- # Step 5: Start log monitoring in background
- stop_log_monitor = threading.Event()
- log_thread = threading.Thread(
- target=monitor_log_file,
- args=(logger, stop_log_monitor, use_multi_sim, alpha_count_per_slot),
- daemon=True
- )
- log_thread.start()
-
- # Step 6: Run simulations
- print("\n" + "="*60)
- print("运行回测")
- print("="*60)
- if use_multi_sim:
- print("开始多重回测(multi-simulatioin)并发回测...")
- else:
- print("开始并发回测...")
-
- try:
- resps = await wqbs.concurrent_simulate(
- expressions_with_settings,
- concurrent_count,
- log_gap=10
- )
-
- # Stop log monitoring
- stop_log_monitor.set()
-
- # Print results
- print("\n" + "="*60)
- print("回测结果")
- print("="*60)
-
- if use_multi_sim:
- print(f"成功完成 {len(resps)} 个多重回测(multi-simulatioin)槽的回测")
- else:
- print(f"成功完成 {len(resps)} 个回测")
-
- print("\nAlpha IDs:")
- for i, resp in enumerate(resps):
- try:
- alpha_id = resp.json()['alpha']
- print(f" {i+1:4d}. {alpha_id}")
- except Exception as e:
- print(f" {i+1:4d}. 错误: {e}")
-
- except KeyboardInterrupt:
- print("\n\n⚠️ 回测被用户中断")
- stop_log_monitor.set()
- except Exception as e:
- print(f"\n❌ 回测错误: {e}")
- stop_log_monitor.set()
-
- print("\n✅ 处理完成!")
- if __name__ == '__main__':
- asyncio.run(main())
|