from tqsdk import TqApi, TqSim, TqBacktest ,tafunc
from tqsdk.ta import ATR
from tqsdk.tafunc import ma
import datetime
import numpy as np
import pandas as pd
from datetime import date
from my_lib_log import main_fun
import argparse
import logging # 引入logging模块
import os.path
import time
import sys
import threading
class WorkerThread(threading.Thread):
    """Thread that runs ``main_fun`` with a fixed set of arguments.

    Args:
        api: API object forwarded to ``main_fun`` as its first argument.
        Futures: Symbol string; it is also the name used to look up this
            worker's logger.
        Backtest_model: Value forwarded to ``main_fun`` unchanged.
    """

    def __init__(self, api, Futures, Backtest_model):
        super().__init__()
        self.api, self.Futures, self.Backtest_model = api, Futures, Backtest_model
        # Each worker logs through the logger named after its symbol.
        self.logger = logging.getLogger(self.Futures)

    def run(self):
        """Thread body: call ``main_fun`` with the stored arguments."""
        main_fun(self.api, self.Futures, self.Backtest_model, self.logger)
if __name__ == "__main__":
    # Script entry point: read the symbol list for one exchange from its
    # config file, start one WorkerThread per non-blank symbol, then keep the
    # master API updating.

    # Only the first MAX_WORKERS lines of the config file are considered.
    MAX_WORKERS = 15

    parser = argparse.ArgumentParser()
    # --EX selects which "<EX>_config.ini" file is read. It is required
    # because the config path below cannot be built without it.
    parser.add_argument('--EX', required=True)
    args = parser.parse_args()
    ex = args.EX
    config_file = "c:/config/"+ex+"_config.ini"

    # One symbol per line; only the trailing newline is stripped.
    with open(config_file, 'r') as f:
        SYMBOL_list = [line.rstrip('\n') for line in f]

    api_master = TqApi(TqSim())
    print("len" , len(SYMBOL_list))
    print(SYMBOL_list)

    # Create the worker threads, skipping blank config lines. Every worker
    # gets its own copy of the master API and Backtest_model = -1.
    thread_list = []
    for symbol in SYMBOL_list[:MAX_WORKERS]:
        if symbol != "":
            thread_list.append(WorkerThread(api_master.copy(), str(symbol), -1))

    # Start the workers. The symbol is read back from the thread itself:
    # thread_list skips blank lines, so its indexes do not necessarily line
    # up with SYMBOL_list.
    for i, worker in enumerate(thread_list):
        worker.start()
        print(str(i)+" start " + str(worker.Futures))

    # NOTE(review): this mirrors the TqSdk multi-threading example, where the
    # main thread keeps calling wait_update() on the master API so that the
    # copies keep receiving data -- confirm against the TqSdk docs.
    while True:
        api_master.wait_update()


# west 已回答的问题（2020年4月23日）
# 我这边测试了一下，是能获取到这个K线的。