38 浏览
0

以下是通过Kline订阅后,再输出到csv文件的K线数据。 数据质量是量化成功的基础,连地基都不行,以后做出的量化策略就是笑话。Open,High,Low,Close四个价格都是一样的,成交量是0。 这么离谱的数据居然被服务器传出来。

吐槽完了,那小白群众怎么办?

我的方法是:延时,反复获取K线数据。

chaos 已回答的问题 8小时 前
0

使用软件之前需要先读一下说明文档的,回测的K线只在开始和结束的时候更新,文档链接如下

https://doc.shinnytech.com/tqsdk/latest/usage/backtest.html#id8

ambitjohnson 发表新评论 5小时 前

有其他问题欢迎加入官方Q群748265037一起交流

感谢回复。麻烦帮忙看下具体问题如何解决:
def check_kline_completeness(kline, logger=None):
“””
检查K线数据是否完整

Args:
kline: 单根K线数据
logger: 日志记录器(可选)

Returns:
tuple: (是否完整, 问题列表)
“””
issues = []

# 检查K线是否为Series类型
if not isinstance(kline, pd.Series):
issues.append(“K线数据不是Series类型”)
return False, issues

# 检查必要字段是否存在
required_fields = [‘datetime’, ‘open’, ‘high’, ‘low’, ‘close’, ‘volume’]
missing_fields = [field for field in required_fields if field not in kline.index]
if missing_fields:
issues.append(f”缺少必要字段: {‘, ‘.join(missing_fields)}”)

# 检查时间戳是否有效
if kline.get(‘datetime’, 0) <= 0:
issues.append("无效的时间戳")

# 检查价格是否为NaN
price_fields = ['open', 'high', 'low', 'close']
if any(np.isnan(kline.get(field, np.nan)) for field in price_fields):
issues.append("价格包含NaN值")

# 检查价格是否为正
if any(kline.get(field, 0) <= 0 for field in price_fields):
issues.append("价格包含非正值")

# 检查最高价是否大于等于最低价
high = kline.get('high', 0)
low = kline.get('low', 0)
if high = low and not (low <= open_price = low and not (low <= close_price <= high):
issues.append("收盘价不在高低价之间")

# 检查交易量是否为0(可选检查,因为某些市场可能存在零交易量)
if kline.get('volume', 0) <= 0:
issues.append("交易量为零")

# 检查OHLC是否全部相等(可选检查,因为可能存在一字板情况)
if open_price == high == low == close_price:
issues.append("OHLC全部相等")

if issues and logger:
#logger.warning(f"K线数据检查失败: {convert_timestamp(kline.get('datetime', '未知时间'))}, 问题: {', '.join(issues)}")
# print(f"K线数据检查失败: {convert_timestamp(kline.get('datetime', '未知时间'))}, 问题: {', '.join(issues)}")
pass
return len(issues) == 0, issues

def get_complete_kline(kline, api_api, logger=None):
"""
获取完整且正确的K线数据

Args:
kline: K线数据(通过api.get_kline_serial获取)
api_api: TqApi实例,用于获取最新数据
logger: 日志记录器(可选)
Returns:
tuple: (是否成功, K线数据/None)
"""
from tqsdk import TqApi
max_index = 299

for i in range(1, max_index):
# 每次循环都获取最新的K线数据
current_kline = kline.iloc[-1].copy()

# 确保是Series类型
if not isinstance(current_kline, pd.Series):
if logger:
logger.error("获取的K线不是Series类型,无法处理")
api_api.wait_update()
delay_ms(128)
continue

is_complete, issues = check_kline_completeness(current_kline, logger)

if is_complete:
if logger:
logger.info(f"循环{str(i)}次后获取到完整K线数据: {convert_timestamp(current_kline['datetime'])}")
return True, current_kline
else:
api_api.wait_update() # wait for volume update to be stable 解决网络数据传输不稳定导致的异常数据
delay_ms(128)

# 循环结束仍未获取到完整K线
current_kline = kline.iloc[-1].copy()
if logger:
logger.error(f"超过最大循环次数{str(max_index)}次,无法获取完整K线数据: {convert_timestamp(current_kline['datetime'])}")
# 严格按照用户要求,返回失败
return False, current_kline
def run(self):
"""运行策略主循环"""
# 遍历所有交易品种
for symbol in self.symbols:
# 这里不能限制等待时间
self.api.wait_update()
# “datetime”更新判断方式,可能因为时间太短,导致open,high,low,close相同,close_oi
if self.api.is_changing(self.klines[symbol].iloc[-1], ["datetime"]) : # 产生新k线
success, current_kline = get_complete_kline(self.klines[symbol],api_api=self.api, logger=self.logger)
if not success :
self.logger.error(f"获取K线数据不完整,程序退出: {convert_timestamp(current_kline['datetime'])}")
# 严格按照用户要求,K线失败则退出程序
self.api.close()
import sys
sys.exit(1)

您正在查看1个答案中的1个,单击此处查看所有答案。