5.54K 浏览
0

原帖提问:我是根据多线程的例子,监控12个合约,开了12个线程,在每个线程中用了该合约的4个周期,是不是太多了,导致每半个小时就出现一次MemoryError。该如何改进呢?

补充:我简化了代码,每个周期就是一个计算指标以及显示信息两条语句了,结果运行了一个小时又崩了。而且在运行中,信号有时会延时10几分钟才显示出来。我查了一下,是不是这个是python本身多线程的问题?有什么好的建议吗?

from tqsdk import TqApi
from tqsdk.ta import MA, MACD
from tqsdk.tafunc import hhv, llv
import datetime
import threading
import sys, time
  def logfile(content):
    sys.stdout.write('{}\n\n'.format(content))
 def ksignal(symbolcode, klines, ticks, freq):
             set1 = 0
    set2 = 0
    setall = 0
         ma5 = MA(klines, 5)
    ma10 = MA(klines, 10)
    ma20 = MA(klines, 20)
    ma30 = MA(klines, 30)
    ma60 = MA(klines, 60)
    ma120 = MA(klines, 120)
            klines["ma5"] = ma5.ma
    klines["ma10"] = ma10.ma
    klines["ma20"] = ma20.ma
    klines["ma30"] = ma30.ma
    klines["ma60"] = ma60.ma
    klines["ma120"] = ma120.ma
     macd = MACD(klines, 12, 26, 9)
             #判断均线是否金叉
    if ma5.ma.iloc[-1]>= ma10.ma.iloc[-1]:
        set1 = 1
    else:
        set1 = 0
      #判断macd走势
    if list(macd["bar"])[-1] >= 0:
        set2 = 1
    else:
        set2 = 0
      if set1 == 1 and set2 == 1:
        setall = 1
        #logfile(symbolcode.split('.')[1]+" :  "+str(freq)+" 分钟图出现多头信号,"  + str(datetime.datetime.fromtimestamp(ticks.datetime.iloc[-1] / 1e9)))
    else:
        setall = -1
        #logfile(symbolcode.split('.')[1]+" :  "+str(freq)+" 分钟图出现空头信号,"  + str(datetime.datetime.fromtimestamp(ticks.datetime.iloc[-1] / 1e9)))
     return setall
    class WorkerThread(threading.Thread):
    def __init__(self, api, symbol):
        threading.Thread.__init__(self)
        self.api = api
        self.symbol = symbol
     def run(self): #每个合约的3、5、15分钟告警
         klines3 = self.api.get_kline_serial(self.symbol, 3 * 60, 150)
        klines5 = self.api.get_kline_serial(self.symbol, 5 * 60, 150)
        klines15 = self.api.get_kline_serial(self.symbol, 15 * 60, 150)
        tickssymbol = self.api.get_tick_serial(self.symbol) 
                 logfile(self.symbol + threading.currentThread().getName() + ' start at ' + time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()))
                          while True:
                         self.api.wait_update()
                         if self.api.is_changing(klines3.iloc[-1]):
                ksignal(self.symbol, klines3, tickssymbol, 3)
            if self.api.is_changing(klines3.iloc[-1], 'datetime'):
                logfile(self.symbol + ' 3k ' + threading.currentThread().getName() + ' kline datetime: ' + str(datetime.datetime.fromtimestamp(tickssymbol.datetime.iloc[-1] / 1e9)) + ' curtime: ' + time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()))
                             if self.api.is_changing(klines5.iloc[-1]):
                ksignal(self.symbol, klines5, tickssymbol, 5)
            if self.api.is_changing(klines5.iloc[-1], 'datetime'):
                logfile(self.symbol + ' 5k ' + threading.currentThread().getName() + ' kline datetime: ' + str(datetime.datetime.fromtimestamp(tickssymbol.datetime.iloc[-1] / 1e9)) + ' curtime: ' + time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()))
                          if self.api.is_changing(klines15.iloc[-1]):  
                ksignal(self.symbol, klines15, tickssymbol, 15)
            if self.api.is_changing(klines15.iloc[-1], 'datetime'):  
                logfile(self.symbol + ' 15k ' + threading.currentThread().getName() + ' kline datetime: ' + str(datetime.datetime.fromtimestamp(tickssymbol.datetime.iloc[-1] / 1e9)) + ' curtime: ' + time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()))                
  if __name__ == "__main__":
    api_master = TqApi()
     # Create new threads
    thread1 = WorkerThread(api_master.copy(), "SHFE.rb2010") #合约SHFE.rb2010 
    thread2 = WorkerThread(api_master.copy(), "SHFE.ru2009") #合约SHFE.ru2009
    thread3 = WorkerThread(api_master.copy(), "DCE.i2009") #合约DCE.i2009
    thread4 = WorkerThread(api_master.copy(), "CZCE.CF009") #合约CZCE.CF009
    thread5 = WorkerThread(api_master.copy(), "DCE.m2009") #合约DCE.m2009
    thread6 = WorkerThread(api_master.copy(), "DCE.y2009") #合约DCE.y2009
    thread7 = WorkerThread(api_master.copy(), "DCE.p2009") #合约DCE.p2009
    thread8 = WorkerThread(api_master.copy(), "CZCE.OI009") #合约CZCE.OI009
    thread9 = WorkerThread(api_master.copy(), "CZCE.SR009") #合约CZCE.SR009
    thread10 = WorkerThread(api_master.copy(), "DCE.c2009") #合约DCE.c2009
    thread11 = WorkerThread(api_master.copy(), "SHFE.au2012") #合约SHFE.au2012 
    thread12 = WorkerThread(api_master.copy(), "SHFE.ag2012") #合约SHFE.ag2012
    #thread13 = WorkerThread(api_master.copy(), "SHFE.cu2007") #合约SHFE.cu2007 
      thread1.daemon = True
    thread2.daemon = True
    thread3.daemon = True
    thread4.daemon = True
    thread5.daemon = True
    thread6.daemon = True
    thread7.daemon = True
    thread8.daemon = True
    thread9.daemon = True
    thread10.daemon = True 
    thread11.daemon = True 
    thread12.daemon = True  
    #thread13.daemon = True 
                  # Start new Threads
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
    thread5.start()
    thread6.start()
    thread7.start()
    thread8.start()
    thread9.start()
    thread10.start()
    thread11.start()
    thread12.start()           
    #thread13.start()
                   while True:
        api_master.wait_update()

yangbin2590 发表新评论 2021年10月16日

代码格式乱了

from tqsdk import TqApi
from tqsdk.ta import MA, MACD
from tqsdk.tafunc import hhv, llv
import datetime
import threading
import sys, time

def logfile(content):
sys.stdout.write(‘{}nn’.format(content))

def ksignal(symbolcode, klines, ticks, freq):

set1 = 0
set2 = 0
setall = 0

ma5 = MA(klines, 5)
ma10 = MA(klines, 10)
ma20 = MA(klines, 20)
ma30 = MA(klines, 30)
ma60 = MA(klines, 60)
ma120 = MA(klines, 120)

klines[“ma5”] = ma5.ma
klines[“ma10”] = ma10.ma
klines[“ma20”] = ma20.ma
klines[“ma30”] = ma30.ma
klines[“ma60”] = ma60.ma
klines[“ma120”] = ma120.ma

macd = MACD(klines, 12, 26, 9)

#判断均线是否金叉
if ma5.ma.iloc[-1]>= ma10.ma.iloc[-1]:
set1 = 1
else:
set1 = 0

#判断macd走势
if list(macd[“bar”])[-1] >= 0:
set2 = 1
else:
set2 = 0

if set1 == 1 and set2 == 1:
setall = 1
#logfile(symbolcode.split(‘.’)[1]+” : “+str(freq)+” 分钟图出现多头信号,” + str(datetime.datetime.fromtimestamp(ticks.datetime.iloc[-1] / 1e9)))
else:
setall = -1
#logfile(symbolcode.split(‘.’)[1]+” : “+str(freq)+” 分钟图出现空头信号,” + str(datetime.datetime.fromtimestamp(ticks.datetime.iloc[-1] / 1e9)))

return setall

class WorkerThread(threading.Thread):
def __init__(self, api, symbol):
threading.Thread.__init__(self)
self.api = api
self.symbol = symbol

def run(self): #每个合约的3、5、15分钟告警

klines3 = self.api.get_kline_serial(self.symbol, 3 * 60, 150)
klines5 = self.api.get_kline_serial(self.symbol, 5 * 60, 150)
klines15 = self.api.get_kline_serial(self.symbol, 15 * 60, 150)
tickssymbol = self.api.get_tick_serial(self.symbol)

logfile(self.symbol + threading.currentThread().getName() + ‘ start at ‘ + time.strftime(“%Y-%m-%d %H:%M:%S “, time.localtime()))

while True:

self.api.wait_update()

if self.api.is_changing(klines3.iloc[-1]):
ksignal(self.symbol, klines3, tickssymbol, 3)
if self.api.is_changing(klines3.iloc[-1], ‘datetime’):
logfile(self.symbol + ‘ 3k ‘ + threading.currentThread().getName() + ‘ kline datetime: ‘ + str(datetime.datetime.fromtimestamp(tickssymbol.datetime.iloc[-1] / 1e9)) + ‘ curtime: ‘ + time.strftime(“%Y-%m-%d %H:%M:%S “, time.localtime()))

if self.api.is_changing(klines5.iloc[-1]):
ksignal(self.symbol, klines5, tickssymbol, 5)
if self.api.is_changing(klines5.iloc[-1], ‘datetime’):
logfile(self.symbol + ‘ 5k ‘ + threading.currentThread().getName() + ‘ kline datetime: ‘ + str(datetime.datetime.fromtimestamp(tickssymbol.datetime.iloc[-1] / 1e9)) + ‘ curtime: ‘ + time.strftime(“%Y-%m-%d %H:%M:%S “, time.localtime()))

if self.api.is_changing(klines15.iloc[-1]):
ksignal(self.symbol, klines15, tickssymbol, 15)
if self.api.is_changing(klines15.iloc[-1], ‘datetime’):
logfile(self.symbol + ‘ 15k ‘ + threading.currentThread().getName() + ‘ kline datetime: ‘ + str(datetime.datetime.fromtimestamp(tickssymbol.datetime.iloc[-1] / 1e9)) + ‘ curtime: ‘ + time.strftime(“%Y-%m-%d %H:%M:%S “, time.localtime()))

if __name__ == “__main__”:
api_master = TqApi()

# Create new threads
thread1 = WorkerThread(api_master.copy(), “SHFE.rb2010”) #合约SHFE.rb2010
thread2 = WorkerThread(api_master.copy(), “SHFE.ru2009”) #合约SHFE.ru2009
thread3 = WorkerThread(api_master.copy(), “DCE.i2009”) #合约DCE.i2009
thread4 = WorkerThread(api_master.copy(), “CZCE.CF009”) #合约CZCE.CF009
thread5 = WorkerThread(api_master.copy(), “DCE.m2009”) #合约DCE.m2009
thread6 = WorkerThread(api_master.copy(), “DCE.y2009”) #合约DCE.y2009
thread7 = WorkerThread(api_master.copy(), “DCE.p2009”) #合约DCE.p2009
thread8 = WorkerThread(api_master.copy(), “CZCE.OI009”) #合约CZCE.OI009
thread9 = WorkerThread(api_master.copy(), “CZCE.SR009”) #合约CZCE.SR009
thread10 = WorkerThread(api_master.copy(), “DCE.c2009”) #合约DCE.c2009
thread11 = WorkerThread(api_master.copy(), “SHFE.au2012”) #合约SHFE.au2012
thread12 = WorkerThread(api_master.copy(), “SHFE.ag2012”) #合约SHFE.ag2012
#thread13 = WorkerThread(api_master.copy(), “SHFE.cu2007”) #合约SHFE.cu2007

thread1.daemon = True
thread2.daemon = True
thread3.daemon = True
thread4.daemon = True
thread5.daemon = True
thread6.daemon = True
thread7.daemon = True
thread8.daemon = True
thread9.daemon = True
thread10.daemon = True
thread11.daemon = True
thread12.daemon = True
#thread13.daemon = True

# Start new Threads
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread5.start()
thread6.start()
thread7.start()
thread8.start()
thread9.start()
thread10.start()
thread11.start()
thread12.start()
#thread13.start()

while True:
api_master.wait_update()

请问问题解决了码?我也是这个问题,4个合约就把内存吃光了,怀疑是api_master.copy() 方法有问题

0

这个问题楼主解决了没?

helloworld 已回答的问题 2021年2月16日
0

我也用的WorkerThread(threading.Thread)做多线程,才几个品种同时运行,就有部分品种的线程收不到数据出问题了。

没法做多品种同时运行

rue 已回答的问题 2020年5月23日
0

不是吧,才10几个就崩了?我还想多策略多品种,最少也上百个线程的。

Jandy 已回答的问题 2020年5月23日