用到的原型
Aberration 策略 (难度:初级)
完成后代码如下
此处完全照搬ringo老师的代码,改编如有错误之处请删帖,我怕误人
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = "Ringo" ''' Aberration策略 (难度:初级) 参考: https://www.shinnytech.com/blog/aberration/ 注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改 ''' from tqsdk import TqApi, TargetPosTask from tqsdk.ta import BOLL from contextlib import closing # 使用BOLL指标计算中轨、上轨和下轨,其中26为周期N ,2为参数p def boll_line(klines): boll = BOLL(klines, 26, 2) midline = boll["mid"].iloc[-1] topline = boll["top"].iloc[-1] bottomline = boll["bottom"].iloc[-1] print("策略运行,中轨:%.2f,上轨为:%.2f,下轨为:%.2f" % (midline, topline, bottomline)) return midline, topline, bottomline async def demo(SYMBOL): quote = api.get_quote(SYMBOL) klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24) position = api.get_position(SYMBOL) target_pos = TargetPosTask(api, SYMBOL) async with api.register_update_notify([quote, klines]) as update_chan: # 确保收到了所有订阅的数据 while not api.is_serial_ready(klines): await update_chan.recv() midline, topline, bottomline = boll_line(klines) async for _ in update_chan: # 每次生成新的K线时重新计算BOLL指标 if api.is_changing(klines.iloc[-1], "datetime"): midline, topline, bottomline = boll_line(klines) # 每次最新价发生变化时进行判断 if api.is_changing(quote, "last_price"): # 判断开仓条件 if position.pos_long == 0 and position.pos_short == 0: # 如果最新价大于上轨,K线上穿上轨,开多仓 if quote.last_price > topline: print("K线上穿上轨,开多仓") target_pos.set_target_volume(20) # 如果最新价小于轨,K线下穿下轨,开空仓 elif quote.last_price < bottomline: print("K线下穿下轨,开空仓") target_pos.set_target_volume(-20) else: print("当前最新价%.2f,未穿上轨或下轨,不开仓" % quote.last_price) # 在多头情况下,空仓条件 elif position.pos_long > 0: # 如果最新价低于中线,多头清仓离场 if quote.last_price < midline: print("最新价低于中线,多头清仓离场") target_pos.set_target_volume(0) else: print("当前多仓,未穿越中线,仓位无变化") # 在空头情况下,空仓条件 elif position.pos_short > 0: # 如果最新价高于中线,空头清仓离场 if quote.last_price > midline: print("最新价高于中线,空头清仓离场") target_pos.set_target_volume(0) else: print("当前空仓,未穿越中线,仓位无变化") # 设置合约代码 list_SYMBOL = ["SHFE.cu2012", "DCE.m2105"] api = TqApi(auth="信易账户,账户密码") # 创建异步任务 for x in list_SYMBOL: api.create_task(demo(x)) # 关闭api,释放相应资源 with closing(api): while True: api.wait_update()
我觉得:
- 通常不需要import asyncio
- 并不是所有函数都要加async,通常需要等待(阻塞)的才加。
- 第一次获得K需要api.is_serial_ready(),用K计算前万万注意啊
- while True: api.wait_update()用async for _ in update_chan替换。for前的async不能写掉,它是一个死循环,完事儿用break退出
- 一个协程任务里,大概估计通常只需要一个update_chan实例就够了,如果函数中要用api.wait_update(),直接把update_chan传进去搞事情。例如这样
async def A(update_chan): #这是一个A函数 #。。。 async for _ in update_chan: #灯灯灯,一顿操作 break #然后在协程函数里调用它 async def demo(SYMBOL): #省略若干行 #调用A() await A(update_chan)
laolu123 已回答的问题 2022年6月23日
感谢分享~
感谢分享,请问楼主,如果策略中包含了class程序的话,要怎么改呢,比如类似官方示例程序中的海龟交易策略。他应该算是类实例中比较复杂的一个策略了。能否实现异步?
谢谢分享~
thuliubin 发表新评论 2021年5月20日
请教一下,按照这个方法对海龟策略进行了改写,程序能运行,但是会报task was destroyed,是哪儿处理有问题呢?
谢谢分享!