# 风险及免责提示:该策略由聚宽用户在聚宽社区分享,仅供学习交流使用。
# 原文一般包含策略说明,如有疑问请到原文和作者交流讨论。
# 原文网址:https://www.joinquant.com/post/47407
# 标题:微盘股扩散指数双均线择时
# 作者:无逻辑的光
# 原回测条件:2018-01-01 到 2024-04-02, ¥100000, 每天
# 原文网址:https://www.joinquant.com/post/40981
# 标题:差不多得了
# 作者:wywy1995
# 原文网址:https://www.joinquant.com/post/40407
# 标题:wywy1995大侠的小市值AI因子选股 5组参数50股测试
# 作者:Bruce_Lee
#https://www.joinquant.com/view/community/detail/30684f8d65a74eef0d704239f0eec8be?type=1&page=2
#导入函数库
from jqdata import *
from jqfactor import *
import numpy as np
import pandas as pd
import talib
import datetime
import pickle
from six import BytesIO
#初始化函数 000852.XSHG (ZZ1000)
def initialize(context):
    """Configure the backtest environment, strategy globals and the daily schedule.

    Args:
        context: Strategy context passed in by the platform; it is not read here.
    """
    # Benchmark used for performance comparison.
    set_benchmark('399303.XSHE')
    # Trade on real prices.
    set_option('use_real_price', True)
    # Turn on the guard against look-ahead (future) data.
    set_option("avoid_future_data", True)
    # Fixed slippage of 0.002 (note: not zero).
    set_slippage(FixedSlippage(0.002))
    # Stock trading costs: 0.1% tax on sells, 0.03% commission on both sides,
    # minimum commission of 5 per order.
    stock_fees = OrderCost(
        open_tax=0,
        close_tax=0.001,
        open_commission=0.0003,
        close_commission=0.0003,
        close_today_commission=0,
        min_commission=5,
    )
    set_order_cost(stock_fees, type='stock')
    # Disabled: suppress 'order' log records below error level.
    #log.set_level('order', 'error')

    # --- strategy-wide state kept on the global object ---
    g.no_trading_today_signal = True
    g.stock_num = 5
    g.hold_list = []  # every stock currently held
    g.yesterday_HL_list = []  # held stocks that closed at limit-up on the previous day
    g.watch_day = 20
    # Table loaded from the research file; get_timing_signal reads its
    # 'date' and 'signal' columns.
    raw_csv = BytesIO(read_file('wp_ks_index.csv'))
    g.base = pd.read_csv(raw_csv, encoding='utf_8_sig')

    # Factor group "P1Y-TPtCR-VOL120": factor names paired with their linear weights.
    factor_names = [
        'Price1Y',  # momentum: current price / mean price over the past year - 1
        'total_profit_to_cost_ratio',  # quality: profit to cost-and-expense ratio
        'VOL120',  # sentiment: 120-day average turnover rate
    ]
    factor_weights = [
        -1.6481969388084845,
        -0.17062057099935446,
        -0.061842557079243125,
    ]
    g.factor_list = [(factor_names, factor_weights)]

    # Daily schedule, registered in this order.
    # Disabled job: run_daily(close_account, '14:30')
    schedule = (
        (prepare_stock_list, '9:05'),
        (daily_adjustment, '9:30'),
        (check_limit_up, '14:00'),  # decide whether limit-up holdings must be sold
        (print_position_info, '15:10'),
    )
    for task, clock in schedule:
        run_daily(task, clock)
#1-1 准备股票池
def prepare_stock_list(context):
    """Refresh the holdings snapshot and the list of holdings that closed at limit-up.

    Writes two globals:
        g.hold_list: security codes of all current portfolio positions.
        g.yesterday_HL_list: the subset whose previous-day close equals the
            previous-day upper price limit.

    Args:
        context: Strategy context; ``portfolio.positions`` and
            ``previous_date`` are read.
    """
    # Snapshot the codes of everything currently in the portfolio.
    g.hold_list = [pos.security for pos in list(context.portfolio.positions.values())]

    # Nothing held means nothing can have hit the limit.
    if g.hold_list == []:
        g.yesterday_HL_list = []
        return

    # One daily bar per holding, ending on the previous trading day.
    bars = get_price(
        g.hold_list,
        end_date=context.previous_date,
        frequency='daily',
        fields=['close', 'high_limit'],
        count=1,
        panel=False,
        fill_paused=False,
    )
    at_limit = bars[bars['close'] == bars['high_limit']]
    g.yesterday_HL_list = list(at_limit.code)

    # Disabled: decide whether today falls inside the account rebalancing window.
    #g.no_trading_today_signal = today_is_between(context, '04-01', '04-30') or today_is_between(context, '01-01', '02-28')
