使用软件之前需要先读一下说明文档的,回测的K线只在开始和结束的时候更新,文档链接如下
https://doc.shinnytech.com/tqsdk/latest/usage/backtest.html#id8
感谢回复。麻烦帮忙看下具体问题如何解决:
def check_kline_completeness(kline, logger=None):
“””
检查K线数据是否完整
Args:
kline: 单根K线数据
logger: 日志记录器(可选)
Returns:
tuple: (是否完整, 问题列表)
“””
issues = []
# 检查K线是否为Series类型
if not isinstance(kline, pd.Series):
issues.append(“K线数据不是Series类型”)
return False, issues
# 检查必要字段是否存在
required_fields = [‘datetime’, ‘open’, ‘high’, ‘low’, ‘close’, ‘volume’]
missing_fields = [field for field in required_fields if field not in kline.index]
if missing_fields:
issues.append(f”缺少必要字段: {‘, ‘.join(missing_fields)}”)
# 检查时间戳是否有效
if kline.get(‘datetime’, 0) <= 0:
issues.append("无效的时间戳")
# 检查价格是否为NaN
price_fields = ['open', 'high', 'low', 'close']
if any(np.isnan(kline.get(field, np.nan)) for field in price_fields):
issues.append("价格包含NaN值")
# 检查价格是否为正
if any(kline.get(field, 0) <= 0 for field in price_fields):
issues.append("价格包含非正值")
# 检查最高价是否大于等于最低价
high = kline.get('high', 0)
low = kline.get('low', 0)
if high = low and not (low <= open_price = low and not (low <= close_price <= high):
issues.append("收盘价不在高低价之间")
# 检查交易量是否为0(可选检查,因为某些市场可能存在零交易量)
if kline.get('volume', 0) <= 0:
issues.append("交易量为零")
# 检查OHLC是否全部相等(可选检查,因为可能存在一字板情况)
if open_price == high == low == close_price:
issues.append("OHLC全部相等")
if issues and logger:
#logger.warning(f"K线数据检查失败: {convert_timestamp(kline.get('datetime', '未知时间'))}, 问题: {', '.join(issues)}")
# print(f"K线数据检查失败: {convert_timestamp(kline.get('datetime', '未知时间'))}, 问题: {', '.join(issues)}")
pass
return len(issues) == 0, issues
def get_complete_kline(kline, api_api, logger=None):
"""
获取完整且正确的K线数据
Args:
kline: K线数据(通过api.get_kline_serial获取)
api_api: TqApi实例,用于获取最新数据
logger: 日志记录器(可选)
Returns:
tuple: (是否成功, K线数据/None)
"""
from tqsdk import TqApi
max_index = 299
for i in range(1, max_index):
# 每次循环都获取最新的K线数据
current_kline = kline.iloc[-1].copy()
# 确保是Series类型
if not isinstance(current_kline, pd.Series):
if logger:
logger.error("获取的K线不是Series类型,无法处理")
api_api.wait_update()
delay_ms(128)
continue
is_complete, issues = check_kline_completeness(current_kline, logger)
if is_complete:
if logger:
logger.info(f"循环{str(i)}次后获取到完整K线数据: {convert_timestamp(current_kline['datetime'])}")
return True, current_kline
else:
api_api.wait_update() # wait for volume update to be stable 解决网络数据传输不稳定导致的异常数据
delay_ms(128)
# 循环结束仍未获取到完整K线
current_kline = kline.iloc[-1].copy()
if logger:
logger.error(f"超过最大循环次数{str(max_index)}次,无法获取完整K线数据: {convert_timestamp(current_kline['datetime'])}")
# 严格按照用户要求,返回失败
return False, current_kline
def run(self):
"""运行策略主循环"""
# 遍历所有交易品种
for symbol in self.symbols:
# 这里不能限制等待时间
self.api.wait_update()
# “datetime”更新判断方式,可能因为时间太短,导致open,high,low,close相同,close_oi
if self.api.is_changing(self.klines[symbol].iloc[-1], ["datetime"]) : # 产生新k线
success, current_kline = get_complete_kline(self.klines[symbol],api_api=self.api, logger=self.logger)
if not success :
self.logger.error(f"获取K线数据不完整,程序退出: {convert_timestamp(current_kline['datetime'])}")
# 严格按照用户要求,K线失败则退出程序
self.api.close()
import sys
sys.exit(1)


有其他问题欢迎加入官方Q群748265037一起交流