量化学习平台
文章
市场宽度
背离图
登录
注册
【涨停研究三】连板股票收益统计与回测
策略
作者: 水滴
```python # 风险及免责提示:该策略由聚宽用户在聚宽社区分享,仅供学习交流使用。 # 原文一般包含策略说明,如有疑问请到原文和作者交流讨论。 # 原文网址:https://www.joinquant.com/view/community/detail/37070 # 标题:【涨停研究三】连板股票收益统计与回测 # 作者:wywy1995 # 导入函数库 from jqdata import * import pandas as pd # 初始化函数,设定基准等等 def initialize(context): set_benchmark('000300.XSHG') set_option('use_real_price', True) set_option("avoid_future_data", True) log.set_level('order', 'error') set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock') set_option("match_by_signal", True) #为了测试打开强制撮合,实际收益会大幅缩水 init_cash = context.portfolio.starting_cash / 2 set_subportfolios([ SubPortfolioConfig(cash=init_cash, type='stock'), SubPortfolioConfig(cash=init_cash, type='stock'), ]) g.buy_stock = 0 #初始化每日操作股票 g.count_days = 0 #记录天数,以指导子仓位轮动 g.fn = 250 #过滤上市不足250个交易日的股票 g.watch_days = 10 #观察连板已经持续几天,等于连板数,需要不小于2 g.code_list = ['None'] #记录每天信号股票,为去重做准备 g.count_list = [] #记录每日最大连板数,以及计算当前连板数与情绪周期内最大连板数的差值即emotion g.emo_cycle = 3 #情绪周期 g.p = 0.8 #控制子账户中的仓位 g.initial_cash = context.portfolio.available_cash / 2 #每期可用总资金 # 开盘前运行 run_daily(my_trade, time='9:30', reference_security='000300.XSHG') run_daily(get_stock_list, time='15:30', reference_security='000300.XSHG') #1.1选股 def get_stock_list(context): stat_date = context.current_dt.strftime('%Y-%m-%d') df = get_all_securities(types=['stock'], date=stat_date) stock_list = list(df.index) stock_list = filter_new_stock(context, stock_list, stat_date, g.fn) stock_list0 = filter_st_stock(context, stock_list, stat_date) stock_list1 = filter_hl_stock(context, stock_list0, stat_date) if len(stock_list1) != 0: stock_list = stock_list1 else: stock_list = stock_list0 #计算当前最高连板数 continue_count_df = [] days_count = g.watch_days + 1 while len(continue_count_df) == 0: days_count -= 1 if days_count <= 1: break else: MKT_df = limit_count(context, stock_list, stat_date, days_count) continue_count_df = MKT_df[MKT_df['count'] == days_count] #判断是否有连板数不小于2的股票,取市值最小的一只并计算情绪周期 if days_count > 1: smallest_stock = get_smallest(context, list(continue_count_df['code']), stat_date) count_df = continue_count_df.copy() count_df.index = count_df['code'] limit_counts = count_df.loc[smallest_stock]['count'] g.count_list.append(limit_counts) if len(g.count_list)>(g.emo_cycle-1): emotion = g.count_list[-1] - max(g.count_list[-g.emo_cycle:]) else: emotion = -100 else: limit_counts = 0 emotion = -100 g.count_list.append(limit_counts) #判断是否在情绪周期内,并对情绪周期内的股票信号去重 if emotion == 0: g.code_list.append(smallest_stock) if g.code_list[-1] != g.code_list[-2]: g.buy_stock = smallest_stock else: g.buy_stock = 0 else: g.buy_stock = 0 #判断市值是否处于合理区间 if g.buy_stock != 0: q = query(valuation.code,valuation.circulating_market_cap).filter(valuation.code == g.buy_stock) df = get_fundamentals(q, date=stat_date) df = df[(df['circulating_market_cap']>20) & (df['circulating_market_cap']<80)] if len(df) == 0: g.buy_stock = 0 #判断连板数是否处于合理区间 if g.buy_stock != 0: if (limit_counts < 4) or (limit_counts > 8): g.buy_stock = 0 print(g.buy_stock) return g.buy_stock #1.2交易 def my_trade(context): g.count_days += 1 hold_list0 = [] position_dict0 = context.subportfolios[0].long_positions for position in list(position_dict0.values()): hold_list0.append(position.security) hold_list1 = [] position_dict1 = context.subportfolios[1].long_positions for position in list(position_dict1.values()): hold_list1.append(position.security) if g.count_days % 120 == 1: if len(hold_list0) != 0: for stock in hold_list0: order_target_value(stock, 0, pindex=0) elif len(hold_list1) != 0: for stock in hold_list1: order_target_value(stock, 0, pindex=1) else: print('账户再平衡前没有持仓') g.initial_cash = context.portfolio.available_cash / 2 cash0 = context.subportfolios[0].available_cash cash1 = context.subportfolios[1].available_cash to_transfer_cash = (max(cash0,cash1) - min(cash0,cash1))/2 if cash0 > cash1: transfer_cash(from_pindex=0, to_pindex=1, cash=to_transfer_cash) else: transfer_cash(from_pindex=1, to_pindex=0, cash=to_transfer_cash) target_cash = g.p * g.initial_cash if g.count_days % 3 == 1: if len(hold_list1) != 0: order_target_value(hold_list1[0], 0, pindex=1) else: print('账户2无持仓') if g.buy_stock != 0: cash0 = context.subportfolios[0].available_cash if cash0 >= target_cash: order_target_value(g.buy_stock, target_cash, pindex=0) else: order_target_value(g.buy_stock, cash0, pindex=0) else: print('未选出股票') elif g.count_days % 3 == 2: if g.buy_stock != 0: cash1 = context.subportfolios[1].available_cash if cash1 >= target_cash: order_target_value(g.buy_stock, target_cash, pindex=1) else: order_target_value(g.buy_stock, cash1, pindex=1) else: print('未选出股票') elif g.count_days %3 == 0: if len(hold_list0) != 0: order_target_value(hold_list0[0], 0, pindex=0) else: print('账户1无持仓') if g.buy_stock != 0: cash0 = context.subportfolios[0].available_cash if cash0 >= target_cash: order_target_value(g.buy_stock, target_cash, pindex=0) else: order_target_value(g.buy_stock, cash0, pindex=0) else: print('未选出股票') #工具函数 #3.1过滤st def filter_st_stock(context, stock_list, stat_date): df = get_extras('is_st', stock_list, start_date=stat_date, end_date=stat_date, df=True) df =df.T df.columns = ['is_st'] df = df[df['is_st'] == False] filter_list = list(df.index) return filter_list #3.2过滤新股 def filter_new_stock(context, stock_list , ed, fn): #按交易日天数过滤 all_trade_days=[i.strftime('%Y-%m-%d') for i in list(get_all_trade_days())] tradingday = get_trade_days(start_date='2005-01-01', end_date=ed, count=None) trade_day_list = [] for item in list(tradingday): trade_day_list.append(str(item)) df = get_all_securities(types=['stock'], date=ed)[['start_date']].loc[stock_list] date = datetime.datetime.strptime(all_trade_days[all_trade_days.index(ed) - fn],"%Y-%m-%d").date() return list(df[df['start_date']< date].index) #3.3缩小选股范围先只保留昨日涨停 def filter_hl_stock(context, stock_list, stat_date): df = get_price(stock_list, end_date=stat_date, frequency='daily', fields=['close','high_limit'], count=1, panel=False, fill_paused=False) df = df[df['close'] == df['high_limit']] hl_list = list(df.code) return hl_list #3.4获取涨跌停次数 def limit_count(context, stock_list, stat_date, watch_days): df = get_price(stock_list, end_date=stat_date, frequency='daily', fields=['close','high_limit'], count=watch_days, panel=False, fill_paused=False) df['limit'] = df['high_limit'] df.index = df.code count_list = [] for stock in stock_list: df_sub = df.loc[stock] limit_days = df_sub[df_sub.close==df_sub.limit].close.count() count_list.append(limit_days) df = pd.DataFrame(columns=['code','count']) df['code'] = stock_list df['count'] = count_list df = df.dropna() return df #3.5选择较小市值 def get_smallest(context, stock_list, stat_date): q = query(valuation.code,valuation.circulating_market_cap).filter(valuation.code.in_(stock_list)).order_by(valuation.circulating_market_cap.asc()) df = get_fundamentals(q, date=stat_date) df.index = df.code stock = list(df.code)[0] return stock ```
文章分类
关于作者
水滴
注册时间: