我用的是TQSIM模拟账户来做回测。
from tqsdk import TqApi, TqBacktest, TqSim, TqAuth, ta from tqsdk.tafunc import time_to_datetime from datetime import date import sys # 设置回测时间 start_dt = date(2025, 1, 1) end_dt = date(2025, 5, 1) try: # 初始化模拟账户,初始资金 10 万 acc = TqSim(init_balance=10000) print("初始化模拟账户,初始资金: 10000") # 初始化 TqApi api = TqApi( account=acc, backtest=TqBacktest(start_dt=start_dt, end_dt=end_dt), web_gui=True, auth=TqAuth(tq_username, tq_password) ) print("TqApi 初始化成功") # 获取合约和 K 线数据 symbol = "SHFE.rb2501" try: klines = api.get_kline_serial(symbol, 60 * 60) # 转换 K 线的时间戳为可读格式 klines["datetime_readable"] = klines["datetime"].apply(time_to_datetime) print(f"K 线 datetime_readable 第一行: {klines['datetime_readable'].iloc[0]}") print(f"K 线 datetime_readable 最后一行: {klines['datetime_readable'].iloc[-1]}") except Exception as e: print(f"获取 K 线数据失败: {str(e)}") raise Exception(f"无法获取 {symbol} 的 K 线数据: {str(e)}") # 获取账户和持仓对象 account = api.get_account() position = api.get_position(symbol) print(f"初始账户余额: {account.balance}, 可用资金: {account.available}") # 主循环 while True: try: # 等待数据更新 api.wait_update() # 等待新 K 线数据 if not api.is_changing(klines): continue ### 策略 ### except Exception as e: # 捕获所有异常 print(f"运行时错误: {str(e)}") break except Exception as e: # 捕获初始化阶段的错误 print(f"初始化失败: {str(e)}") sys.exit(1) finally: # 确保 TqApi 关闭 try: api.close() print("TqApi 已关闭") except Exception as e: print(f"关闭 TqApi 失败: {str(e)}")
但是一直显示 ReadTimeout: HTTPSConnectionPool(host=’auth.shinnytech.com’, port=443): Read timed out. (read timeout=30)
请帮忙看一下谢谢!