513 浏览
0

自己写了一个用于追单的类,比较原始,可能存在一些错误,请大家参考,如果有任何问题欢迎交流

class TqTrader:
    """Order-chasing helper bound to one symbol on a TqApi connection.

    Orders are priced through the current best quote by a small premium and
    re-submitted until the requested volume is filled (or the retry budget is
    used up).

    Sign convention used throughout: a positive ``amount`` refers to the long
    side (open long / close long), a negative ``amount`` to the short side
    (open short / close short).
    """

    def __init__(self, *args, **kwargs):
        """Bind the trader to an API connection and a symbol.

        Positional arguments are accepted but ignored.

        Keyword Args:
            tqapi: API object used for positions, ticks and orders (required).
            symbol: Contract code to trade (required, non-empty).
            freq: Stored on the instance; not used by this class. Default 60.

        Raises:
            AssertionError: If ``tqapi`` or ``symbol`` is missing.
        """
        self.tqapi: "TqApi" = kwargs.pop("tqapi", None)
        assert self.tqapi is not None, "tqapi is required"
        self.symbol: str = kwargs.pop("symbol", "")
        assert self.symbol != "", "symbol is required"
        self.freq: int = kwargs.pop("freq", 60)
        self.position: "Position" = self.tqapi.get_position(self.symbol)
        self.ticks: "pd.DataFrame" = self.tqapi.get_tick_serial(self.symbol, 1)

    def _decide_price(self, tick: "Tick", amount: float, offset: str,
                      premium: float = 1):
        """Return the limit price: the opposite best quote plus a premium.

        Buying (open long / close short) pays ``ask_price1 + premium``;
        selling (open short / close long) accepts ``bid_price1 - premium``.

        Args:
            tick: Object exposing ``ask_price1`` and ``bid_price1``.
            amount: Signed volume; only its sign is used.
            offset: ``"OPEN"`` or ``"CLOSE"``.
            premium: Price distance added through the quote.

        Raises:
            KeyError: If ``offset`` is neither ``"OPEN"`` nor ``"CLOSE"``.
        """
        buy_price = tick.ask_price1 + premium
        sell_price = tick.bid_price1 - premium
        price = {
            "OPEN": buy_price if amount > 0 else sell_price,
            "CLOSE": sell_price if amount > 0 else buy_price,
        }
        return price[offset]

    def _decide_close_action(self, amount: float):
        """Pick the closing offset: historical position first, then today's.

        Returns ``"CLOSE"`` while the side being closed still has historical
        (pre-today) position, otherwise ``"CLOSETODAY"``. The original author
        notes this targets the Shanghai exchange's close-today distinction and
        may be adapted as needed.

        NOTE(review): the order volume is not capped to the historical
        position here - confirm the exchange accepts a ``"CLOSE"`` larger
        than the historical position.
        """
        his_long_closeable = amount > 0 and self.position.pos_long_his > 0
        his_short_closeable = amount < 0 and self.position.pos_short_his > 0
        if his_long_closeable or his_short_closeable:
            return "CLOSE"
        return "CLOSETODAY"

    def _decide_direction(self, offset: str, amount: float):
        """Map (offset, sign of amount) to ``"BUY"`` or ``"SELL"``.

        Opening follows the sign (long -> BUY, short -> SELL); any other
        offset is treated as closing and uses the opposite direction.
        """
        if offset == "OPEN":
            return "BUY" if amount > 0 else "SELL"
        return "SELL" if amount > 0 else "BUY"

    def trade(self, offset: str, amount, order_type="FAK", retry=0):
        """Chase ``abs(amount)`` lots, re-submitting until filled.

        Args:
            offset: ``"OPEN"`` or ``"CLOSE"``; ``"CLOSE"`` is resolved to
                ``"CLOSE"``/``"CLOSETODAY"`` by ``_decide_close_action``.
            amount: Signed volume (positive = long side, negative = short).
            order_type: Passed to ``insert_order`` as ``advanced``.
            retry: When > 0, stop after more than ``retry`` update rounds.
                0 means keep chasing without limit.

        Returns:
            Total volume filled across all submitted orders.
        """
        target = abs(amount)
        args = {
            "symbol": self.symbol,
            "direction": self._decide_direction(offset, amount),
            "volume": target,
            "advanced": order_type,
        }
        settled = 0  # fills of orders that are no longer alive
        traded = 0   # settled fills + fills of the order currently working
        count = 0
        order = None
        while traded < target:
            if order is None or order.status != "ALIVE":
                # Re-read the quote for every submission so the chase price
                # follows the market instead of the tick seen at call time.
                tick = self.ticks.iloc[-1]
                args["limit_price"] = self._decide_price(tick, amount, offset)
                args["offset"] = offset
                if offset == "CLOSE":
                    args["offset"] = self._decide_close_action(amount)
                args["volume"] = target - settled
                order = self.tqapi.insert_order(**args)
            self.tqapi.wait_update()
            filled = order.volume_orign - order.volume_left
            # Derive the running total instead of adding on every round, so a
            # fill seen over several rounds is counted only once.
            traded = settled + filled
            if order.status != "ALIVE":
                settled += filled
                order = None
            count += 1
            if retry > 0 and count > retry and traded < target:
                print("超过重试次数")
                break
        return traded

    def buy(self, symbol, amount, retry=0):
        """Open a position of ``amount`` lots (negative opens a short).

        ``symbol`` is accepted for call compatibility but not used; orders
        always go to ``self.symbol``.
        """
        return self.trade("OPEN", amount, retry=retry)

    def sell(self, symbol, amount, retry=0):
        """Close ``amount`` lots (positive closes long, negative closes short).

        ``symbol`` is accepted for call compatibility but not used; orders
        always go to ``self.symbol``.
        """
        return self.trade("CLOSE", amount, retry=retry)

    def execute(self, signal, retry=0):
        """Act on a trading signal.

        * ``signal < 0`` while flat or long: close all longs, open 1 short.
        * ``signal > 0`` while flat or short: close all shorts, open 1 long.
        * ``signal == 0``: flatten the position.
        * otherwise the position already matches the signal; it is ignored.

        The close loops repeat until the position is flat, so ``retry`` only
        bounds each individual ``sell`` call, not the loop as a whole.
        """
        symbol = self.symbol
        if signal < 0 and self.position.pos >= 0:
            while self.position.pos > 0:
                self.sell(symbol, self.position.pos, retry=retry)
            self.buy(symbol, -1, retry=retry)
        elif signal > 0 and self.position.pos <= 0:
            while self.position.pos < 0:
                self.sell(symbol, self.position.pos, retry=retry)
            self.buy(symbol, 1, retry=retry)
        elif signal == 0:
            while self.position.pos != 0:
                self.sell(symbol, self.position.pos, retry=retry)
        else:
            print(f"当前持仓:({self.position.pos}), 忽略信号:({signal})")

tqsdkdzqh 已回答的问题 2023年9月21日