量化学习平台
文章
市场宽度
背离图
登录
注册
10年52倍,年化59%,全新因子方法超稳定
策略
作者: 水滴
```python # 风险及免责提示:该策略由聚宽用户在聚宽社区分享,仅供学习交流使用。 # 原文一般包含策略说明,如有疑问请到原文和作者交流讨论。 # 原文网址:https://www.joinquant.com/post/44699 # 标题:10年52倍,年化59%,全新因子方法超稳定 # 作者:小白F # 原回测条件:2014-10-31 到 2023-10-31, ¥100000, 每天 #导入函数库 from jqdata import * from jqfactor import * import numpy as np import pandas as pd industry_code = ['HY001', 'HY002', 'HY003', 'HY004', 'HY005', 'HY006', 'HY007', 'HY008', 'HY009', 'HY010', 'HY011'] #初始化函数 def initialize(context): # 设定基准 set_benchmark('000905.XSHG') # 用真实价格交易 set_option('use_real_price', True) # 打开防未来函数 set_option("avoid_future_data", True) # 将滑点设置为0 set_slippage(FixedSlippage(0)) # 设置交易成本万分之三,不同滑点影响可在归因分析中查看 set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5),type='stock') # 过滤order中低于error级别的日志 log.set_level('order', 'error') log.set_level('system', 'error') #初始化全局变量 g.no_trading_today_signal = False g.stock_num = 3 g.hold_list = [] #当前持仓的全部股票 g.yesterday_HL_list = [] #记录持仓中昨日涨停的股票 g.factor_list = [ {'ARBR': (-0.9996444781983547, 0.9986148448690932)} ] g.chosen_factor = ['ARBR'] g.month_day = 1 # 设置交易运行时间 run_daily(prepare_stock_list, '9:05') run_weekly(weekly_adjustment,g.month_day, '9:30') run_daily(check_limit_up, '14:00') #检查持仓中的涨停股是否需要卖出 run_daily(close_account, '14:30') # run_daily(print_position_info, '15:10') #1-1 准备股票池 def prepare_stock_list(context): #获取已持有列表 g.hold_list= [] for position in list(context.portfolio.positions.values()): stock = position.security g.hold_list.append(stock) #获取昨日涨停列表 if g.hold_list != []: df = get_price(g.hold_list, end_date=context.previous_date, frequency='daily', fields=['close','high_limit'], count=1, panel=False, fill_paused=False) df = df[df['close'] == df['high_limit']] g.yesterday_HL_list = list(df.code) else: g.yesterday_HL_list = [] #判断今天是否为账户资金再平衡的日期 g.no_trading_today_signal = today_is_between(context, '04-05', '04-30') #1-2 选股模块 def get_stock_list(context): #指定日期防止未来数据 yesterday = context.previous_date today = context.current_dt #获取初始列表 initial_list = get_all_securities('stock', today).index.tolist() initial_list = filter_all_stock2(context, initial_list) final_list = [] #MS factor_list = list(g.factor_list[0].keys()) # print(factor_list) factor_data = get_factor_values(initial_list,factor_list, end_date=yesterday, count=1) df_jq_factor_value = pd.DataFrame(index=initial_list, columns=factor_list) for factor in factor_list: df_jq_factor_value[factor] = list(factor_data[factor].T.iloc[:,0]) #print('before:df_jq_factor_value is \n%s'% df_jq_factor_value) df_jq_factor_value = data_preprocessing(df_jq_factor_value,initial_list,industry_code,yesterday) #标准化 #print('after:df_jq_factor_value is \n%s'% df_jq_factor_value) #print('df_jq_factor_value.info is \n%s'% df_jq_factor_value.info) # tar = g.model.predict_proba(df_jq_factor_value) df = df_jq_factor_value df = df.dropna() for factor in g.chosen_factor : # print(df[factor_list[i]],factor_value[i][0]) df = df[(df[factor]>=g.factor_list[0][factor][0]) & (df[factor]<=g.factor_list[0][factor][1])] print(f'过滤完 {factor} ,剩余:{len(df)}') # df['total_score'] = list(tar[:,1]) # df = df.sort_values(by=['total_score'], ascending=False) #分数越高即预测未来收益越高,排序默认降序 # postive_list = list(df.index)[:int(0.1*len(list(df.index)))] postive_list = list(df.index) log.info(f'因子筛选后的数量:{len(postive_list)}/{len(df)}') # negative_list = list(df.index)[int(0.3*len(list(df.index))):-1] q = query(valuation.code,valuation.circulating_market_cap,indicator.eps).filter(valuation.code.in_(postive_list)).order_by(valuation.circulating_market_cap.asc()) # q = query(valuation.code,valuation.circulating_market_cap,indicator.eps).filter(valuation.code.in_(postive_list)).order_by(indicator.eps.desc()) df2 = get_fundamentals(q) df2 = df2[df2['eps']>0] lst = list(df2.code) lst = lst[:min(g.stock_num, len(lst))] # df['chosen'] = str(lst) # log.info(df[['total_score', 'chosen']].head(6)) for stock in lst: if stock not in final_list: final_list.append(stock) return final_list #1-3 整体调整持仓 def weekly_adjustment(context): if g.no_trading_today_signal == False: #获取应买入列表 target_list = get_stock_list(context) #调仓卖出 for stock in g.hold_list: if (stock not in target_list) and (stock not in g.yesterday_HL_list): log.info("卖出[%s]" % (stock)) position = context.portfolio.positions[stock] close_position(position) else: log.info("已持有[%s]" % (stock)) #调仓买入 position_count = len(context.portfolio.positions) target_num = len(target_list) if target_num > position_count: value = context.portfolio.cash / (target_num - position_count) for stock in target_list: if context.portfolio.positions[stock].total_amount == 0: if open_position(stock, value): if len(context.portfolio.positions) == target_num: break #1-4 调整昨日涨停股票 def check_limit_up(context): now_time = context.current_dt if g.yesterday_HL_list != []: #对昨日涨停股票观察到尾盘如不涨停则提前卖出,如果涨停即使不在应买入列表仍暂时持有 for stock in g.yesterday_HL_list: current_data = get_price(stock, end_date=now_time, frequency='1m', fields=['close','high_limit'], skip_paused=False, fq='pre', count=1, panel=False, fill_paused=True) if current_data.iloc[0,0] < current_data.iloc[0,1]: log.info("[%s]涨停打开,卖出" % (stock)) position = context.portfolio.positions[stock] close_position(position) else: log.info("[%s]涨停,继续持有" % (stock)) # 过滤股票,过滤停牌退市ST股票,选股时使用 def filter_all_stock2(context, stock_list): # 过滤次新股(新股、老股的分界日期,两种指定方法) # 新老股的分界日期, 自然日180天 # by_date = context.previous_date - datetime.timedelta(days=180) # 新老股的分界日期,120个交易日 by_date = get_trade_days(end_date=context.previous_date, count=252)[0] all_stocks = get_all_securities(date=by_date).index.tolist() stock_list = list(set(stock_list).intersection(set(all_stocks))) curr_data = get_current_data() return [stock for stock in stock_list if not ( stock.startswith(('3', '68', '4', '8')) or # 创业,科创,北交所 curr_data[stock].paused or # 停牌 curr_data[stock].is_st or # ST ('ST' in curr_data[stock].name) or # ST ('*' in curr_data[stock].name) or # 退市 ('退' in curr_data[stock].name) or # 退市 (curr_data[stock].day_open == curr_data[stock].high_limit) or # 涨停开盘, 其它时间用last_price (curr_data[stock].day_open == curr_data[stock].low_limit) # 跌停开盘, 其它时间用last_price )] #3-1 交易模块-自定义下单 def order_target_value_(security, value): if value == 0: log.debug("Selling out %s" % (security)) else: log.debug("Order %s to value %f" % (security, value)) return order_target_value(security, value) #3-2 交易模块-开仓 def open_position(security, value): order = order_target_value_(security, value) if order != None and order.filled > 0: return True return False #3-3 交易模块-平仓 def close_position(position): security = position.security order = order_target_value_(security, 0) # 可能会因停牌失败 if order != None: if order.status == OrderStatus.held and order.filled == order.amount: return True return False #4-1 判断今天是否为账户资金再平衡的日期 def today_is_between(context, start_date, end_date): today = context.current_dt.strftime('%m-%d') if (start_date <= today) and (today <= end_date): return True else: return False #4-2 清仓后次日资金可转 def close_account(context): if g.no_trading_today_signal == True: if len(g.hold_list) != 0: for stock in g.hold_list: position = context.portfolio.positions[stock] close_position(position) log.info("卖出[%s]" % (stock)) #4-3 打印每日持仓信息 def print_position_info(context): #打印当天成交记录 trades = get_trades() for _trade in trades.values(): print('成交记录:'+str(_trade)) #打印账户信息 for position in list(context.portfolio.positions.values()): securities=position.security cost=position.avg_cost price=position.price ret=100*(price/cost-1) value=position.value amount=position.total_amount print('代码:{}'.format(securities)) print('成本价:{}'.format(format(cost,'.2f'))) print('现价:{}'.format(price)) print('收益率:{}%'.format(format(ret,'.2f'))) print('持仓(股):{}'.format(amount)) print('市值:{}'.format(format(value,'.2f'))) print('———————————————————————————————————') print('———————————————————————————————————————分割线————————————————————————————————————————') #取股票对应行业 def get_industry_name(i_Constituent_Stocks, value): return [k for k, v in i_Constituent_Stocks.items() if value in v] #缺失值处理 def replace_nan_indu(factor_data,stockList,industry_code,date): #把nan用行业平均值代替,依然会有nan,此时用所有股票平均值代替 i_Constituent_Stocks={} data_temp=pd.DataFrame(index=industry_code,columns=factor_data.columns) for i in industry_code: temp = get_industry_stocks(i, date) i_Constituent_Stocks[i] = list(set(temp).intersection(set(stockList))) data_temp.loc[i]=mean(factor_data.loc[i_Constituent_Stocks[i],:]) for factor in data_temp.columns: #行业缺失值用所有行业平均值代替 null_industry=list(data_temp.loc[pd.isnull(data_temp[factor]),factor].keys()) for i in null_industry: data_temp.loc[i,factor]=mean(data_temp[factor]) null_stock=list(factor_data.loc[pd.isnull(factor_data[factor]),factor].keys()) for i in null_stock: industry=get_industry_name(i_Constituent_Stocks, i) if industry: factor_data.loc[i,factor]=data_temp.loc[industry[0],factor] else: factor_data.loc[i,factor]=mean(factor_data[factor]) return factor_data #数据预处理 def data_preprocessing(factor_data,stockList,industry_code,date): #去极值 factor_data = winsorize_med(factor_data, scale=5, inf2nan=False,axis=0) #缺失值处理 factor_data = replace_nan_indu(factor_data,stockList,industry_code,date) #标准化处理 factor_data = standardlize(factor_data,axis=0) return factor_data ```
文章分类
关于作者
水滴
注册时间: