多策略2.0优化

策略 作者: 水滴

这篇博客,只是试一下效果,以后可能以这个方式,进行展示策略

# 克隆自聚宽文章:https://www.joinquant.com/post/51758
# 标题:多策略组合2.0(高手续费,近五年年化30%,回撤7%)
# 作者:O_iX

# 克隆自聚宽文章:https://www.joinquant.com/post/51758
# 标题:多策略组合2.0(高手续费,近五年年化30%,回撤7%)
# 作者:O_iX

# 导入函数库
from jqdata import *
import datetime
import math
import numpy as np
import pandas as pd

# 初始化函数,设定基准等等
def initialize(context):
    set_benchmark("515080.XSHG")
    set_option("avoid_future_data", True)
    set_option("use_real_price", True)
    log.info("初始函数开始运行且全局只运行一次")
    log.set_level("order", "error")
    set_slippage(FixedSlippage(0.02), type="stock")
    set_slippage(FixedSlippage(0.002), type="fund")
    set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5), type="stock")
    set_order_cost(OrderCost(open_tax=0, close_tax=0, open_commission=0, close_commission=0, close_today_commission=0, min_commission=0), type="mmf")
    g.strategys = {}
    g.portfolio_value_proportion = [0, 0.3, 0.5, 0.2]
    set_subportfolios([SubPortfolioConfig(context.portfolio.starting_cash * g.portfolio_value_proportion[i], "stock") for i in range(4)])
    g.strategys["搅屎棍策略"] = JSG_Strategy(context, subportfolio_index=1, name="搅屎棍策略")
    g.strategys["全天候策略"] = All_Day_Strategy(context, subportfolio_index=2, name="全天候策略")
    g.strategys["核心资产轮动策略"] = Rotation_ETF_Strategy(context, subportfolio_index=3, name="核心资产轮动策略")
    run_monthly(balance_subportfolios, 1, "9:00")
    if g.portfolio_value_proportion[1] > 0:
        run_daily(prepare_jsg_strategy, "9:05")
        run_weekly(adjust_jsg_strategy, 1, "9:31")
        run_daily(check_jsg_strategy, "14:50")
    if g.portfolio_value_proportion[2] > 0:
        run_monthly(adjust_all_day_strategy, 1, "9:40")
    if g.portfolio_value_proportion[3] > 0:
        run_daily(adjust_rotation_etf_strategy, "9:32")


# 搅屎棍策略相关函数
def prepare_jsg_strategy(context):
    log.info("开始准备搅屎棍策略")
    g.strategys["搅屎棍策略"].prepare(context)

def adjust_jsg_strategy(context):
    log.info("开始调整搅屎棍策略")
    g.strategys["搅屎棍策略"].adjust(context)

def check_jsg_strategy(context):
    log.info("开始检查搅屎棍策略")
    g.strategys["搅屎棍策略"].check(context)


# 全天候策略相关函数
def adjust_all_day_strategy(context):
    log.info("开始调整全天候策略")
    g.strategys["全天候策略"].adjust(context)


# 核心资产轮动策略相关函数
def adjust_rotation_etf_strategy(context):
    log.info("开始调整核心资产轮动策略")
    g.strategys["核心资产轮动策略"].adjust(context)


# 平衡子账户仓位函数
def balance_subportfolios(context):
    log.info("开始平衡子账户仓位")
    length = len(g.portfolio_value_proportion)
    log.info("调整前:" + str([context.subportfolios[i].total_value / context.portfolio.total_value for i in range(length)]))
    for i in range(1, length):
        target = g.portfolio_value_proportion[i] * context.portfolio.total_value
        value = context.subportfolios[i].total_value
        if context.subportfolios[i].available_cash > 0 and target < value:
            transfer_cash(from_pindex=i, to_pindex=0, cash=min(value - target, context.subportfolios[i].available_cash))
    for i in range(1, length):
        target = g.portfolio_value_proportion[i] * context.portfolio.total_value
        value = context.subportfolios[i].total_value
        if target > value and context.subportfolios[0].available_cash > 0:
            transfer_cash(from_pindex=0, to_pindex=i, cash=min(target - value, context.subportfolios[0].available_cash))
    log.info("调整后:" + str([context.subportfolios[i].total_value / context.portfolio.total_value for i in range(length)]))


