我用的是TQSIM模拟账户来做回测。
from tqsdk import TqApi, TqBacktest, TqSim, TqAuth, ta
from tqsdk.tafunc import time_to_datetime
from datetime import date
import sys
# 设置回测时间
start_dt = date(2025, 1, 1)
end_dt = date(2025, 5, 1)
try:
# 初始化模拟账户,初始资金 10 万
acc = TqSim(init_balance=10000)
print("初始化模拟账户,初始资金: 10000")
# 初始化 TqApi
api = TqApi(
account=acc,
backtest=TqBacktest(start_dt=start_dt, end_dt=end_dt),
web_gui=True,
auth=TqAuth(tq_username, tq_password)
)
print("TqApi 初始化成功")
# 获取合约和 K 线数据
symbol = "SHFE.rb2501"
try:
klines = api.get_kline_serial(symbol, 60 * 60)
# 转换 K 线的时间戳为可读格式
klines["datetime_readable"] = klines["datetime"].apply(time_to_datetime)
print(f"K 线 datetime_readable 第一行: {klines['datetime_readable'].iloc[0]}")
print(f"K 线 datetime_readable 最后一行: {klines['datetime_readable'].iloc[-1]}")
except Exception as e:
print(f"获取 K 线数据失败: {str(e)}")
raise Exception(f"无法获取 {symbol} 的 K 线数据: {str(e)}")
# 获取账户和持仓对象
account = api.get_account()
position = api.get_position(symbol)
print(f"初始账户余额: {account.balance}, 可用资金: {account.available}")
# 主循环
while True:
try:
# 等待数据更新
api.wait_update()
# 等待新 K 线数据
if not api.is_changing(klines):
continue
### 策略 ###
except Exception as e:
# 捕获所有异常
print(f"运行时错误: {str(e)}")
break
except Exception as e:
# 捕获初始化阶段的错误
print(f"初始化失败: {str(e)}")
sys.exit(1)
finally:
# 确保 TqApi 关闭
try:
api.close()
print("TqApi 已关闭")
except Exception as e:
print(f"关闭 TqApi 失败: {str(e)}")
但是一直显示 ReadTimeout: HTTPSConnectionPool(host=’auth.shinnytech.com’, port=443): Read timed out. (read timeout=30)
请帮忙看一下谢谢!
比如整一个国内的云服务器之类的