4.14K 浏览
0

from tqsdk import TqApi, TqSim, TargetPosTask, BacktestFinished, TqBacktest
from tqsdk.tafunc import ma
from datetime import date
import multiprocessing
from multiprocessing import Pool
import sys
def MyStrategy(SHORT):
  LONG = 60
  SYMBOL = "SHFE.cu1907"
  acc = TqSim()
  try:
    api = TqApi(acc, backtest=TqBacktest(start_dt=date(2019, 5, 6), end_dt=date(2019, 5, 10)))
    data_length = LONG + 2
    klines = api.get_kline_serial(SYMBOL, duration_seconds=60, data_length=data_length)
    target_pos = TargetPosTask(api, SYMBOL)
    while True:
      api.wait_update()
      if api.is_changing(klines.iloc[-1], "datetime"):
        short_avg = ma(klines.close, SHORT)
        long_avg = ma(klines.close, LONG)
        if long_avg.iloc[-2] < short_avg.iloc[-2] and long_avg.iloc[-1] > short_avg.iloc[-1]:
          target_pos.set_target_volume(-3)
        if short_avg.iloc[-2] < long_avg.iloc[-2] and short_avg.iloc[-1] > long_avg.iloc[-1]:
          target_pos.set_target_volume(3)
  except BacktestFinished:
    api.close()
    print("SHORT=", SHORT, "最终权益=", acc.account.balance)  # 每次回测结束时, 输出使用的参数和最终权益
    sys.stdout.flush()
  except Exception as e:
    print("e:", e)
    sys.stdout.flush()
  if __name__ == '__main__':
  multiprocessing.freeze_support()
  p = Pool(4)                               # 进程池, 建议小于cpu数
  for s in range(20, 40):
    p.apply_async(MyStrategy, args=(s,))  # 把20个回测任务交给进程池执行
  print('Waiting for all subprocesses done...')
  p.close()
  p.join()
  print('All subprocesses done.')

回测以上代码,以下两段代码始终都没有被执行到:

except BacktestFinished:
  api.close()
  print("SHORT=", SHORT, "最终权益=", acc.account.balance)  # 每次回测结束时, 输出使用的参数和最终权益
  sys.stdout.flush()
except Exception as e:
  print("e:", e)
  sys.stdout.flush()
west 已回答的问题 2020年2月5日
0

print("SHORT=", SHORT, "最终权益=", acc.account.balance)

这句代码有错,但exception没有被报出来,程序直接退出,回测还没有结束(即没有BacktestFinished),导致api没有被close。把“acc.account.balance”修改后可以正常运行:

TqSim中已经没有account变量,如果要查看balance,建议使用api.get_account()获取账户信息后输出显示。

west 已回答的问题 2020年2月5日