from tqsdk import TqApi, TqSim, TargetPosTask, BacktestFinished, TqBacktest
from tqsdk.tafunc import ma
from datetime import date
import multiprocessing
from multiprocessing import Pool
import sys
def MyStrategy(SHORT):
    """Run one dual moving-average backtest and print the final balance.

    The strategy trades ``SHFE.cu1907`` on 1-minute klines over the backtest
    window 2019-05-06 .. 2019-05-10. Each time a new kline appears it
    compares a short moving average (window ``SHORT``) with a long one
    (window 60): when the short average crosses below the long average the
    target position is set to -3, and when it crosses above the target
    position is set to 3.

    Args:
        SHORT: Window length of the short moving average.

    Returns:
        None. The parameter and the final account balance are printed when
        the backtest finishes; any other error is printed as ``e: ...``.
    """
    LONG = 60
    SYMBOL = "SHFE.cu1907"
    acc = TqSim()
    # Bound before the try so the handlers can tell whether TqApi was ever
    # constructed (the constructor itself may raise).
    api = None
    try:
        api = TqApi(acc, backtest=TqBacktest(start_dt=date(2019, 5, 6), end_dt=date(2019, 5, 10)))
        # Two extra bars beyond the long window so that both iloc[-2] and
        # iloc[-1] of the long average are computed from full windows.
        data_length = LONG + 2
        klines = api.get_kline_serial(SYMBOL, duration_seconds=60, data_length=data_length)
        target_pos = TargetPosTask(api, SYMBOL)
        while True:
            api.wait_update()
            # Only re-evaluate when the newest kline's timestamp changes,
            # i.e. when a new bar has been started.
            if api.is_changing(klines.iloc[-1], "datetime"):
                short_avg = ma(klines.close, SHORT)
                long_avg = ma(klines.close, LONG)
                # Short average crossed below the long average: go short.
                if long_avg.iloc[-2] < short_avg.iloc[-2] and long_avg.iloc[-1] > short_avg.iloc[-1]:
                    target_pos.set_target_volume(-3)
                # Short average crossed above the long average: go long.
                if short_avg.iloc[-2] < long_avg.iloc[-2] and short_avg.iloc[-1] > long_avg.iloc[-1]:
                    target_pos.set_target_volume(3)
    except BacktestFinished:
        if api is not None:
            api.close()
            api = None  # already closed; keeps `finally` from closing twice
        # At the end of each backtest, print the parameter used and the final balance.
        print("SHORT=", SHORT, "最终权益=", acc.account.balance)
        sys.stdout.flush()
    except Exception as e:
        print("e:", e)
        sys.stdout.flush()
    finally:
        # Release the connection on every exit path, not just on a clean
        # BacktestFinished, so a failed run does not leak it in the worker.
        if api is not None:
            api.close()
if __name__ == '__main__':
    # Script entry point: fan the backtests for SHORT = 20..39 out over a
    # process pool, wait for all of them, then report any task that failed.

    # Only matters for frozen Windows executables.
    multiprocessing.freeze_support()
    p = Pool(4)  # process pool; keep the worker count below the number of CPUs
    short_windows = range(20, 40)
    # Hand the 20 backtest tasks to the pool. The AsyncResult handles are
    # kept because apply_async otherwise discards any exception raised by a
    # task, making worker failures invisible.
    pending = [(s, p.apply_async(MyStrategy, args=(s,))) for s in short_windows]
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    for s, result in pending:
        try:
            result.get()  # re-raises whatever escaped the worker task
        except Exception as e:
            # Report and continue so one failed run does not hide the others.
            print("SHORT=", s, "failed:", repr(e))
            sys.stdout.flush()
    print('All subprocesses done.')
回测以上代码时，以下两个 except 分支始终没有被执行到：
except BacktestFinished:
api.close()
print("SHORT=", SHORT, "最终权益=", acc.account.balance) # 每次回测结束时, 输出使用的参数和最终权益
sys.stdout.flush()
except Exception as e:
print("e:", e)
sys.stdout.flush()