# 获取合约的k线数据,计算均值,比较均值和最新价,根据偏移度给出提示 def get_kline_data(contracts): try: api = TqApi(TqKq(),auth=TqAuth("13817289146","232680")) async def demo(SYMBOL, cycle, ma_num, offset): # get_kline_serial 支持 await 异步写法,这里会订阅 K 线,等到收到 k 线数据才返回 klines = await api.get_kline_serial(SYMBOL, duration_seconds=cycle) async with api.register_update_notify() as update_chan: async for _ in update_chan: if api.is_changing(klines.iloc[-1], "datetime"): ma = sum(klines.close.iloc[-ma_num:]) / ma_num print(SYMBOL) klines_datetime = Timestamp(klines.datetime.iloc[-1]).to_pydatetime() + timedelta(hours=8) logger.log(logging.INFO, f"{SYMBOL}的均值为{ma}, {klines_datetime}") if abs((klines.close.iloc[-1] - ma)/ma) > offset: print(f"{SYMBOL}的最新价偏移{cycle}秒k线MA{ma_num}均线的{offset*100}%!") # 为每个合约创建异步任务 for symbol in contracts: api.create_task(demo(symbol[0], symbol[1], symbol[2], symbol[3])) while True: api.wait_update() except TqTimeoutError: api.close() print("不在交易时间,连接已断开")