量化学习平台
文章
市场宽度
背离图
登录
注册
多策略2.0优化
策略
作者: 水滴
这篇博客,只是试一下效果,以后可能以这个方式,进行展示策略 ```python # 克隆自聚宽文章:https://www.joinquant.com/post/51758 # 标题:多策略组合2.0(高手续费,近五年年化30%,回撤7%) # 作者:O_iX # 克隆自聚宽文章:https://www.joinquant.com/post/51758 # 标题:多策略组合2.0(高手续费,近五年年化30%,回撤7%) # 作者:O_iX # 导入函数库 from jqdata import * import datetime import math import numpy as np import pandas as pd # 初始化函数,设定基准等等 def initialize(context): set_benchmark("515080.XSHG") set_option("avoid_future_data", True) set_option("use_real_price", True) log.info("初始函数开始运行且全局只运行一次") log.set_level("order", "error") set_slippage(FixedSlippage(0.02), type="stock") set_slippage(FixedSlippage(0.002), type="fund") set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5), type="stock") set_order_cost(OrderCost(open_tax=0, close_tax=0, open_commission=0, close_commission=0, close_today_commission=0, min_commission=0), type="mmf") g.strategys = {} g.portfolio_value_proportion = [0, 0.3, 0.5, 0.2] set_subportfolios([SubPortfolioConfig(context.portfolio.starting_cash * g.portfolio_value_proportion[i], "stock") for i in range(4)]) g.strategys["搅屎棍策略"] = JSG_Strategy(context, subportfolio_index=1, name="搅屎棍策略") g.strategys["全天候策略"] = All_Day_Strategy(context, subportfolio_index=2, name="全天候策略") g.strategys["核心资产轮动策略"] = Rotation_ETF_Strategy(context, subportfolio_index=3, name="核心资产轮动策略") run_monthly(balance_subportfolios, 1, "9:00") if g.portfolio_value_proportion[1] > 0: run_daily(prepare_jsg_strategy, "9:05") run_weekly(adjust_jsg_strategy, 1, "9:31") run_daily(check_jsg_strategy, "14:50") if g.portfolio_value_proportion[2] > 0: run_monthly(adjust_all_day_strategy, 1, "9:40") if g.portfolio_value_proportion[3] > 0: run_daily(adjust_rotation_etf_strategy, "9:32") # 搅屎棍策略相关函数 def prepare_jsg_strategy(context): log.info("开始准备搅屎棍策略") g.strategys["搅屎棍策略"].prepare(context) def adjust_jsg_strategy(context): log.info("开始调整搅屎棍策略") g.strategys["搅屎棍策略"].adjust(context) def check_jsg_strategy(context): log.info("开始检查搅屎棍策略") g.strategys["搅屎棍策略"].check(context) # 全天候策略相关函数 def adjust_all_day_strategy(context): log.info("开始调整全天候策略") g.strategys["全天候策略"].adjust(context) # 核心资产轮动策略相关函数 def adjust_rotation_etf_strategy(context): log.info("开始调整核心资产轮动策略") g.strategys["核心资产轮动策略"].adjust(context) # 平衡子账户仓位函数 def balance_subportfolios(context): log.info("开始平衡子账户仓位") length = len(g.portfolio_value_proportion) log.info("调整前:" + str([context.subportfolios[i].total_value / context.portfolio.total_value for i in range(length)])) for i in range(1, length): target = g.portfolio_value_proportion[i] * context.portfolio.total_value value = context.subportfolios[i].total_value if context.subportfolios[i].available_cash > 0 and target < value: transfer_cash(from_pindex=i, to_pindex=0, cash=min(value - target, context.subportfolios[i].available_cash)) for i in range(1, length): target = g.portfolio_value_proportion[i] * context.portfolio.total_value value = context.subportfolios[i].total_value if target > value and context.subportfolios[0].available_cash > 0: transfer_cash(from_pindex=0, to_pindex=i, cash=min(target - value, context.subportfolios[0].available_cash)) log.info("调整后:" + str([context.subportfolios[i].total_value / context.portfolio.total_value for i in range(length)])) # 策略基类 class Strategy: def __init__(self, context, subportfolio_index, name): self.subportfolio_index = subportfolio_index self.name = name self.stock_sum = 1 self.hold_list = [] self.limit_up_list = [] self.fill_stock = "511880.XSHG" # 准备策略,获取持仓列表和涨停列表 def _prepare(self, context): self.hold_list = list(context.subportfolios[self.subportfolio_index].long_positions.keys()) if self.hold_list: df = get_price(self.hold_list, end_date=context.previous_date, frequency="daily", fields=["close", "high_limit"], count=1, panel=False, fill_paused=False) df = df[df["close"] == df["high_limit"]] self.limit_up_list = list(df.code) else: self.limit_up_list = [] # 检查持仓股票是否涨停打开 def _check(self, context): if self.limit_up_list: current_data = get_current_data() for stock in self.limit_up_list: if current_data[stock].last_price < current_data[stock].high_limit: log.info("[%s]涨停打开,卖出" % (stock)) self.close_position(stock) else: log.info("[%s]涨停,继续持有" % (stock)) # 调整持仓,买入目标股票,卖出不在目标列表中的股票 def _adjust(self, context, target): subportfolio = context.subportfolios[self.subportfolio_index] for security in self.hold_list: if (security not in target) and (security not in self.limit_up_list): self.close_position(security) position_count = len(subportfolio.long_positions) if len(target) > position_count: buy_num = min(len(target), self.stock_sum - position_count) value = subportfolio.available_cash / buy_num for security in target: if security not in list(subportfolio.long_positions.keys()): if self.open_position(security, value): if position_count == len(target): break # 下单,调整股票仓位到目标价值 def order_target_value_(self, security, value): if value == 0: log.debug("Selling out %s" % (security)) else: log.debug("Order %s to value %f" % (security, value)) return order_target_value(security, value, pindex=self.subportfolio_index) # 开仓 def open_position(self, security, value): order = self.order_target_value_(security, value) if order is not None and order.filled > 0: return True return False # 平仓 def close_position(self, security): order = self.order_target_value_(security, 0) if order is not None: if order.status == OrderStatus.held and order.filled == order.amount: return True return False # 过滤停牌股票 def filter_paused_stock(self, stock_list): current_data = get_current_data() return [stock for stock in stock_list if not current_data[stock].paused] # 过滤ST股票 def filter_st_stock(self, stock_list): current_data = get_current_data() return [stock for stock in stock_list if not current_data[stock].is_st and "ST" not in current_data[stock].name and "*" not in current_data[stock].name and "退" not in current_data[stock].name] # 过滤科创板和北交所股票 def filter_kcbj_stock(self, stock_list): for stock in stock_list[:]: if stock[0] == "4" or stock[0] == "8" or stock[:2] == "68" or stock[0] == "3": stock_list.remove(stock) return stock_list # 过滤涨停和跌停股票 def filter_limitup_limitdown_stock(self, context, stock_list): subportfolio = context.subportfolios[self.subportfolio_index] current_data = get_current_data() return [stock for stock in stock_list if stock in subportfolio.long_positions.keys() or (current_data[stock].last_price < current_data[stock].high_limit and current_data[stock].last_price > current_data[stock].low_limit)] # 过滤新股 def filter_new_stock(self, context, stock_list, days): yesterday = context.previous_date filtered_stocks = [stock for stock in stock_list if not yesterday - get_security_info(stock).start_date < datetime.timedelta(days)] log.info(f"过滤新股后剩余股票数量:{len(filtered_stocks)}") return filtered_stocks # 过滤高价股 def filter_high_price_stock(self, stock_list): last_prices = history(1, unit="1m", field="close", security_list=stock_list).iloc[0] filtered_stocks = last_prices[last_prices < 10].index.tolist() log.info(f"过滤高价股后剩余股票数量:{len(filtered_stocks)}") return filtered_stocks class JSG_Strategy(Strategy): def __init__(self, context, subportfolio_index, name): super().__init__(context, subportfolio_index, name) self.stock_sum = 6 self.num = 1 self.pass_months = [1, 4] # 获取股票行业 def getStockIndustry(self, stocks): industry = get_industry(stocks) dict = { stock: info["sw_l1"]["industry_name"] for stock, info in industry.items() if "sw_l1" in info } return pd.Series(dict) # 计算全市场宽度 def get_market_breadth(self, context): yesterday = context.previous_date stocks = get_index_stocks("000985.XSHG") count = 1 h = get_price( stocks, end_date=yesterday, frequency="1d", fields=["close"], count=count + 20, panel=False, ) h["date"] = pd.DatetimeIndex(h.time).date df_close = h.pivot(index="code", columns="date", values="close").dropna(axis=0) df_ma20 = df_close.rolling(window=20, axis=1).mean().iloc[:, -count:] df_bias = df_close.iloc[:, -count:] > df_ma20 df_bias["industry_name"] = self.getStockIndustry(stocks) df_ratio = ( (df_bias.groupby("industry_name").sum() * 100.0) / df_bias.groupby("industry_name").count() ).round() top_values = df_ratio.loc[:, yesterday].nlargest(self.num) I = top_values.index.tolist() log.info(f"全市场宽度:{np.array(df_ratio.sum(axis=0).mean())}") log.info(f"行业热度排名前{self.num}的行业:{[name for name in I]}") return I # 过滤股票 def filter(self, context): stocks = get_index_stocks("399101.XSHE", context.current_dt) log.info(f"初始股票池数量:{len(stocks)}") stocks = self.filter_kcbj_stock(stocks) log.info(f"过滤科创板股票后剩余股票数量:{len(stocks)}") stocks = self.filter_st_stock(stocks) log.info(f"过滤ST股票后剩余股票数量:{len(stocks)}") stocks = self.filter_new_stock(context, stocks, 375) stocks = self.filter_paused_stock(stocks) log.info(f"过滤停牌股票后剩余股票数量:{len(stocks)}") stocks = get_fundamentals( query( valuation.code, ) .filter( valuation.code.in_(stocks), income.np_parent_company_owners > 0, income.net_profit > 0, income.operating_revenue > 1e8, ) .order_by(valuation.market_cap.asc()) )["code"].tolist() log.info(f"根据财务指标过滤后剩余股票数量:{len(stocks)}") stocks = self.filter_limitup_limitdown_stock(context, stocks) log.info(f"过滤涨跌停股票后剩余股票数量:{len(stocks)}") selected_stocks = stocks[: min(len(stocks), self.stock_sum)] log.info(f"最终选择的股票数量:{len(selected_stocks)}") return selected_stocks # 判断是否为空仓月份 def is_empty_month(self, context): month = context.current_dt.month return month in self.pass_months # 选股 def select(self, context): I = self.get_market_breadth(context) industries = {"银行I", "有色金属I", "煤炭I", "钢铁I", "采掘I"} if not industries.intersection(I) and not self.is_empty_month(context): log.info("开仓") L = self.filter(context) else: log.info("跑") L = [self.fill_stock] return L def prepare(self, context): self._prepare(context) def adjust(self, context): target = self.select(context) self._adjust(context, target) def check(self, context): self._check(context) class All_Day_Strategy(Strategy): def __init__(self, context, subportfolio_index, name): super().__init__(context, subportfolio_index, name) self.min_volume = 2000 self.etf_pool = [ "511010.XSHG", "518880.XSHG", "513100.XSHG", "515080.XSHG", "159980.XSHE", "162411.XSHE", "159985.XSHE", ] self.rates = [0.4, 0.2, 0.15, 0.1, 0.05, 0.05, 0.05] # 调整持仓 def adjust(self, context): subportfolio = context.subportfolios[self.subportfolio_index] targets = { etf: subportfolio.total_value * rate for etf, rate in zip(self.etf_pool, self.rates) } if not subportfolio.long_positions: log.info("无持仓,按目标权重下单") for etf, target in targets.items(): self.order_target_value_(etf, target) else: log.info("有持仓,调整仓位") for etf, target in targets.items(): value = subportfolio.long_positions[etf].value minV = subportfolio.long_positions[etf].price * 100 if value - target > self.min_volume and minV > value - target: log.info(f"减少{etf}的仓位") self.order_target_value_(etf, target) for etf, target in targets.items(): value = subportfolio.long_positions[etf].value minV = subportfolio.long_positions[etf].price * 100 if ( target - value > self.min_volume and minV < subportfolio.available_cash and minV < target - value ): log.info(f"增加{etf}的仓位") self.order_target_value_(etf, target) class Rotation_ETF_Strategy(Strategy): def __init__(self, context, subportfolio_index, name): super().__init__(context, subportfolio_index, name) self.stock_sum = 1 self.etf_pool = [ "518880.XSHG", "513100.XSHG", "159915.XSHE", "510180.XSHG", ] self.m_days = 25 # 计算动量 def MOM(self, etf): df = attribute_history(etf, self.m_days, "1d", ["close"]) y = np.log(df["close"].values) n = len(y) x = np.arange(n) weights = np.linspace(1, 2, n) slope, intercept = np.polyfit(x, y, 1, w=weights) annualized_returns = math.pow(math.exp(slope), 250) - 1 residuals = y - (slope * x + intercept) r_squared = 1 - ( np.sum(weights * residuals**2) / np.sum(weights * (y - np.mean(y)) ** 2) ) return annualized_returns * r_squared # 选股 def select(self): score_list = [self.MOM(etf) for etf in self.etf_pool] df = pd.DataFrame(index=self.etf_pool, data={"score": score_list}) df = df.sort_values(by="score", ascending=False) df = df[(df["score"] > 0) & (df["score"] <= 5)] target = df.index.tolist() log.info(f"根据动量策略选择的ETF:{target}") if not target: target = [self.fill_stock] return target[: min(len(target), self.stock_sum)] def adjust(self, context): target = self.select() self._prepare(context) self._adjust(context, target) ```
文章分类
关于作者
水滴
注册时间: