量化学习平台
文章
市场宽度
背离图
登录
注册
因子分析 营业利润TTM
策略
作者: 水滴
```python # 风险及免责提示:该策略由聚宽用户分享,仅供学习交流使用。 # 原文一般包含策略说明,如有疑问建议到原文和作者交流讨论。 # 克隆自聚宽文章:https://www.joinquant.com/post/28465 # 标题:因子分析 | 营业利润TTM # 作者:止一之路 import pandas as pd import numpy as np import time import scipy.stats as st from datetime import datetime, timedelta from multiprocessing.dummy import Pool as ThreadPool import statsmodels.api as sm from pandas import DataFrame,Series import jqdata from jqfactor import get_all_factors from jqfactor import get_factor_values # 初始化函数,设定基准等等 def initialize(context): # 设定沪深300作为基准 set_benchmark('000300.XSHG') # 开启动态复权模式(真实价格) set_option('use_real_price', True) # 输出内容到日志 log.info() log.info('初始函数开始运行且全局只运行一次') # 过滤掉order系列API产生的比error级别低的log log.set_level('order', 'error') log.set_level('strategy', 'error') ### 股票相关设定 ### # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱 set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock') set_param() ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的) run_daily(daily_Var, time='open', reference_security='000905.XSHG') # 开盘时运行 run_daily(market_open, time='open', reference_security='000905.XSHG') run_daily(trade, '14:50') # 收盘后运行 def set_param(): g.facName = 'operating_profit_ttm' g.index = 'all' g.pool_num = 1000 g.days = 0 g.refresh_rate = 20 g.stocknum = 20 g.stocks_to_have = [] # 是否取倒数 g.reciprocal = False # 是否中性化 g.neutralize = True # 行业代码(如果为空,则不筛选) g.indu_code = '801140' def daily_Var(context): g.isRefresh = g.days % g.refresh_rate == 0 if g.isRefresh: index = g.index fdate = context.current_dt g.all_stocks = get_all_securities(types=['stock'], date=fdate).index.tolist() #g.stocknum = len(g.all_stocks) // 10 #g.stocknum = 20 # 从上一个交易日开始获得前12个月的交易日 def get_trade_dates(end, interval = 20, count = 250): date_list = list(jqdata.get_trade_days(end_date=end, count = count)) date_list.reverse() date_list = filter(lambda x: date_list.index(x) % interval == 0, date_list) date_list.reverse() return date_list # 得到因子数据 def get_factor_data(fdate, factor, index): if index == 'all': stock_list = get_all_securities(types=['stock'], date=fdate).index.tolist() else: stock_list = get_index_stocks(index, date=fdate) factor_data = get_factor_values(securities=stock_list, factors=[factor],start_date=fdate, end_date=fdate)[factor].T factor_data.columns = [factor] factor_data['tradeDate'] = fdate factor_data = factor_data[['tradeDate',factor]] factor_data = factor_data.reset_index() return factor_data def get_factor_by_day(tdate): ''' 根据日期,获取当天的因子值 tdate:str,'YYYYMMDD'格式 ''' cnt = 0 while True: try: #x = get_all_factors1(tdate, factors, TTM_factors, fac_dict, index) x = get_factor_data(tdate, g.facName, g.index) return x except Exception as e: cnt += 1 if cnt >= 3: print('error get factor data: ', tdate) break # 行情数据 def getPriceData(date): price=get_price(g.all_stocks, start_date=date, end_date=date, frequency='1d',fields=['close'])['close'] return price # 获得行业数据 def getStockIndustry(fdate): stock_list = get_all_securities(types=['stock'], date=fdate).index.tolist() industry_set = ['801010', '801020', '801030', '801040', '801050', '801080', '801110', '801120', '801130', '801140', '801150', '801160', '801170', '801180', '801200', '801210', '801230', '801710', '801720', '801730', '801740', '801750', '801760', '801770', '801780', '801790', '801880','801890'] df = pd.DataFrame(index = stock_list,columns = [fdate]) df.index.name = 'code' for i in range(len(industry_set)): print(industry_set[i]) print(fdate) industry = get_industry_stocks(industry_set[i], date = fdate) industry = list(set(industry) & set(df.index.tolist())) df[fdate].ix[industry] = industry_set[i] return df.T # 获得市值数据 def getStockMktValue(fdate): df = get_factor_data(fdate, "market_cap", g.index) df.columns = ['code','tradeDate','MC'] df = df.reset_index() df = df.pivot(index='tradeDate', columns='code', values='MC') return df def dateTransform(date_list_to_transform): date_str_list = list(map(lambda x:x.strftime('%Y-%m-%d'),date_list_to_transform)) date_list_result = list(map(lambda x:datetime.strptime(x, '%Y-%m-%d'),date_str_list)) return date_list_result def prepareData(trade_date_list): # 准备数据 print(trade_date_list) univ = pd.DataFrame() for date in (trade_date_list): if g.index == 'all': univ_data = get_Atickers(date) else: univ_data = get_idx_cons(g.index, date) current_universe = pd.Series(univ_data).to_frame(name='code') current_universe.index = [date] * len(current_universe) univ = univ.append(current_universe) print ('股票池生成结束') print ('--------------------' ) # 月度收益 """ print ('个股行情数据开始计算...') end = trade_date_list[-1] price = get_price(g.all_stocks, start_date=None, end_date=end,count=250, frequency='1d',fields=['close'])['close'] price = price.ix[trade_date_list] month_return = price.pct_change().shift(-1) # 向前一个月收益 print ('个股行情数据计算完成') print ('---------------------') """ # 因子数据 print ('开始计算因子数据...') frame_list = [] for date in trade_date_list: temp_df = get_factor_by_day(date) frame_list.append(temp_df) factor_csv = pd.concat(frame_list, axis=0) factor_csv.reset_index(inplace=True, drop=True) print ('因子数据计算完成') print ('--------------------') print ('开始计算行业数据...') frame_list = [] for date in trade_date_list: tmp = getStockIndustry(date) frame_list.append(tmp) indu = pd.concat(frame_list,axis=0) print ('行业数据计算完成') print ('--------------------') print ('开始计算市值数据...') frame_list = [] for date in trade_date_list: temp_df = getStockMktValue(date) frame_list.append(temp_df) mkt = pd.concat(frame_list, axis=0) print ('市值数据计算完成') print ('--------------------') univ.index = dateTransform(univ.index) #month_return.index = dateTransform(month_return.index) indu.index = dateTransform(indu.index) mkt.index = dateTransform(mkt.index) factor_csv.tradeDate = dateTransform(factor_csv.tradeDate) return univ,factor_csv,indu,mkt def get_idx_cons(idx, date): """ 获取某天指数成分股ticker列表 输入: idx:str,指数代码 date:str,'YYYY-MM-DD'格式 返回: list:指数成份股的ticker """ universe_idx = get_index_stocks(idx, date=date) universe_A = get_Atickers(date) return list(set(universe_idx) & set(universe_A)) def get_Atickers(date): """ 给定日期,获取这一天上市时间不低于60天的股票(参照中证全指指数编制) 输入: date: str, 'YYYYMMDD'格式 返回: list: 元素为股票ticker """ df = get_all_securities(types=['stock'], date=None) daysBefore = jqdata.get_trade_days(end_date=date, count=60)[0] df['60DaysBefore'] = daysBefore df = (df[df['start_date'] < df['60DaysBefore']]) return df.index.tolist() def get_universe_factor(factor, idx=None, univ=None): """ 筛选出某指数成份股或者指定域内的因子值 输入: factor:DataFrame,index为日期,columns为股票代码,value为因子值 idx:指数代码,000300:沪深300,000905:中证500,000985:中证全指 univ:DataFrame,index为日期,'YYYYMMDD'格式。columns为'code',value为股票代码 返回: factor:DataFrame,指定域下的因子值,index为日期,columns为股票代码,value为因子值 """ universe_factor = pd.DataFrame() if idx is not None: for date in factor.index: universe = get_idx_cons(idx, date) universe_factor = universe_factor.append(factor.loc[date, universe].to_frame(date).T) else: if univ is not None: for date in factor.index: universe = univ.loc[date, 'code'].tolist() universe_factor = universe_factor.append(factor.loc[date, universe].to_frame(date).T) else: raise Exception('请指定成分股或域') return universe_factor # 缺失值使用行业中位数填充 def replace_nan_indu(indu,factor): """缺失值填充函数,使用行业中位数进行填充 输入: factor:DataFrame,index为日期,columns为股票代码,value为因子值 返回: factor:格式保持不变,为填充后的因子 """ fill_factor = pd.DataFrame() for date in factor.index: # 因子值 factor_array = factor.ix[date, :].to_frame('values') # 行业值 indu_array = indu.ix[date, :].dropna().to_frame('industryName1') # 合并 factor_array = factor_array.merge(indu_array, left_index=True, right_index=True, how='inner') # 行业中值 mid = factor_array.groupby('industryName1').median() factor_array = factor_array.merge(mid, left_on='industryName1', right_index=True, how='left') # 行业中值填充缺失 factor_array['values_x'][pd.isnull(factor_array['values_x'])] = factor_array['values_y'][pd.isnull(factor_array['values_x'])] # 将当前日期的因子数据追加到结果 fill_factor = fill_factor.append(factor_array['values_x'].to_frame(date).T) return fill_factor def winsorize(se): q = se.quantile([0.025, 0.975]) if isinstance(q, pd.Series) and len(q) == 2: se[se < q.iloc[0]] = q.iloc[0] se[se > q.iloc[1]] = q.iloc[1] return se def standardize(se): mean = se.mean() std = se.std() se = (se - mean)/std return se def neutralize(factor_se, market_cap_se, concept_se): stock_list = factor_se.index.tolist() # 行业数据哑变量 groups = array(concept_se.ix[stock_list].tolist()) dummy = sm.categorical(groups, drop=True) # 市值对数化 market_cap_log = np.log(market_cap_se.ix[stock_list].tolist()) # 自变量 X = np.c_[dummy,market_cap_log] # 因变量 y = factor_se.ix[stock_list] # 拟合 model = sm.OLS(y,X) results = model.fit() # 拟合结果 y_fitted = results.fittedvalues neutralize_factor_se = factor_se - y_fitted return neutralize_factor_se def pretreat_factor(indu, mkt, factor_df, neu=True): """ 因子处理函数 输入: factor_df:DataFrame,index为日期,columns为股票代码,value为因子值 neu:Bool,是否进行行业+市值中性化,若为True,则进行去极值->中性化->标准化;若为否,则进行去极值->标准化 返回: factor_df:DataFrame,处理之后的因子 """ pretreat_data = factor_df.copy(deep=True) for dt in pretreat_data.index: market_cap_se = mkt.loc[dt].dropna() stock_list = market_cap_se.index.tolist() concept_se = indu.loc[dt].loc[stock_list] factor_dt = pretreat_data.loc[dt].loc[stock_list].dropna() if neu: pretreat_data.ix[dt] = standardize(neutralize(winsorize(factor_dt),market_cap_se,concept_se)) else: pretreat_data.ix[dt] = standardize(winsorize(factor_dt)) return pretreat_data def filter_by_industry(factor, indu): factor_data = factor.copy() for date in factor_data.index.tolist(): # 找到金融类(银行,非银金融)股票,便于后面进行剔除 finance = indu.loc[date, :] finance = finance[finance.isin(['801780', '801790'])].index factor_data.loc[date, finance] = np.nan return factor_data def get_rank_ic(factor, forward_return): """ 计算因子的信息系数 输入: factor:DataFrame,index为日期,columns为股票代码,value为因子值 forward_return:DataFrame,index为日期,columns为股票代码,value为下一期的股票收益率 返回: DataFrame:index为日期,columns为IC,IC t检验的pvalue 注意:factor与forward_return的index及columns应保持一致 """ common_index = factor.index.intersection(forward_return.index) ic_data = pd.DataFrame(index=common_index, columns=['IC','pValue']) # 计算相关系数 for dt in ic_data.index: tmp_factor = factor.ix[dt] tmp_ret = forward_return.ix[dt] cor = pd.DataFrame(tmp_factor) ret = pd.DataFrame(tmp_ret) cor.columns = ['corr'] ret.columns = ['ret'] cor['ret'] = ret['ret'] cor = cor[~pd.isnull(cor['corr'])][~pd.isnull(cor['ret'])] if len(cor) < 5: continue ic, p_value = st.spearmanr(cor['corr'], cor['ret']) # 计算秩相关系数RankIC ic_data['IC'][dt] = ic ic_data['pValue'][dt] = p_value return ic_data class FactorWeight(): def __init__(self): pass @staticmethod def weighted(factor_dict, factor_weight): """ 用于因子合成的函数。因子之间需要对齐,因子和其对应的权重也应进行对齐 输入: factor_dict:列表,用于存储因子,key为因子名,值为DataFrame(index为日期,columns为股票代码) factor_weight:因子权重,用于对因子进行配权,为DataFrame,index为日期,列对应着因子名称,值为当期因子的权重 返回: DataFrame:最终合成后的因子 """ weighted_factor = 0 for factor_name, factor in factor_dict.items(): weighted_factor += factor.multiply(factor_weight[factor_name], axis=0) return weighted_factor @staticmethod def equal_weight(factor_dict): factor_weight = pd.Series([1. / len(factor_dict)] * len(factor_dict), index=factor_dict.keys()).to_dict() weighted_factor = FactorWeight.weighted(factor_dict, factor_weight) return weighted_factor @staticmethod def ic_weight(factor_dict, forward_month_return, window): # 获得IC序列 all_rolling_ic_list = [] for factor_name, factor in factor_dict.items(): ic = get_rank_ic(factor, forward_month_return)['IC'] # 计算得到当前因子的IC ic = pd.rolling_mean(ic, window=window) ic = ic.shift(1) ic.name = factor_name all_rolling_ic_list.append(ic) # 合并成一个DataFrame all_rolling_ic_df = pd.concat(all_rolling_ic_list, axis=1) all_rolling_ic_df = all_rolling_ic_df.divide(all_rolling_ic_df.sum(axis=1), axis=0) # 因子汇总 weighted_factor = FactorWeight.weighted(factor_dict, all_rolling_ic_df) return weighted_factor @staticmethod def ic_ir_weight(factor_dict, forward_month_return, window): # 获得IC_IR序列 all_rolling_ic_ir_list = [] for factor_name, factor in factor_dict.items(): ic = get_rank_ic(factor, forward_month_return)['IC'] # 计算得到当前因子的IC_IR ic_ir = pd.rolling_mean(ic, window=window) / pd.rolling_std(ic, window=window) ic_ir = ic_ir.shift(1) ic_ir.name = factor_name all_rolling_ic_ir_list.append(ic_ir) # 合并成一个DataFrame,并计算权重 all_rolling_ic_ir_df = pd.concat(all_rolling_ic_ir_list, axis=1) all_rolling_ic_ir_df = all_rolling_ic_ir_df.divide(all_rolling_ic_ir_df.sum(axis=1), axis=0) # 因子汇总 weighted_factor = FactorWeight.weighted(factor_dict, all_rolling_ic_ir_df) return weighted_factor, all_rolling_ic_ir_df def selectStock(context): # 交易日 pre_day = context.previous_date # 上一个交易日 cur_day = context.current_dt # 当前交易日 # 上一个交易日的数据 trade_date_list = [pre_day] univ,factor_csv,indu,mkt = prepareData(trade_date_list) # 因子数据 facName = g.facName factor_csv = factor_csv.drop_duplicates(subset=['code','tradeDate']) factor_data = factor_csv.pivot(index='tradeDate', columns='code', values=facName) # 行业中位数填充 factor_data = replace_nan_indu(indu, factor_data) # 去极值、中性化、标准化 factor_data = pretreat_factor(indu,mkt,factor_data,neu = g.neutralize) # 行业过滤 factor_data = filter_by_industry(factor_data, indu) # 根据当日市场股票 factor_data = get_universe_factor(factor_data,univ=univ) print("indu:{}".format(indu.head())) print("factor_data:{}".format(factor_data.head())) # 取倒数 if g.reciprocal: factor_data = (1.0 / factor_data).replace([-np.inf, np.inf], np.NaN) # 因子降序排序 stock_se = factor_data.iloc[0] stock_se = stock_se.dropna() stock_se = stock_se.sort_values(ascending=False) # 股票列表 stock_list = stock_se.index.tolist() if g.indu_code != "": indu_stock = get_industry_stocks(g.indu_code, date = cur_day) stock_list = list(filter(lambda x:x in indu_stock,stock_list)) g.stocknum = len(stock_list) //10 # 截取股票池 stock_list = stock_list[:g.pool_num] return stock_list def filter_paused_and_st_stock(stock_list): current_data = get_current_data() return [stock for stock in stock_list if not current_data[stock].paused and not current_data[stock].is_st and 'ST' not in current_data[stock].name and '*' not in current_data[stock].name and '退' not in current_data[stock].name] def unStartWith300(stockspool): return [stock for stock in stockspool if stock[0:3] != '300'] pass def filter_paused_stock(stock_list): current_data = get_current_data() return [stock for stock in stock_list if not current_data[stock].paused] def filter_limitdown_stock(context, stock_list): last_prices = history(1, unit='1m', field='close', security_list=stock_list) current_data = get_current_data() return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or last_prices[stock][-1] > current_data[stock].low_limit] def filter_limitup_stock(context, stock_list): last_prices = history(1, unit='1m', field='close', security_list=stock_list) current_data = get_current_data() # 已存在于持仓的股票即使涨停也不过滤,避免此股票再次可买,但因被过滤而导致选择别的股票 return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or last_prices[stock][-1] < current_data[stock].high_limit] ## 开盘时运行函数 def market_open(context): if not g.isRefresh: return stock_list = selectStock(context) stock_list = filter_paused_and_st_stock(stock_list) g.stocks_to_have = stock_list #清仓 def sell_all_stocks(context): #log.info("卖出全部:") for stock in context.portfolio.positions.keys(): order = order_target_value(stock, 0) if order != None : log.info("卖出 %s" % stock) def stock_trade(context,stocks_to_have,buyStockCount): cur_data = get_current_data() value = context.portfolio.total_value / buyStockCount # 卖出:不再预计持仓中的股票 for stock in context.portfolio.positions.keys(): if stock not in stocks_to_have[:buyStockCount]: Order = order_target_value(stock, 0) if Order != None: log.info("卖出 %s" % stock) else: log.info("卖出 %s 不成功。" % stock) # 调仓:调整在持仓并且预计持仓的股票数量 for stock in context.portfolio.positions.keys(): if stock in stocks_to_have[:buyStockCount]: Order = order_target_value(stock, value) if Order != None: log.info("卖出 %s" % stock) else: log.info("卖出 %s 不成功。" % stock) # 如果持股数量不足,则补充预计持仓的股票 position_count = len(context.portfolio.positions.keys()) if buyStockCount > position_count: for stock in stocks_to_have: # 如果买够了数量,就退出 if len(context.portfolio.positions) == buyStockCount: break if stock not in context.portfolio.positions.keys(): last_price = cur_data[stock].last_price limitPrice = last_price * (1 + 0.005) Order = order_target_value(stock, value, LimitOrderStyle(limitPrice)) if Order != None: log.info("买入 %s" % stock) # 持仓已经达到买入的数量,退出 else: log.info("买入 %s 不成功。" % stock) def trade(context): # 择时 isTrade = True if isTrade: if g.isRefresh: # 过滤 g.stocks_to_have = filter_paused_stock(g.stocks_to_have) g.stocks_to_have = filter_limitup_stock(context, g.stocks_to_have) g.stocks_to_have = filter_limitdown_stock(context, g.stocks_to_have) g.stocks_to_have = g.stocks_to_have[:g.stocknum] stock_trade(context,g.stocks_to_have,g.stocknum) g.days += 1 else: sell_all_stocks(context) g.days = 0 ```
文章分类
关于作者
水滴
注册时间: