3.41K 浏览
0
from tqsdk import TqApi, TqSim, TqBacktest ,tafunc
from tqsdk.ta import ATR
from tqsdk.tafunc import ma
import datetime
import numpy as np
import pandas as pd
from datetime import date
from my_lib_log import main_fun
import argparse
import logging  # 引入logging模块
import os.path
import time
import sys
import threading
class WorkerThread(threading.Thread):
    """Thread that runs ``main_fun`` with its own api object and logger.

    Args:
        api: The api object handed to ``main_fun``. The script entry point
            passes a copy of the master ``TqApi`` so each thread owns one.
        Futures: Identifier passed through to ``main_fun``; it is also used
            as the name of this thread's logger. The entry point passes a
            symbol string read from the config file.
        Backtest_model: Opaque mode value forwarded unchanged to
            ``main_fun``.
    """

    def __init__(self, api, Futures, Backtest_model):
        super().__init__()
        self.api = api
        self.Futures = Futures
        self.Backtest_model = Backtest_model
        # One named logger per worker, keyed by the Futures identifier.
        self.logger = logging.getLogger(self.Futures)

    def run(self):
        """Thread body: delegate all work to ``main_fun``."""
        main_fun(self.api, self.Futures, self.Backtest_model, self.logger)
if __name__ == "__main__":
    # The exchange name selects which per-exchange symbol list is loaded.
    # It is required: without it the config path below cannot be built.
    parser = argparse.ArgumentParser()
    parser.add_argument('--EX', required=True)
    args = parser.parse_args()
    ex = args.EX

    # The config file holds one symbol per line; strip the trailing newline.
    config_file = "c:/config/" + ex + "_config.ini"
    with open(config_file, 'r') as f:
        SYMBOL_list = [line.rstrip('\n') for line in f]

    # Master api; every worker thread receives its own copy of it.
    api_master = TqApi(TqSim())

    print("len", len(SYMBOL_list))
    print(SYMBOL_list)

    # Only the first 15 lines of the config are considered, and blank lines
    # among them are skipped, so there are at most 15 worker threads.
    thread_list = []
    for symbol in SYMBOL_list[:15]:
        if symbol != "":
            # Create new threads
            thread_list.append(WorkerThread(api_master.copy(), str(symbol), -1))

    for i, thread in enumerate(thread_list):
        # Start new Threads
        thread.start()
        # Report the symbol the thread was built with. Indexing SYMBOL_list
        # by the thread index is wrong once a blank line has been skipped.
        print(str(i) + " start " + thread.Futures)

    # Keep calling wait_update on the master api for the life of the process.
    while True:
        api_master.wait_update()

west 已回答的问题 2020年4月23日
0

你提供的代码里引用了外部文件（my_lib_log），无法运行重现，请提供一个简单的、可独立运行的重现代码。

west 发表新评论 2020年4月23日

我这边测试了一下，是能获取到这个 K 线的。

您正在查看1个答案中的1个,单击此处查看所有答案。