from tqsdk import TqApi, TqSim, TqBacktest, tafunc
from tqsdk.ta import ATR
from tqsdk.tafunc import ma
import datetime
import numpy as np
import pandas as pd
from datetime import date
from my_lib_log import main_fun
import argparse
import logging  # bring in the logging module
import os.path
import time
import sys
import threading


class WorkerThread(threading.Thread):
    """Thread that runs ``main_fun`` for a single symbol.

    Args:
        api: API object handed to ``main_fun``; the launcher below passes a
            copy of the master ``TqApi`` to each thread.
        Futures: Symbol string; also used as the name of this thread's logger.
        Backtest_model: Forwarded unchanged to ``main_fun``.
            NOTE(review): its meaning is defined in ``my_lib_log`` -- confirm.
    """

    def __init__(self, api, Futures, Backtest_model):
        super().__init__()
        self.api = api
        self.Futures = Futures
        self.Backtest_model = Backtest_model
        # One logger per symbol, keyed by the symbol string.
        self.logger = logging.getLogger(self.Futures)

    def run(self):
        """Thread entry point: delegate to ``main_fun`` with the stored arguments."""
        main_fun(self.api, self.Futures, self.Backtest_model, self.logger)


if __name__ == "__main__":
    # Only the first MAX_SYMBOL_LINES lines of the config file are considered.
    MAX_SYMBOL_LINES = 15

    parser = argparse.ArgumentParser()
    # Required: without it the config path below cannot be built
    # (previously a missing --EX crashed with a TypeError on str + None).
    parser.add_argument('--EX', required=True)
    args = parser.parse_args()
    ex = args.EX

    # One symbol per line in the per-exchange config file.
    config_file = "c:/config/" + ex + "_config.ini"
    with open(config_file, 'r') as f:
        SYMBOL_list = [line.rstrip('\n') for line in f]

    api_master = TqApi(TqSim())
    print("len", len(SYMBOL_list))
    print(SYMBOL_list)

    # Create one worker per non-empty symbol among the first MAX_SYMBOL_LINES
    # lines; each worker gets its own copy of the master api.
    thread_list = [
        WorkerThread(api_master.copy(), str(symbol), -1)
        for symbol in SYMBOL_list[:MAX_SYMBOL_LINES]
        if symbol != ""
    ]

    # Start the workers. Report the symbol the thread actually owns: indexing
    # SYMBOL_list with the thread index is wrong once blank lines are skipped.
    for i, worker in enumerate(thread_list):
        worker.start()
        print(str(i) + " start " + str(worker.Futures))

    # Keep calling wait_update() on the master api for as long as the
    # process runs.
    while True:
        api_master.wait_update()
west 已回答的问题 2020年4月23日
我这边测试了一下，是能获取到这个K线的。