# 策略基类
class Strategy:
    def __init__(self, context, subportfolio_index, name):
        self.subportfolio_index = subportfolio_index
        self.name = name
        self.stock_sum = 1
        self.hold_list = []
        self.limit_up_list = []
        self.fill_stock = "511880.XSHG"

    # 准备策略,获取持仓列表和涨停列表
    def _prepare(self, context):
        self.hold_list = list(context.subportfolios[self.subportfolio_index].long_positions.keys())
        if self.hold_list:
            df = get_price(self.hold_list, end_date=context.previous_date, frequency="daily", fields=["close", "high_limit"], count=1, panel=False, fill_paused=False)
            df = df[df["close"] == df["high_limit"]]
            self.limit_up_list = list(df.code)
        else:
            self.limit_up_list = []

    # 检查持仓股票是否涨停打开
    def _check(self, context):
        if self.limit_up_list:
            current_data = get_current_data()
            for stock in self.limit_up_list:
                if current_data[stock].last_price < current_data[stock].high_limit:
                    log.info("[%s]涨停打开,卖出" % (stock))
                    self.close_position(stock)
                else:
                    log.info("[%s]涨停,继续持有" % (stock))

    # 调整持仓,买入目标股票,卖出不在目标列表中的股票
    def _adjust(self, context, target):
        subportfolio = context.subportfolios[self.subportfolio_index]
        for security in self.hold_list:
            if (security not in target) and (security not in self.limit_up_list):
                self.close_position(security)
        position_count = len(subportfolio.long_positions)
        if len(target) > position_count:
            buy_num = min(len(target), self.stock_sum - position_count)
            value = subportfolio.available_cash / buy_num
            for security in target:
                if security not in list(subportfolio.long_positions.keys()):
                    if self.open_position(security, value):
                        if position_count == len(target):
                            break

    # 下单,调整股票仓位到目标价值
    def order_target_value_(self, security, value):
        if value == 0:
            log.debug("Selling out %s" % (security))
        else:
            log.debug("Order %s to value %f" % (security, value))
        return order_target_value(security, value, pindex=self.subportfolio_index)

    # 开仓
    def open_position(self, security, value):
        order = self.order_target_value_(security, value)
        if order is not None and order.filled > 0:
            return True
        return False

    # 平仓
    def close_position(self, security):
        order = self.order_target_value_(security, 0)
        if order is not None:
            if order.status == OrderStatus.held and order.filled == order.amount:
                return True
        return False

    # 过滤停牌股票
    def filter_paused_stock(self, stock_list):
        current_data = get_current_data()
        return [stock for stock in stock_list if not current_data[stock].paused]

    # 过滤ST股票
    def filter_st_stock(self, stock_list):
        current_data = get_current_data()
        return [stock for stock in stock_list if not current_data[stock].is_st and "ST" not in current_data[stock].name and "*" not in current_data[stock].name and "退" not in current_data[stock].name]

    # 过滤科创板和北交所股票
    def filter_kcbj_stock(self, stock_list):
        for stock in stock_list[:]:
            if stock[0] == "4" or stock[0] == "8" or stock[:2] == "68" or stock[0] == "3":
                stock_list.remove(stock)
        return stock_list

    # 过滤涨停和跌停股票
    def filter_limitup_limitdown_stock(self, context, stock_list):
        subportfolio = context.subportfolios[self.subportfolio_index]
        current_data = get_current_data()
        return [stock for stock in stock_list if stock in subportfolio.long_positions.keys() or (current_data[stock].last_price < current_data[stock].high_limit and current_data[stock].last_price > current_data[stock].low_limit)]

    # 过滤新股
    def filter_new_stock(self, context, stock_list, days):
        yesterday = context.previous_date
        filtered_stocks = [stock for stock in stock_list if not yesterday - get_security_info(stock).start_date < datetime.timedelta(days)]
        log.info(f"过滤新股后剩余股票数量:{len(filtered_stocks)}")
        return filtered_stocks
    
    # 过滤高价股
    def filter_high_price_stock(self, stock_list):
        last_prices = history(1, unit="1m", field="close", security_list=stock_list).iloc[0]
        filtered_stocks = last_prices[last_prices < 10].index.tolist()
        log.info(f"过滤高价股后剩余股票数量:{len(filtered_stocks)}")
        return filtered_stocks

class JSG_Strategy(Strategy):
    def __init__(self, context, subportfolio_index, name):
        super().__init__(context, subportfolio_index, name)
        self.stock_sum = 6
        self.num = 1
        self.pass_months = [1, 4]

    # 获取股票行业
    def getStockIndustry(self, stocks):
        industry = get_industry(stocks)
        dict = {
            stock: info["sw_l1"]["industry_name"]
            for stock, info in industry.items()
            if "sw_l1" in info
        }
        return pd.Series(dict)

    # 计算全市场宽度
    def get_market_breadth(self, context):
        yesterday = context.previous_date
        stocks = get_index_stocks("000985.XSHG")
        count = 1
        h = get_price(
            stocks,
            end_date=yesterday,
            frequency="1d",
            fields=["close"],
            count=count + 20,
            panel=False,
        )
        h["date"] = pd.DatetimeIndex(h.time).date
        df_close = h.pivot(index="code", columns="date", values="close").dropna(axis=0)
        df_ma20 = df_close.rolling(window=20, axis=1).mean().iloc[:, -count:]
        df_bias = df_close.iloc[:, -count:] > df_ma20
        df_bias["industry_name"] = self.getStockIndustry(stocks)
        df_ratio = (
            (df_bias.groupby("industry_name").sum() * 100.0)
            / df_bias.groupby("industry_name").count()
        ).round()
        top_values = df_ratio.loc[:, yesterday].nlargest(self.num)
        I = top_values.index.tolist()
        log.info(f"全市场宽度:{np.array(df_ratio.sum(axis=0).mean())}")
        log.info(f"行业热度排名前{self.num}的行业:{[name for name in I]}")
        return I

    # 过滤股票
    def filter(self, context):
        stocks = get_index_stocks("399101.XSHE", context.current_dt)
        log.info(f"初始股票池数量:{len(stocks)}")
        stocks = self.filter_kcbj_stock(stocks)
        log.info(f"过滤科创板股票后剩余股票数量:{len(stocks)}")
        stocks = self.filter_st_stock(stocks)
        log.info(f"过滤ST股票后剩余股票数量:{len(stocks)}")
        stocks = self.filter_new_stock(context, stocks, 375)
        stocks = self.filter_paused_stock(stocks)
        log.info(f"过滤停牌股票后剩余股票数量:{len(stocks)}")
        stocks = get_fundamentals(
            query(
                valuation.code,
            )
            .filter(
                valuation.code.in_(stocks),
                income.np_parent_company_owners > 0,  
                income.net_profit > 0,  
                income.operating_revenue > 1e8,  
            )
            .order_by(valuation.market_cap.asc())
        )["code"].tolist()
        log.info(f"根据财务指标过滤后剩余股票数量:{len(stocks)}")
        stocks = self.filter_limitup_limitdown_stock(context, stocks)
        log.info(f"过滤涨跌停股票后剩余股票数量:{len(stocks)}")
        selected_stocks = stocks[: min(len(stocks), self.stock_sum)]
        log.info(f"最终选择的股票数量:{len(selected_stocks)}")
        return selected_stocks

    # 判断是否为空仓月份
    def is_empty_month(self, context):
        month = context.current_dt.month
        return month in self.pass_months

    # 选股
    def select(self, context):
        I = self.get_market_breadth(context)
        industries = {"银行I", "有色金属I", "煤炭I", "钢铁I", "采掘I"}
        if not industries.intersection(I) and not self.is_empty_month(context):
            log.info("开仓")
            L = self.filter(context)
        else:
            log.info("跑")
            L = [self.fill_stock]
        return L

    def prepare(self, context):
        self._prepare(context)

    def adjust(self, context):
        target = self.select(context)
        self._adjust(context, target)

    def check(self, context):
        self._check(context)

class All_Day_Strategy(Strategy):
    def __init__(self, context, subportfolio_index, name):
        super().__init__(context, subportfolio_index, name)
        self.min_volume = 2000
        self.etf_pool = [
            "511010.XSHG",  
            "518880.XSHG",  
            "513100.XSHG",  
            "515080.XSHG",  
            "159980.XSHE",  
            "162411.XSHE",  
            "159985.XSHE",  
        ]
        self.rates = [0.4, 0.2, 0.15, 0.1, 0.05, 0.05, 0.05]

    # 调整持仓
    def adjust(self, context):
        subportfolio = context.subportfolios[self.subportfolio_index]
        targets = {
            etf: subportfolio.total_value * rate
            for etf, rate in zip(self.etf_pool, self.rates)
        }
        if not subportfolio.long_positions:
            log.info("无持仓,按目标权重下单")
            for etf, target in targets.items():
                self.order_target_value_(etf, target)
        else:
            log.info("有持仓,调整仓位")
            for etf, target in targets.items():
                value = subportfolio.long_positions[etf].value
                minV = subportfolio.long_positions[etf].price * 100
                if value - target > self.min_volume and minV > value - target:
                    log.info(f"减少{etf}的仓位")
                    self.order_target_value_(etf, target)
            for etf, target in targets.items():
                value = subportfolio.long_positions[etf].value
                minV = subportfolio.long_positions[etf].price * 100
                if (
                    target - value > self.min_volume
                    and minV < subportfolio.available_cash
                    and minV < target - value
                ):
                    log.info(f"增加{etf}的仓位")
                    self.order_target_value_(etf, target)

class Rotation_ETF_Strategy(Strategy):
    def __init__(self, context, subportfolio_index, name):
        super().__init__(context, subportfolio_index, name)
        self.stock_sum = 1
        self.etf_pool = [
            "518880.XSHG",  
            "513100.XSHG",  
            "159915.XSHE",  
            "510180.XSHG",  
        ]
        self.m_days = 25  

    # 计算动量
    def MOM(self, etf):
        df = attribute_history(etf, self.m_days, "1d", ["close"])
        y = np.log(df["close"].values)
        n = len(y)
        x = np.arange(n)
        weights = np.linspace(1, 2, n)  
        slope, intercept = np.polyfit(x, y, 1, w=weights)
        annualized_returns = math.pow(math.exp(slope), 250) - 1
        residuals = y - (slope * x + intercept)
        r_squared = 1 - (
            np.sum(weights * residuals**2) / np.sum(weights * (y - np.mean(y)) ** 2)
        )
        return annualized_returns * r_squared

    # 选股
    def select(self):
        score_list = [self.MOM(etf) for etf in self.etf_pool]
        df = pd.DataFrame(index=self.etf_pool, data={"score": score_list})
        df = df.sort_values(by="score", ascending=False)
        df = df[(df["score"] > 0) & (df["score"] <= 5)]
        target = df.index.tolist()
        log.info(f"根据动量策略选择的ETF:{target}")
        if not target:
            target = [self.fill_stock]
        return target[: min(len(target), self.stock_sum)]

    def adjust(self, context):
        target = self.select()
        self._prepare(context)
        self._adjust(context, target)
文章分类
关于作者
水滴

注册时间: