用到的原型
Aberration 策略 (难度:初级)

完成后代码如下
此处完全照搬ringo老师的代码,改编如有错误之处请删帖,我怕误人
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Ringo"
'''
Aberration策略 (难度:初级)
参考: https://www.shinnytech.com/blog/aberration/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''
from tqsdk import TqApi, TargetPosTask
from tqsdk.ta import BOLL
from contextlib import closing
# 使用BOLL指标计算中轨、上轨和下轨,其中26为周期N ,2为参数p
def boll_line(klines):
boll = BOLL(klines, 26, 2)
midline = boll["mid"].iloc[-1]
topline = boll["top"].iloc[-1]
bottomline = boll["bottom"].iloc[-1]
print("策略运行,中轨:%.2f,上轨为:%.2f,下轨为:%.2f" % (midline, topline, bottomline))
return midline, topline, bottomline
async def demo(SYMBOL):
quote = api.get_quote(SYMBOL)
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)
position = api.get_position(SYMBOL)
target_pos = TargetPosTask(api, SYMBOL)
async with api.register_update_notify([quote, klines]) as update_chan:
# 确保收到了所有订阅的数据
while not api.is_serial_ready(klines):
await update_chan.recv()
midline, topline, bottomline = boll_line(klines)
async for _ in update_chan:
# 每次生成新的K线时重新计算BOLL指标
if api.is_changing(klines.iloc[-1], "datetime"):
midline, topline, bottomline = boll_line(klines)
# 每次最新价发生变化时进行判断
if api.is_changing(quote, "last_price"):
# 判断开仓条件
if position.pos_long == 0 and position.pos_short == 0:
# 如果最新价大于上轨,K线上穿上轨,开多仓
if quote.last_price > topline:
print("K线上穿上轨,开多仓")
target_pos.set_target_volume(20)
# 如果最新价小于轨,K线下穿下轨,开空仓
elif quote.last_price < bottomline:
print("K线下穿下轨,开空仓")
target_pos.set_target_volume(-20)
else:
print("当前最新价%.2f,未穿上轨或下轨,不开仓" % quote.last_price)
# 在多头情况下,空仓条件
elif position.pos_long > 0:
# 如果最新价低于中线,多头清仓离场
if quote.last_price < midline:
print("最新价低于中线,多头清仓离场")
target_pos.set_target_volume(0)
else:
print("当前多仓,未穿越中线,仓位无变化")
# 在空头情况下,空仓条件
elif position.pos_short > 0:
# 如果最新价高于中线,空头清仓离场
if quote.last_price > midline:
print("最新价高于中线,空头清仓离场")
target_pos.set_target_volume(0)
else:
print("当前空仓,未穿越中线,仓位无变化")
# 设置合约代码
list_SYMBOL = ["SHFE.cu2012", "DCE.m2105"]
api = TqApi(auth="信易账户,账户密码")
# 创建异步任务
for x in list_SYMBOL:
api.create_task(demo(x))
# 关闭api,释放相应资源
with closing(api):
while True:
api.wait_update()
我觉得:
- 通常不需要import asyncio
- 并不是所有函数都要加async,通常需要等待(阻塞)的才加。
- 第一次获得K需要api.is_serial_ready(),用K计算前万万注意啊
- while True: api.wait_update()用async for _ in update_chan替换。for前的async不能写掉,它是一个死循环,完事儿用break退出
- 一个协程任务里,大概估计通常只需要一个update_chan实例就够了,如果函数中要用api.wait_update(),直接把update_chan传进去搞事情。例如这样
async def A(update_chan): #这是一个A函数 #。。。 async for _ in update_chan: #灯灯灯,一顿操作 break #然后在协程函数里调用它 async def demo(SYMBOL): #省略若干行 #调用A() await A(update_chan)
laolu123 已回答的问题 2022年6月23日
感谢分享~
感谢分享,请问楼主,如果策略中包含了class程序的话,要怎么改呢,比如类似官方示例程序中的海龟交易策略。他应该算是类实例中比较复杂的一个策略了。能否实现异步?
谢谢分享~
thuliubin 发表新评论 2021年5月20日
请教一下,按照这个方法对海龟策略进行了改写,程序能运行,但是会报task was destroyed,是哪儿处理有问题呢?
谢谢分享!