#获取扩散系数
def get_ks_index(context, watch_day, index_code='000852.XSHG', lookback=30):
    """Compute a daily diffusion index for the constituents of an index.

    For each of the last ``lookback`` trading days (ending on the previous
    trading day) the index value is the fraction of constituents whose close
    on that day is above their close at the start of a ``watch_day``-bar window.

    Args:
        context: Strategy context; only ``previous_date`` is read.
        watch_day: Number of daily bars in the comparison window.
        index_code: Index whose constituents are measured.
        lookback: Number of trading days to compute the index for.

    Returns:
        pandas.DataFrame with columns ``date`` and ``ks_index``; one row per
        trading day. ``ks_index`` is NaN for a day on which no constituent has
        usable price data.
    """
    dates = get_trade_days(end_date=context.previous_date, count=lookback)
    ks_index = []
    for date in dates:
        stocks = get_index_stocks(index_code, date=date)
        closes = get_price(
            stocks,
            end_date=date,
            fields=['close'],
            count=watch_day,
            fill_paused=False,
            skip_paused=False,
            panel=False,
        ).dropna()
        # Select the close column by name instead of by position; rows with a
        # missing close were dropped above, so first()/last() are simply the
        # first and last remaining bar of each stock.
        close_by_code = closes.groupby('code')['close']
        change = close_by_code.last() - close_by_code.first()
        if len(change) == 0:
            # No usable data: report NaN instead of dividing by zero.
            ks_index.append(np.nan)
        else:
            ks_index.append(float((change > 0).sum()) / len(change))
    return pd.DataFrame({'date': dates, 'ks_index': ks_index})
'''
#获取择时信号
def get_timing_signal(context):
yesterday = context.previous_date
df = get_ks_index(context,g.watch_day)
df['EMA6']=talib.EMA(df['ks_index'],6)
df['EMA28']=talib.EMA(df['ks_index'],28)
df['signal'] = np.where(df['EMA6']-df['EMA28']<0,1,0)
today_sig=np.array(df['signal'])[-1]
return today_sig
'''
#获取择时信号
def get_timing_signal(context):
    """Look up the timing signal recorded for the previous trading day.

    The lookup table is ``g.base``; its ``date`` column is compared against
    the previous trading day formatted as ``YYYY-MM-DD``.

    Args:
        context: Strategy context; only ``previous_date`` is read.

    Returns:
        The ``signal`` value of the first matching row.

    Raises:
        IndexError: If the table has no row for that date.
    """
    day_key = context.previous_date.strftime("%Y-%m-%d")
    table = g.base
    matches = table.loc[table['date'] == day_key, 'signal']
    return matches.tolist()[0]
#获取微盘股列表
def _get_stocks(date):
    """Return up to 1000 stock codes with the smallest circulating market cap on *date*.

    The candidate universe is everything returned by ``get_all_securities()``
    minus recently listed stocks (fewer than 250 calendar days before *date*),
    codes removed by ``filter_kcbj_stock`` and codes removed by
    ``filter_st_stock``.

    Args:
        date: Reference date for the listing-age filter and the fundamentals
            query; a ``datetime.date`` or anything ``pandas.to_datetime`` accepts.

    Returns:
        list of security codes ordered by ascending circulating market cap.
    """
    # The original body passed an undefined ``context`` to filter_new_stock
    # (NameError). The same 250-day listing-age rule is applied here relative
    # to ``date`` instead.
    ref_date = pd.to_datetime(date).date()
    min_listed = datetime.timedelta(days=250)
    initial_list = get_all_securities().index.tolist()
    initial_list = [
        stock for stock in initial_list
        if ref_date - get_security_info(stock).start_date >= min_listed
    ]
    initial_list = filter_kcbj_stock(initial_list)
    # NOTE(review): filter_st_stock reads get_current_data(), so this filter is
    # not evaluated as of ``date`` -- confirm that is acceptable.
    initial_list = filter_st_stock(initial_list)
    q = (
        query(valuation.code, valuation.circulating_market_cap)
        .filter(valuation.code.in_(initial_list))
        .order_by(valuation.circulating_market_cap.asc())
    )
    df = get_fundamentals(q, date=date)
    return df.code.tolist()[0:1000]
#1-2 选股模块
def get_stock_list(context):
    """Select the buy candidates for today.

    For every ``(factor names, coefficients)`` group in ``g.factor_list``:
      1. start from all securities, dropping recently listed stocks, codes
         removed by ``filter_kcbj_stock`` and codes removed by ``filter_st_stock``;
      2. score each stock as the coefficient-weighted sum of its factor values
         as of the previous trading day and keep the top 10% by score;
      3. among those, keep stocks with positive EPS, ordered by ascending
         circulating market cap;
      4. drop paused stocks and (unless already held) stocks at their upper or
         lower price limit, then keep the first ``g.stock_num``.

    Args:
        context: Strategy context; ``previous_date`` and the portfolio are read.

    Returns:
        list of security codes without duplicates, in selection order
        (smallest market cap first within each factor group).
    """
    yesterday = context.previous_date
    final_list = []
    for factor_list, coef_list in g.factor_list:
        initial_list = get_all_securities().index.tolist()
        initial_list = filter_new_stock(context, initial_list)
        initial_list = filter_kcbj_stock(initial_list)
        initial_list = filter_st_stock(initial_list)

        factor_values = get_factor_values(initial_list, factor_list, end_date=yesterday, count=1)
        # Each factor frame has one row (count=1); take that row as a Series
        # keyed by stock code so values are aligned by code, not by position.
        df = pd.DataFrame(
            {name: factor_values[name].iloc[0] for name in factor_list},
            index=initial_list,
            columns=factor_list,
        )
        df = df.dropna()

        df['total_score'] = 0
        for name, coef in zip(factor_list, coef_list):
            df['total_score'] += coef * df[name]
        # A higher score means a higher predicted return, so sort descending.
        df = df.sort_values(by=['total_score'], ascending=False)
        complex_factor_list = list(df.index)[:int(0.1 * len(df.index))]

        q = (
            query(valuation.code, valuation.circulating_market_cap, indicator.eps)
            .filter(valuation.code.in_(complex_factor_list))
            .order_by(valuation.circulating_market_cap.asc())
        )
        df = get_fundamentals(q)
        df = df[df['eps'] > 0]

        lst = list(df.code)
        lst = filter_paused_stock(lst)
        lst = filter_limitup_stock(context, lst)
        lst = filter_limitdown_stock(context, lst)
        final_list.extend(lst[:g.stock_num])

    # De-duplicate while keeping the selection order. list(set(...)) would
    # return an arbitrary order, and the caller slices this list.
    seen = set()
    unique_list = []
    for stock in final_list:
        if stock not in seen:
            seen.add(stock)
            unique_list.append(stock)
    return unique_list
#1-3 整体调整持仓
def daily_adjustment(context):
    """Run the daily rebalance: timing check first, then sells, then buys.

    Steps:
      1. Build today's target list (at most ``g.stock_num`` stocks).
      2. If the timing signal for the previous day is positive, close every
         holding in ``g.hold_list`` and stop.
      3. Otherwise sell holdings that are neither in the target list nor in
         ``g.yesterday_HL_list``.
      4. Split the available cash equally over the open slots and buy target
         stocks that are not yet held.

    NOTE(review): indentation was lost in the source this was restored from.
    This version assumes the whole routine is gated by
    ``g.no_trading_today_signal``; the flag is only ever set to True in this
    file, so behaviour is the same either way -- confirm the intended scope.

    Args:
        context: Strategy context; the portfolio and ``previous_date`` are read.
    """
    # Target list is computed before the flag check, as in the original order.
    target_stock = get_stock_list(context)
    target_list = target_stock[:g.stock_num]
    if not g.no_trading_today_signal:
        return

    # Timing signal: a positive value means liquidate everything.
    today_sig = get_timing_signal(context)
    try:
        if today_sig > 0:
            for stock in g.hold_list:
                position = context.portfolio.positions[stock]
                close_position(position)
                print(context.previous_date,f'大盘风控止损触发,全仓卖出{stock}')
            return
    except Exception as exc:
        # Best effort, as before: a failure here falls through to the normal
        # rebalance. The error is now recorded instead of silently dropped,
        # and SystemExit/KeyboardInterrupt are no longer swallowed.
        log.error("timing-signal liquidation failed: %r" % (exc,))
        print('开仓!')

    # Sell holdings that dropped out of the target list, except those that
    # closed at limit-up on the previous day (check_limit_up handles them).
    for stock in g.hold_list:
        if (stock not in target_stock) and (stock not in g.yesterday_HL_list):
            log.info("卖出[%s]" % (stock))
            position = context.portfolio.positions[stock]
            close_position(position)
        else:
            log.info("已持有[%s]" % (stock))

    # Buy: divide the cash equally between the open slots.
    position_count = len(context.portfolio.positions)
    target_num = len(target_list)
    if target_num > position_count:
        value = context.portfolio.cash / (target_num - position_count)
        for stock in target_list:
            if context.portfolio.positions[stock].total_amount == 0:
                # Stop as soon as a fill brings the portfolio to the target size.
                if open_position(stock, value) and len(context.portfolio.positions) == target_num:
                    break
#1-4 调整昨日涨停股票
def check_limit_up(context):
    """Sell holdings that closed at limit-up on the previous day but are no longer at the limit.

    For each code in ``g.yesterday_HL_list`` the latest 1-minute bar is
    fetched; when its close is below the upper price limit the position is
    closed, otherwise it is kept.

    Args:
        context: Strategy context; ``current_dt`` and the portfolio are read.
    """
    now = context.current_dt
    # Iterating an empty list is a no-op, so no separate emptiness check is needed.
    for code in g.yesterday_HL_list:
        bar = get_price(
            code,
            end_date=now,
            frequency='1m',
            fields=['close', 'high_limit'],
            skip_paused=False,
            fq='pre',
            count=1,
            panel=False,
            fill_paused=True,
        )
        # Positional access: first column vs second column of the single row.
        last_close = bar.iloc[0, 0]
        upper_limit = bar.iloc[0, 1]
        if last_close < upper_limit:
            log.info("[%s]涨停打开,卖出" % (code))
            close_position(context.portfolio.positions[code])
        else:
            log.info("[%s]涨停,继续持有" % (code))
#2-1 过滤停牌股票
def filter_paused_stock(stock_list):
    """Return the stocks from *stock_list* that are not flagged as paused.

    Args:
        stock_list: Iterable of security codes.

    Returns:
        New list with the non-paused codes, in the original order.
    """
    snapshot = get_current_data()
    tradable = []
    for code in stock_list:
        if snapshot[code].paused:
            continue
        tradable.append(code)
    return tradable
#2-2 过滤ST及其他具有退市标签的股票
def filter_st_stock(stock_list):
    """Drop ST stocks and stocks whose name carries an ST, '*' or delisting marker.

    A stock is removed when its ``is_st`` flag is set or when its name
    contains 'ST', '*' or '退'.

    Args:
        stock_list: Iterable of security codes.

    Returns:
        New list with the remaining codes, in the original order.
    """
    snapshot = get_current_data()
    name_markers = ('ST', '*', '退')
    kept = []
    for code in stock_list:
        info = snapshot[code]
        if info.is_st:
            continue
        if any(marker in info.name for marker in name_markers):
            continue
        kept.append(code)
    return kept
#2-3 过滤科创北交股票
def filter_kcbj_stock(stock_list):
    """Remove codes starting with '4', '8' or '68' from *stock_list* in place.

    Args:
        stock_list: Mutable list of security codes; it is modified in place.

    Returns:
        The same list object, with the matching codes removed and the order of
        the remaining codes unchanged.
    """
    excluded_prefixes = ('4', '8', '68')
    # One pass with slice assignment keeps the in-place contract while avoiding
    # the quadratic cost of calling list.remove() inside a loop.
    stock_list[:] = [
        stock for stock in stock_list if not stock.startswith(excluded_prefixes)
    ]
    return stock_list
#2-4 过滤涨停的股票
def filter_limitup_stock(context, stock_list):
    """Drop stocks trading at their upper price limit, unless they are already held.

    A stock is kept when it is in the current portfolio or when its latest
    1-minute close is strictly below its upper price limit.

    Args:
        context: Strategy context; ``portfolio.positions`` is read.
        stock_list: List of security codes.

    Returns:
        New list with the kept codes, in the original order.
    """
    last_prices = history(1, unit='1m', field='close', security_list=stock_list)
    snapshot = get_current_data()
    held_codes = context.portfolio.positions.keys()
    kept = []
    for code in stock_list:
        if code in held_codes or last_prices[code][-1] < snapshot[code].high_limit:
            kept.append(code)
    return kept
#2-5 过滤跌停的股票
def filter_limitdown_stock(context, stock_list):
    """Drop stocks trading at their lower price limit, unless they are already held.

    A stock is kept when it is in the current portfolio or when its latest
    1-minute close is strictly above its lower price limit.

    Args:
        context: Strategy context; ``portfolio.positions`` is read.
        stock_list: List of security codes.

    Returns:
        New list with the kept codes, in the original order.
    """
    last_prices = history(1, unit='1m', field='close', security_list=stock_list)
    snapshot = get_current_data()
    held_codes = context.portfolio.positions.keys()
    kept = []
    for code in stock_list:
        if code in held_codes or last_prices[code][-1] > snapshot[code].low_limit:
            kept.append(code)
    return kept
#2-6 过滤次新股
def filter_new_stock(context,stock_list):
    """Drop stocks listed fewer than 250 calendar days before the previous trading day.

    Args:
        context: Strategy context; only ``previous_date`` is read.
        stock_list: Iterable of security codes.

    Returns:
        New list with the remaining codes, in the original order.
    """
    min_listed = datetime.timedelta(days=250)
    reference_day = context.previous_date
    seasoned = []
    for code in stock_list:
        listed_for = reference_day - get_security_info(code).start_date
        if listed_for < min_listed:
            continue
        seasoned.append(code)
    return seasoned
#3-1 交易模块-自定义下单
def order_target_value_(security, value):
    """Log the intended trade, then place a target-value order.

    Args:
        security: Security code to trade.
        value: Target position value; 0 is logged as a full sell-out.

    Returns:
        Whatever ``order_target_value`` returns for this request.
    """
    message = (
        "Selling out %s" % (security)
        if value == 0
        else "Order %s to value %f" % (security, value)
    )
    log.debug(message)
    return order_target_value(security, value)
#3-2 交易模块-开仓
def open_position(security, value):
    """Open (or resize) a position to the given value.

    Args:
        security: Security code to buy.
        value: Target position value.

    Returns:
        True when an order object came back and some quantity was filled,
        otherwise False.
    """
    order = order_target_value_(security, value)
    # Identity test against None rather than the != comparison.
    return bool(order is not None and order.filled > 0)
#3-3 交易模块-平仓
def close_position(position):
    """Sell a position down to zero.

    Args:
        position: Position object; only its ``security`` attribute is read.

    Returns:
        True when an order object came back with status ``OrderStatus.held``
        and its filled quantity equals its requested amount, otherwise False.
    """
    security = position.security
    order = order_target_value_(security, 0)  # may fail, e.g. when the stock is suspended
    # Identity test against None rather than the != comparison.
    if order is None:
        return False
    return bool(order.status == OrderStatus.held and order.filled == order.amount)
#4-1 判断今天是否为账户资金再平衡的日期
def today_is_between(context, start_date, end_date):
    """Tell whether today's month-day lies within [start_date, end_date].

    The comparison is a plain string comparison of ``MM-DD`` values, so a
    window whose start is later than its end never matches.

    Args:
        context: Strategy context; only ``current_dt`` is read.
        start_date: Inclusive lower bound, formatted ``MM-DD``.
        end_date: Inclusive upper bound, formatted ``MM-DD``.

    Returns:
        True when today is inside the window, otherwise False.
    """
    month_day = context.current_dt.strftime('%m-%d')
    return start_date <= month_day <= end_date
#4-2 清仓后次日资金可转
def close_account(context):
    """Close every holding in ``g.hold_list`` while ``g.no_trading_today_signal`` is set.

    Args:
        context: Strategy context; ``portfolio.positions`` is read.
    """
    # Plain truthiness test, matching how daily_adjustment reads the same flag.
    if not g.no_trading_today_signal:
        return
    # Looping over an empty list does nothing, so no separate length check is needed.
    for stock in g.hold_list:
        position = context.portfolio.positions[stock]
        close_position(position)
        log.info("卖出[%s]" % (stock))
#4-3 打印每日持仓信息
def print_position_info(context):
    """Print today's trade records followed by a summary of every open position.

    NOTE(review): indentation was lost in the source this was restored from.
    This version assumes the short separator line is printed after each
    position and the long one once at the end -- confirm.

    Args:
        context: Strategy context; ``portfolio.positions`` is read.
    """
    # Today's fills.
    for fill in get_trades().values():
        print('成交记录:'+str(fill))

    # Per-position report.
    for holding in list(context.portfolio.positions.values()):
        code = holding.security
        display_name = get_security_info(code).display_name
        avg_cost = holding.avg_cost
        last_price = holding.price
        # Return in percent relative to the average cost.
        pct_return = 100*(last_price/avg_cost-1)
        market_value = holding.value
        shares = holding.total_amount
        report_lines = (
            '代码:{}'.format(code),
            '名字:{}'.format(display_name),
            '成本价:{}'.format(format(avg_cost,'.2f')),
            '现价:{}'.format(last_price),
            '收益率:{}%'.format(format(pct_return,'.2f')),
            '持仓(股):{}'.format(shares),
            '市值:{}'.format(format(market_value,'.2f')),
            '———————————————————————————————————',
        )
        for line in report_lines:
            print(line)
    print('———————————————————————————————————————分割线————————————————————————————————————————')
# 注册时间: (stray text captured from the source web page; not part of the strategy code)