# 这篇博客只是试一下效果,以后可能会以这种方式来展示策略。
# 克隆自聚宽文章:https://www.joinquant.com/post/51758
# 标题:多策略组合2.0(高手续费,近五年年化30%,回撤7%)
# 作者:O_iX
# 克隆自聚宽文章:https://www.joinquant.com/post/51758
# 标题:多策略组合2.0(高手续费,近五年年化30%,回撤7%)
# 作者:O_iX
# 导入函数库
from jqdata import *
import datetime
import math
import numpy as np
import pandas as pd
# 初始化函数,设定基准等等
def initialize(context):
    """Configure the backtest, create the sub-portfolios and schedule all jobs.

    Sets the benchmark, engine options, slippage and order costs, splits the
    starting cash into four sub-portfolios according to
    ``g.portfolio_value_proportion``, instantiates one strategy object per
    trading sub-portfolio (indices 1-3) and registers the scheduled callbacks.

    Args:
        context: platform context object; ``context.portfolio.starting_cash``
            is read to size the sub-portfolios.
    """
    # Benchmark and engine options.
    set_benchmark("515080.XSHG")
    set_option("avoid_future_data", True)
    set_option("use_real_price", True)
    log.info("初始函数开始运行且全局只运行一次")
    log.set_level("order", "error")

    # Slippage for stocks and funds.
    set_slippage(FixedSlippage(0.02), type="stock")
    set_slippage(FixedSlippage(0.002), type="fund")

    # Order costs: taxed/commissioned for "stock", free of charge for "mmf".
    stock_cost = OrderCost(
        open_tax=0,
        close_tax=0.001,
        open_commission=0.0003,
        close_commission=0.0003,
        close_today_commission=0,
        min_commission=5,
    )
    set_order_cost(stock_cost, type="stock")
    mmf_cost = OrderCost(
        open_tax=0,
        close_tax=0,
        open_commission=0,
        close_commission=0,
        close_today_commission=0,
        min_commission=0,
    )
    set_order_cost(mmf_cost, type="mmf")

    # Sub-portfolio 0 starts with no cash; 1-3 carry the three strategies.
    g.strategys = {}
    g.portfolio_value_proportion = [0, 0.3, 0.5, 0.2]
    sub_configs = []
    for slot in range(4):
        slot_cash = context.portfolio.starting_cash * g.portfolio_value_proportion[slot]
        sub_configs.append(SubPortfolioConfig(slot_cash, "stock"))
    set_subportfolios(sub_configs)

    # One strategy object per trading sub-portfolio, keyed by display name.
    strategy_specs = (
        (1, JSG_Strategy, "搅屎棍策略"),
        (2, All_Day_Strategy, "全天候策略"),
        (3, Rotation_ETF_Strategy, "核心资产轮动策略"),
    )
    for slot, strategy_cls, label in strategy_specs:
        g.strategys[label] = strategy_cls(context, subportfolio_index=slot, name=label)

    # Scheduling: the monthly rebalance is registered first, then each
    # strategy's jobs, but only when that strategy has a non-zero allocation.
    run_monthly(balance_subportfolios, 1, "9:00")
    weights = g.portfolio_value_proportion
    if weights[1] > 0:
        run_daily(prepare_jsg_strategy, "9:05")
        run_weekly(adjust_jsg_strategy, 1, "9:31")
        run_daily(check_jsg_strategy, "14:50")
    if weights[2] > 0:
        run_monthly(adjust_all_day_strategy, 1, "9:40")
    if weights[3] > 0:
        run_daily(adjust_rotation_etf_strategy, "9:32")
# 搅屎棍策略相关函数
def prepare_jsg_strategy(context):
    """Scheduled callback: run the ``prepare`` step of the JSG strategy."""
    log.info("开始准备搅屎棍策略")
    strategy = g.strategys["搅屎棍策略"]
    strategy.prepare(context)
def adjust_jsg_strategy(context):
    """Scheduled callback: run the ``adjust`` step of the JSG strategy."""
    log.info("开始调整搅屎棍策略")
    strategy = g.strategys["搅屎棍策略"]
    strategy.adjust(context)
def check_jsg_strategy(context):
    """Scheduled callback: run the ``check`` step of the JSG strategy."""
    log.info("开始检查搅屎棍策略")
    strategy = g.strategys["搅屎棍策略"]
    strategy.check(context)
# 全天候策略相关函数
def adjust_all_day_strategy(context):
    """Scheduled callback: run the ``adjust`` step of the all-weather strategy."""
    log.info("开始调整全天候策略")
    strategy = g.strategys["全天候策略"]
    strategy.adjust(context)
# 核心资产轮动策略相关函数
def adjust_rotation_etf_strategy(context):
    """Scheduled callback: run the ``adjust`` step of the ETF rotation strategy."""
    log.info("开始调整核心资产轮动策略")
    strategy = g.strategys["核心资产轮动策略"]
    strategy.adjust(context)
# 平衡子账户仓位函数
def balance_subportfolios(context):
    """Move cash between sub-portfolios towards ``g.portfolio_value_proportion``.

    Works in two passes over sub-portfolios 1..n-1, using sub-portfolio 0 as
    the intermediary: first every over-weight sub-portfolio pushes its excess
    (capped by its available cash) into sub-portfolio 0, then every
    under-weight one is topped up from sub-portfolio 0 (capped by the cash
    available there). Only cash is transferred; no orders are placed here.

    Args:
        context: platform context object exposing ``portfolio`` and
            ``subportfolios``.
    """
    log.info("开始平衡子账户仓位")
    count = len(g.portfolio_value_proportion)

    weights_before = [
        context.subportfolios[k].total_value / context.portfolio.total_value
        for k in range(count)
    ]
    log.info("调整前:" + str(weights_before))

    # Pass 1: collect surplus cash from over-weight sub-portfolios.
    for idx in range(1, count):
        target = g.portfolio_value_proportion[idx] * context.portfolio.total_value
        value = context.subportfolios[idx].total_value
        if context.subportfolios[idx].available_cash > 0 and target < value:
            surplus = min(value - target, context.subportfolios[idx].available_cash)
            transfer_cash(from_pindex=idx, to_pindex=0, cash=surplus)

    # Pass 2: distribute the collected cash to under-weight sub-portfolios.
    for idx in range(1, count):
        target = g.portfolio_value_proportion[idx] * context.portfolio.total_value
        value = context.subportfolios[idx].total_value
        if target > value and context.subportfolios[0].available_cash > 0:
            top_up = min(target - value, context.subportfolios[0].available_cash)
            transfer_cash(from_pindex=0, to_pindex=idx, cash=top_up)

    weights_after = [
        context.subportfolios[k].total_value / context.portfolio.total_value
        for k in range(count)
    ]
    log.info("调整后:" + str(weights_after))
# 策略基类
class Strategy:
    """Base class for a strategy that trades inside a single sub-portfolio.

    Holds the shared bookkeeping (current holdings and the holdings that
    closed at limit-up), order helpers bound to the strategy's sub-portfolio,
    and a set of universe filters that subclasses combine.
    """

    def __init__(self, context, subportfolio_index, name):
        """Bind the strategy to a sub-portfolio and reset its state.

        Args:
            context: platform context object (not read by the base class).
            subportfolio_index: index into ``context.subportfolios`` that all
                orders of this strategy are routed to.
            name: display name of the strategy.
        """
        self.subportfolio_index = subportfolio_index
        self.name = name
        # Maximum number of positions opened by _adjust(); subclasses override.
        self.stock_sum = 1
        # Securities held at the time of the last _prepare() call.
        self.hold_list = []
        # Subset of hold_list that closed at limit-up on the previous day.
        self.limit_up_list = []
        # Placeholder security that subclasses in this file return as their
        # only target when they have nothing else to hold.
        self.fill_stock = "511880.XSHG"

    def _prepare(self, context):
        """Refresh ``hold_list`` and ``limit_up_list`` from the sub-portfolio.

        ``limit_up_list`` receives every held security whose previous-day
        close equals that day's ``high_limit``.
        """
        positions = context.subportfolios[self.subportfolio_index].long_positions
        self.hold_list = list(positions.keys())
        if not self.hold_list:
            self.limit_up_list = []
            return
        df = get_price(
            self.hold_list,
            end_date=context.previous_date,
            frequency="daily",
            fields=["close", "high_limit"],
            count=1,
            panel=False,
            fill_paused=False,
        )
        df = df[df["close"] == df["high_limit"]]
        self.limit_up_list = list(df.code)

    def _check(self, context):
        """Sell yesterday's limit-up holdings that are no longer at limit-up."""
        if not self.limit_up_list:
            return
        current_data = get_current_data()
        for stock in self.limit_up_list:
            if current_data[stock].last_price < current_data[stock].high_limit:
                log.info("[%s]涨停打开,卖出" % (stock))
                self.close_position(stock)
            else:
                log.info("[%s]涨停,继续持有" % (stock))

    def _adjust(self, context, target):
        """Rotate the sub-portfolio into ``target``.

        Closes every security in ``hold_list`` that is neither in ``target``
        nor in ``limit_up_list``, then splits the available cash evenly over
        the number of free slots and opens positions in target securities
        that are not held yet.

        Args:
            context: platform context object.
            target: ordered list of security codes to hold.
        """
        subportfolio = context.subportfolios[self.subportfolio_index]
        for security in self.hold_list:
            if security not in target and security not in self.limit_up_list:
                self.close_position(security)

        position_count = len(subportfolio.long_positions)
        if len(target) <= position_count:
            return
        buy_num = min(len(target), self.stock_sum - position_count)
        if buy_num <= 0:
            # No free slot left (e.g. retained limit-up holdings fill the
            # book); without this guard the division below would raise
            # ZeroDivisionError or produce a negative order value.
            return
        value = subportfolio.available_cash / buy_num

        opened = 0
        for security in target:
            if security in subportfolio.long_positions.keys():
                continue
            if self.open_position(security, value):
                # Cash was split buy_num ways, so stop once that many
                # positions were opened. The previous check compared a
                # never-updated counter and could not trigger.
                opened += 1
                if opened >= buy_num:
                    break

    def order_target_value_(self, security, value):
        """Order ``security`` to ``value`` in this strategy's sub-portfolio.

        Returns:
            Whatever ``order_target_value`` returns (the callers below treat
            ``None`` as "no order was created").
        """
        if value == 0:
            log.debug("Selling out %s" % (security))
        else:
            log.debug("Order %s to value %f" % (security, value))
        return order_target_value(security, value, pindex=self.subportfolio_index)

    def open_position(self, security, value):
        """Buy ``security`` up to ``value``; return True if anything filled."""
        order = self.order_target_value_(security, value)
        return order is not None and order.filled > 0

    def close_position(self, security):
        """Sell ``security`` down to zero.

        Returns:
            True only when an order was created, its status is
            ``OrderStatus.held`` and the filled amount equals the ordered
            amount; False otherwise.
        """
        order = self.order_target_value_(security, 0)
        return (
            order is not None
            and order.status == OrderStatus.held
            and order.filled == order.amount
        )

    def filter_paused_stock(self, stock_list):
        """Return the securities of ``stock_list`` that are not paused."""
        current_data = get_current_data()
        return [stock for stock in stock_list if not current_data[stock].paused]

    def filter_st_stock(self, stock_list):
        """Drop ST / delisting-flagged securities.

        A security is removed when ``is_st`` is set or its name contains
        "ST", "*" or "退".
        """
        current_data = get_current_data()
        return [
            stock
            for stock in stock_list
            if not current_data[stock].is_st
            and "ST" not in current_data[stock].name
            and "*" not in current_data[stock].name
            and "退" not in current_data[stock].name
        ]

    def filter_kcbj_stock(self, stock_list):
        """Drop codes starting with "4", "8", "68" or "3", in place.

        The original comment names these as STAR Market and Beijing exchange
        codes; note that every code starting with "3" is removed as well.
        The list is filtered in place (callers may hold a reference to it)
        and the same list object is returned.
        """
        excluded_prefixes = ("4", "8", "68", "3")
        # Slice assignment keeps the in-place contract while avoiding the
        # quadratic cost of repeated list.remove() calls.
        stock_list[:] = [
            stock for stock in stock_list if not stock.startswith(excluded_prefixes)
        ]
        return stock_list

    def filter_limitup_limitdown_stock(self, context, stock_list):
        """Drop securities currently at limit-up or limit-down.

        Securities already held in this sub-portfolio are always kept; the
        others must trade strictly between ``low_limit`` and ``high_limit``.
        """
        subportfolio = context.subportfolios[self.subportfolio_index]
        current_data = get_current_data()
        return [
            stock
            for stock in stock_list
            if stock in subportfolio.long_positions.keys()
            or (
                current_data[stock].last_price < current_data[stock].high_limit
                and current_data[stock].last_price > current_data[stock].low_limit
            )
        ]

    def filter_new_stock(self, context, stock_list, days):
        """Keep securities listed for at least ``days`` calendar days.

        The age is measured from the security's ``start_date`` to
        ``context.previous_date``.
        """
        yesterday = context.previous_date
        min_age = datetime.timedelta(days=days)
        filtered_stocks = [
            stock
            for stock in stock_list
            if yesterday - get_security_info(stock).start_date >= min_age
        ]
        log.info(f"过滤新股后剩余股票数量:{len(filtered_stocks)}")
        return filtered_stocks

    def filter_high_price_stock(self, stock_list):
        """Keep securities whose latest 1-minute close is below 10."""
        last_prices = history(1, unit="1m", field="close", security_list=stock_list).iloc[0]
        filtered_stocks = last_prices[last_prices < 10].index.tolist()
        log.info(f"过滤高价股后剩余股票数量:{len(filtered_stocks)}")
        return filtered_stocks
class JSG_Strategy(Strategy):
    """Small-cap stock strategy gated by industry breadth and calendar month.

    Each adjustment either selects up to ``stock_sum`` of the smallest
    market-cap stocks that pass the filters, or falls back to the
    placeholder security ``fill_stock``.
    """

    def __init__(self, context, subportfolio_index, name):
        """Set the strategy parameters on top of the base-class state."""
        super().__init__(context, subportfolio_index, name)
        # Maximum number of stocks to hold.
        self.stock_sum = 6
        # Number of top-ranked industries taken from the breadth ranking.
        self.num = 1
        # Months for which is_empty_month() returns True.
        self.pass_months = [1, 4]

    def getStockIndustry(self, stocks):
        """Map each stock to its ``sw_l1`` industry name.

        Stocks without an ``sw_l1`` entry are omitted.

        Returns:
            ``pd.Series`` indexed by stock code with industry names as values.
        """
        industry_info = get_industry(stocks)
        names_by_code = {}
        for code, info in industry_info.items():
            if "sw_l1" in info:
                names_by_code[code] = info["sw_l1"]["industry_name"]
        return pd.Series(names_by_code)

    def get_market_breadth(self, context):
        """Rank industries by the share of members closing above their 20-day mean.

        For the constituents of index 000985.XSHG, compares the previous
        day's close with its 20-day rolling mean, aggregates per industry
        into a rounded percentage and returns the ``self.num`` industries
        with the highest percentage.

        Returns:
            List of industry names.
        """
        prev_day = context.previous_date
        universe = get_index_stocks("000985.XSHG")
        # Number of trailing days to evaluate.
        window = 1
        prices = get_price(
            universe,
            end_date=prev_day,
            frequency="1d",
            fields=["close"],
            count=window + 20,
            panel=False,
        )
        prices["date"] = pd.DatetimeIndex(prices.time).date
        # One row per stock, one column per date; stocks with gaps are dropped.
        closes = prices.pivot(index="code", columns="date", values="close").dropna(axis=0)
        ma20 = closes.rolling(window=20, axis=1).mean().iloc[:, -window:]
        above_ma = closes.iloc[:, -window:] > ma20
        above_ma["industry_name"] = self.getStockIndustry(universe)

        by_industry = above_ma.groupby("industry_name")
        ratio = ((by_industry.sum() * 100.0) / by_industry.count()).round()

        hottest = ratio.loc[:, prev_day].nlargest(self.num).index.tolist()
        log.info(f"全市场宽度:{np.array(ratio.sum(axis=0).mean())}")
        log.info(f"行业热度排名前{self.num}的行业:{hottest}")
        return hottest

    def filter(self, context):
        """Build the candidate list from the constituents of 399101.XSHE.

        Applies, in order: board-prefix filter, ST filter, new-listing filter
        (375 days), paused filter, a fundamentals query (positive parent net
        profit, positive net profit, operating revenue above 1e8, ordered by
        ascending market cap) and the limit-up/limit-down filter, then keeps
        the first ``stock_sum`` codes.

        Returns:
            List of selected stock codes.
        """
        pool = get_index_stocks("399101.XSHE", context.current_dt)
        log.info(f"初始股票池数量:{len(pool)}")

        pool = self.filter_kcbj_stock(pool)
        log.info(f"过滤科创板股票后剩余股票数量:{len(pool)}")

        pool = self.filter_st_stock(pool)
        log.info(f"过滤ST股票后剩余股票数量:{len(pool)}")

        pool = self.filter_new_stock(context, pool, 375)

        pool = self.filter_paused_stock(pool)
        log.info(f"过滤停牌股票后剩余股票数量:{len(pool)}")

        fundamentals_query = (
            query(valuation.code)
            .filter(
                valuation.code.in_(pool),
                income.np_parent_company_owners > 0,
                income.net_profit > 0,
                income.operating_revenue > 1e8,
            )
            .order_by(valuation.market_cap.asc())
        )
        pool = get_fundamentals(fundamentals_query)["code"].tolist()
        log.info(f"根据财务指标过滤后剩余股票数量:{len(pool)}")

        pool = self.filter_limitup_limitdown_stock(context, pool)
        log.info(f"过滤涨跌停股票后剩余股票数量:{len(pool)}")

        selected_stocks = pool[: self.stock_sum]
        log.info(f"最终选择的股票数量:{len(selected_stocks)}")
        return selected_stocks

    def is_empty_month(self, context):
        """Return True when the current month is listed in ``pass_months``."""
        return context.current_dt.month in self.pass_months

    def select(self, context):
        """Choose the target list for this adjustment.

        Falls back to ``[fill_stock]`` when any top-ranked industry is one of
        the listed industries or the current month is a pass month; otherwise
        returns the filtered stock list.
        """
        hottest = self.get_market_breadth(context)
        avoid_industries = {"银行I", "有色金属I", "煤炭I", "钢铁I", "采掘I"}
        if avoid_industries.intersection(hottest) or self.is_empty_month(context):
            log.info("跑")
            return [self.fill_stock]
        log.info("开仓")
        return self.filter(context)

    def prepare(self, context):
        """Refresh the holdings and limit-up bookkeeping."""
        self._prepare(context)

    def adjust(self, context):
        """Select targets and rotate the sub-portfolio into them."""
        self._adjust(context, self.select(context))

    def check(self, context):
        """Sell yesterday's limit-up holdings that have opened up."""
        self._check(context)
class All_Day_Strategy(Strategy):
    """Fixed-weight ETF allocation that is nudged back to target weights."""

    def __init__(self, context, subportfolio_index, name):
        """Set the ETF pool and its target weights."""
        super().__init__(context, subportfolio_index, name)
        # Minimum deviation from target (in currency) before trading at all.
        self.min_volume = 2000
        self.etf_pool = [
            "511010.XSHG",
            "518880.XSHG",
            "513100.XSHG",
            "515080.XSHG",
            "159980.XSHE",
            "162411.XSHE",
            "159985.XSHE",
        ]
        # Target weight per ETF, aligned by position with etf_pool.
        self.rates = [0.4, 0.2, 0.15, 0.1, 0.05, 0.05, 0.05]

    def adjust(self, context):
        """Order the ETFs towards ``total_value * rate``.

        With no positions, every ETF is ordered straight to its target value.
        Otherwise over-weight ETFs are reduced first and under-weight ETFs
        are increased afterwards. A trade is only sent when the deviation
        exceeds both ``min_volume`` and the value of 100 units at the
        position's price (presumably one board lot; confirm against the
        exchange rules). Increases additionally require that 100 units fit
        into the available cash.

        Args:
            context: platform context object.
        """
        subportfolio = context.subportfolios[self.subportfolio_index]
        targets = {
            etf: subportfolio.total_value * rate
            for etf, rate in zip(self.etf_pool, self.rates)
        }

        if not subportfolio.long_positions:
            log.info("无持仓,按目标权重下单")
            for etf, target in targets.items():
                self.order_target_value_(etf, target)
            return

        log.info("有持仓,调整仓位")
        # Reduce over-weight positions first.
        for etf, target in targets.items():
            position = subportfolio.long_positions[etf]
            excess = position.value - target
            lot_value = position.price * 100
            # The excess must be larger than 100 units' value. The previous
            # check was inverted (lot_value > excess), so a reduction was
            # only sent when the excess was below that value, the opposite
            # of the rule used for increases below.
            if excess > self.min_volume and lot_value < excess:
                log.info(f"减少{etf}的仓位")
                self.order_target_value_(etf, target)

        # Then top up under-weight positions.
        for etf, target in targets.items():
            position = subportfolio.long_positions[etf]
            shortfall = target - position.value
            lot_value = position.price * 100
            if (
                shortfall > self.min_volume
                and lot_value < subportfolio.available_cash
                and lot_value < shortfall
            ):
                log.info(f"增加{etf}的仓位")
                self.order_target_value_(etf, target)
class Rotation_ETF_Strategy(Strategy):
    """Momentum rotation that holds the best-scoring ETF of a small pool."""

    def __init__(self, context, subportfolio_index, name):
        """Set the ETF pool and the momentum look-back length."""
        super().__init__(context, subportfolio_index, name)
        # Hold a single ETF at a time.
        self.stock_sum = 1
        self.etf_pool = [
            "518880.XSHG",
            "513100.XSHG",
            "159915.XSHE",
            "510180.XSHG",
        ]
        # Number of daily bars used for the momentum fit.
        self.m_days = 25

    def MOM(self, etf):
        """Return the momentum score of ``etf``.

        Fits a weighted straight line to the log of the last ``m_days`` daily
        closes (weights rise linearly from 1 to 2 towards the most recent
        bar). The score is the return implied by compounding the slope over
        250 days, multiplied by a weighted R-squared of the fit.
        """
        closes = attribute_history(etf, self.m_days, "1d", ["close"])["close"].values
        log_price = np.log(closes)
        size = len(log_price)
        days = np.arange(size)
        w = np.linspace(1, 2, size)

        slope, intercept = np.polyfit(days, log_price, 1, w=w)
        annualized = math.pow(math.exp(slope), 250) - 1

        resid = log_price - (slope * days + intercept)
        ss_res = np.sum(w * resid**2)
        ss_tot = np.sum(w * (log_price - np.mean(log_price)) ** 2)
        fit_quality = 1 - (ss_res / ss_tot)
        return annualized * fit_quality

    def select(self):
        """Pick the target list from the ETF pool.

        ETFs are ranked by descending score and kept only when the score is
        in ``(0, 5]``. If none qualifies the placeholder ``fill_stock`` is
        used. At most ``stock_sum`` entries are returned.
        """
        scores = pd.DataFrame(
            index=self.etf_pool,
            data={"score": [self.MOM(etf) for etf in self.etf_pool]},
        )
        ranked = scores.sort_values(by="score", ascending=False)
        ranked = ranked[(ranked["score"] > 0) & (ranked["score"] <= 5)]
        target = ranked.index.tolist()
        log.info(f"根据动量策略选择的ETF:{target}")
        if not target:
            target = [self.fill_stock]
        return target[: self.stock_sum]

    def adjust(self, context):
        """Select the target, refresh holdings and rotate into the target."""
        target = self.select()
        self._prepare(context)
        self._adjust(context, target)
注册时间: