# 克隆自聚宽文章:https://www.joinquant.com/post/41718
# 标题:多因子宽基ETF择时轮动改进版-高收益大资金低回撤
# 作者:养家大哥
# 标题:ETF动量轮动RSRS择时-V15.0,2023/3/23
# 作者:养家大哥
# 标题:动量ETF轮动RSRS择时-v16
# 作者:杨德勇
# v2 养家大哥的思路:
# 趋势因子的特点是无法及时判断趋势的变向,往往趋势变向一段时间后才能跟上,
# 巨大回撤往往就发生在这种时候。因此基于动量因子的一阶导数,衡量趋势的潜在变化速度,
# 若变化速度过快则空仓,反之则按原计划操作。
# 可以进一步发散,衡量动量因子的二阶导、三阶导等等,暂时只测试过一阶导,就是目前这个升级2版本。
from jqdata import *
import numpy as np
from jqlib.technical_analysis import *
#初始化函数
def initialize(context):
set_benchmark('399006.XSHE')
set_option('use_real_price', True)
set_option("avoid_future_data", True) # 避免引入未来信息
set_slippage(FixedSlippage(0.001))
#set_slippage(PriceRelatedSlippage(0.002))
set_order_cost(OrderCost(open_tax=0, close_tax=0.000, open_commission=0.0001, close_commission=0.0001, close_today_commission=0, min_commission=0),
type='fund')
log.set_level('order', 'error')
g.stock_pool = [
# ======== 大盘 ===================
'510300.XSHG', # 沪深300ETF
'510050.XSHG', # 上证50ETF
# '510180.XSHG', # 上证180 (用于替换上证50或沪深300,其与创业板有重合)
'159949.XSHE', # 创业板500
# '159915.XSHE', # 创业指数,替代创业500
# '510500.XSHG', # 500ETF
# '159915.XSHE', # 创业板 ETF
'159928.XSHE', # 中证消费ETF
# '512120.XSHG', # 医药50ETF
# '510880.XSHG', # 红利ETF
# '512100.XSHG', # 中证1000
# '159845.XSHE', # 中证1000
]
# 备选池:用流动性和市值更大的50ETF分别代替宽指ETF,500与300ETF保留一个
g.stock_num = 1 #买入评分最高的前stock_num只股票
g.momentum_day = 20 #最新动量参考最近momentum_day的
g.ref_stock = '000300.XSHG' #用ref_stock做择时计算的基础数据
g.N = 18 # 计算最新斜率slope,拟合度r2参考最近N天
g.M = 600 # 计算最新标准分zscore,rsrs_score参考最近M天(600)
g.K = 8 # 计算 zscore 斜率的窗口大小
g.biasN = 90 #乖离动量的时间天数
g.lossN = 20 #止损MA20---60分钟
g.lossFactor = 1.005 #下跌止损的比例,相对前一天的收盘价
g.SwitchFactor = 1.04 # 换仓位的比例,待换股相对当前持股的分数
g.Motion_1diff = 19 # 股票前一天动量变化速度门限
g.raiser_thr = 4.8 # 股票前一天上涨的比例门限
g.hold_stock = 'null'
g.score_thr = -0.68 # rsrs标准分指标阈值
g.score_fall_thr = -0.43 # 当股票下跌趋势时候, 卖出阀值rsrs
g.idex_slope_raise_thr = 12 # 判断大盘指数强势的斜率门限
g.slope_series,g.rsrs_score_history= initial_slope_series() # 除去回测第一天的slope,避免运行时重复加入
g.stock_motion = initial_stock_motion(g.stock_pool) # 除去回测第一天的动量
run_daily(my_trade_prepare, time='7:00', reference_security='000300.XSHG')
run_daily(my_trade, time='9:30', reference_security='000300.XSHG')
run_daily(my_sell2buy, time='9:35', reference_security='000300.XSHG')
run_daily(check_lose, time='open', reference_security='000300.XSHG')
# run_daily(print_trade_info, time='15:10', reference_security='000300.XSHG')
run_daily(pre_hold_check, time='11:25')
run_daily(hold_check, time='11:27')
# 初始化准备数据,除去回测第一天的slope,zscores
def initial_slope_series():
length = g.N+g.M+g.K
data = attribute_history(g.ref_stock, length, '1d', ['high', 'low', 'close'])
multe_data = [get_ols(data.low[i:i+g.N], data.high[i:i+g.N]) for i in range(length-g.N)]
slopes = [i[1] for i in multe_data]
r2s = [i[2] for i in multe_data]
zscores =[(get_zscore(slopes[i+1:i+1+g.M])*r2s[i+g.M]) for i in range(g.K)]
return (slopes,zscores)
## 获取初始化动量因子,除去回测第一天
def initial_stock_motion(stock_pool):
stock_motion = {}
for stock in stock_pool:
motion_que = []
data = attribute_history(stock, g.biasN + g.momentum_day + 1, '1d', ['close'])
data = data[:-1]
bias = (data.close/data.close.rolling(g.biasN).mean())[-g.momentum_day:] # 乖离因子
score = np.polyfit(np.arange(g.momentum_day),bias/bias[0],1)[0].real*10000 # 乖离动量拟合
motion_que.append(score)
stock_motion[stock] = motion_que
return(stock_motion)
## 持仓检查,盘中动态止损:早盘结束后,若60分钟周期跌破MA20均线
## 并且当前价格相对昨天没有上涨,则卖出
def pre_hold_check(context):
if context.portfolio.positions:
for stk in context.portfolio.positions:
dt = attribute_history(stk,g.lossN+2,'60m',['close'])
dt['man'] = dt.close/dt.close.rolling(g.lossN).mean()
if(dt.man[-1] < 1.0):
stk_dict = context.portfolio.positions[stk]
log.info("盘中可能止损,卖出:{}".format(stk))
send_message("盘中可能止损,卖出:{}".format(stk))
## 并且当前价格相对昨天没有上涨,则卖出
def hold_check(context):
current_data = get_current_data()
if context.portfolio.positions:
for stk in context.portfolio.positions:
yesterday_di = attribute_history(stk,1,'1d',['close'])
dt = attribute_history(stk,g.lossN+2,'60m',['close'])
dt['man'] = dt.close/dt.close.rolling(g.lossN).mean()
#log.info("man=%0f, last_price=%0f, yester=%0f"%(dt.man[-1], current_data[stk].last_price*1.006, yesterday_di['close'][-1]))
if((dt.man[-1] < 1.0) and (current_data[stk].last_price*g.lossFactor <= yesterday_di['close'][-1])):
#if (dt.man[-1] < 1.0):
stk_dict = context.portfolio.positions[stk]
log.info('准备平仓,总仓位:{}, 可卖出:{}, '.format(stk_dict.total_amount,stk_dict.closeable_amount))
send_message("盘中止损,卖出:{}".format(stk))
if(stk_dict.closeable_amount):
order_target_value(stk,0)
log.info('盘中止损',stk)
else:
log.info('无法止损',stk)
## 动量因子:由收益率动量改为相对MA90均线的乖离动量
def get_rank(context,stock_pool):
rank = []
for stock in stock_pool:
data = attribute_history(stock, g.biasN + g.momentum_day, '1d', ['close'])
bias = (data.close/data.close.rolling(g.biasN).mean())[-g.momentum_day:] # 乖离因子
score = np.polyfit(np.arange(g.momentum_day),bias/bias[0],1)[0].real*10000 # 乖离动量拟合
adr = 100*(data.close[-1] - data.close[-2])/data.close[-2] #股票的涨跌幅度
if(stock == g.hold_stock): raise_x = g.SwitchFactor
else: raise_x = 1
# data = attribute_history(stock, g.momentum_day, '1d', ['close'])
# score = np.polyfit(np.arange(g.momentum_day),data.close/data.close[0],1)[0].real # 乖离动量拟合
#log.info("计算data.close[-1]=%f, data.close[-2]=%f,adr=%f"%(data.close[-1], data.close[-2], adr))
rank.append([stock, score*raise_x, adr])
g.stock_motion[stock].append(score)
if(len(g.stock_motion[stock])>5):g.stock_motion[stock].pop(0)
#log.info('rsrs_score:')
str = ''
for item in rank:
str += "%s:%.2f:%.2f; "%(item[0], item[1], item[2])
log.info(str)
rank = [ i for i in rank if math.isnan(i[1])==False ]
rank.sort(key=lambda x: x[1],reverse=True)
return rank[0]
## 线性回归:复现statsmodels的get_OLS函数
def get_ols(x, y):
slope, intercept = np.polyfit(x, y, 1)
r2 = 1 - (sum((y - (slope * x + intercept))**2) / ((len(y) - 1) * np.var(y, ddof=1)))
return (intercept, slope, r2)
## 因子标准化
def get_zscore(slope_series):
mean = np.mean(slope_series)
std = np.std(slope_series)
return (slope_series[-1] - mean) / std
def get_zscore_slope(z_scores):
y = z_scores
x = np.arange(len(z_scores))
slope, intercept = np.polyfit(x, y, 1)
return slope
# 只看RSRS因子值作为买入、持有和清仓依据,前版本还加入了移动均线的上行作为条件
def get_timing_signal(context,stock):
data = attribute_history(g.ref_stock, g.N, '1d', ['high', 'low', 'close'])
intercept, slope, r2 = get_ols(data.low, data.high)
g.slope_series.append(slope)
rsrs_score = get_zscore(g.slope_series[-g.M:]) * r2
g.rsrs_score_history.append(rsrs_score)
rsrs_slope = get_zscore_slope(g.rsrs_score_history[-g.K:])
#大盘指数收盘价趋势
idex_slope = np.polyfit(np.arange(8), data.close[-8:],1)[0].real
g.slope_series.pop(0)
g.rsrs_score_history.pop(0)
#record(rsrs_score=rsrs_score,rsrs_slope=rsrs_slope)
log.info('rsrs_slope {:.3f}'.format(rsrs_slope)+' rsrs_score {:.3f} '.format(rsrs_score)
+' idex_slope {:.3f} '.format(idex_slope))
#通过摆动指数,提早知道趋势的变化,这种情况优先于RSRS
WR2,WR1 = WR([g.ref_stock], check_date =context.previous_date, N = 21, N1 = 14, unit='1d', include_now=True)
#if WR1[g.ref_stock]<=2 and WR2[g.ref_stock] <=2: return "SELL"
if WR1[g.ref_stock]>=97 and WR2[g.ref_stock] >=97: return "BUY"
#表示上升趋势快结束了,即将出现下跌
if(rsrs_slope< 0 and rsrs_score >0):
return "SELL"
#大盘下跌趋势过程中,不能买入
if(idex_slope<0) and (rsrs_slope>0) and (rsrs_score < g.score_fall_thr): return "SELL"
#大盘上升过程当中,大胆买入
if(idex_slope>g.idex_slope_raise_thr) and (rsrs_slope>0): return "BUY"
#大盘可能上涨,这个时候可以买入
if (rsrs_score> g.score_thr) : return "BUY"
#elif(idex_slope > 5) : return "BUY"
else: return "SELL"
#4-2 交易模块-开仓
#买入指定价值的证券,报单成功并成交(包括全部成交或部分成交,此时成交量大于0)返回True,报单失败或者报单成功但被取消(此时成交量等于0),返回False
def open_position(security, value):
order = order_target_value(security, value)
if order != None and order.filled > 0:
return True
return False
#4-3 交易模块-平仓
#卖出指定持仓,报单成功并全部成交返回True,报单失败或者报单成功但被取消(此时成交量等于0),或者报单非全部成交,返回False
def close_position(position):
security = position.security
order = order_target_value(security, 0) # 可能会因停牌失败
if order != None:
if order.status == OrderStatus.held and order.filled == order.amount:
return True
return False
def adjust_position(context, buy_stocks):
for stock in context.portfolio.positions:
if stock not in buy_stocks:
# log.info("[%s]已不在应买入列表中" % (stock))
position = context.portfolio.positions[stock]
close_position(position)
g.hold_stock = 'null'
return
else:
pass
# log.info("[%s]已经持有无需重复买入" % (stock))
position_count = len(context.portfolio.positions)
if g.stock_num > position_count:
value = context.portfolio.cash / (g.stock_num - position_count)
for stock in buy_stocks:
if context.portfolio.positions[stock].total_amount == 0:
if open_position(stock, value):
if len(context.portfolio.positions) == g.stock_num:
g.hold_stock = stock
break
def buy_stocks(context, buy_stocks):
position_count = len(context.portfolio.positions)
if g.stock_num > position_count:
value = context.portfolio.cash / (g.stock_num - position_count)
for stock in buy_stocks:
if context.portfolio.positions[stock].total_amount == 0:
if open_position(stock, value):
if len(context.portfolio.positions) == g.stock_num:
g.hold_stock = stock
break
# 计算待买入的ETF和择时信号,判断股票动量变化一阶导数, 如果变化太大,则空仓
def my_trade_prepare(context):
hour = context.current_dt.hour
minute = context.current_dt.minute
#if hour == 9 and minute == 30: # 9:30开盘时买入(标的根据昨天之前的数据算出来)
g.check_out_list = get_rank(context,g.stock_pool)
g.timing_signal = get_timing_signal(context,g.ref_stock)
log.info('今日自选及择时信号:{} {}'.format(g.check_out_list[0],g.timing_signal))
#判断股票动量变化一阶导数, 如果变化太大,则空仓
cur_stock = g.check_out_list[0]
cur_adr = g.check_out_list[2]#股票价格相对前一天涨跌比例
change_rate = g.stock_motion[cur_stock][-1]-g.stock_motion[cur_stock][-2]
#log.info("涨跌比例:%f, 动量变化速度:%f"%(cur_adr, change_rate))
if(change_rate>g.Motion_1diff) or (cur_adr>g.raiser_thr):
g.timing_signal = 'SELL'
log.info("由于涨跌:%f, 动量变化%0f,今日空仓"%(cur_adr, change_rate))
if g.timing_signal == 'SELL':
for stock in context.portfolio.positions:
#print("准备卖出ETF [%s]"%stock)
send_message("准备卖出ETF [%s]"%stock)
elif g.timing_signal == 'BUY' or g.timing_signal == 'KEEP':
if g.check_out_list[0] not in context.portfolio.positions:
if(len(context.portfolio.positions)>0):
stock_tmps = list(context.portfolio.positions.keys())
#print("准备卖ETF [%s], 买入ETF [%s]"%(stock_tmps[0], g.check_out_list[0]))
send_message("准备卖ETF [%s], 买入ETF [%s]"%(stock_tmps[0], g.check_out_list[0]))
else:
#print("准备买入ETF [%s]"%g.check_out_list[0])
send_message("准备买入ETF [%s]"%g.check_out_list[0])
else:
send_message("保持原来仓位")
pass
# 交易主函数,先确定ETF最强的是谁,然后再根据择时信号判断是否需要切换或者清仓
def my_trade(context):
hour = context.current_dt.hour
minute = context.current_dt.minute
#if hour == 9 and minute == 30: # 9:30开盘时买入(标的根据昨天之前的数据算出来)
if g.timing_signal == 'SELL':
for stock in context.portfolio.positions:
position = context.portfolio.positions[stock]
close_position(position)
elif g.timing_signal == 'BUY' or g.timing_signal == 'KEEP':
adjust_position(context, g.check_out_list)
else: pass
def my_sell2buy(context):
hour = context.current_dt.hour
minute = context.current_dt.minute
#if hour == 9 and minute == 30: # 9:30开盘时买入(标的根据昨天之前的数据算出来)
if hour == 9:
if g.timing_signal == 'BUY' or g.timing_signal == 'KEEP':
buy_stocks(context, g.check_out_list)
else: pass
# 这个函数几乎没用
def check_lose(context):
for position in list(context.portfolio.positions.values()):
security=position.security
cost=position.avg_cost
price=position.price
ret=100*(price/cost-1)
if ret <=-90:
order_target_value(position.security, 0)
print("!!!!!!触发止损信号: 标的={},标的价值={},浮动盈亏={}% !!!!!!"
.format(security,format(value,'.2f'),format(ret,'.2f')))
def print_trade_info(context):
#打印当天成交记录
trades = get_trades()
for _trade in trades.values(): print('成交记录:'+str(_trade))
#打印账户信息
print('———————————————————————————————————————分割线1————————————————————————————————————————')
注册时间: