【量化投资】策略七(聚宽)
程序员文章站
2022-03-22 23:36:58
...
简述
这里加大了对冲 beta 的比例,使整个策略更进一步。
效果
代码
# 导入函数库
import statsmodels.api as sm
from statsmodels import regression
import numpy as np
import pandas as pd
import time
from datetime import date
import scipy.stats as stats
# 一键回测说明:
# 百度聚宽-》注册账号-》我的策略里面创建策略-》复制代码到里面
# 右边回测 开始时间:2017-1-1 终止时间:今天 资金:10000000
# 初始化函数,设定基准等等
def initialize(context):
    """Set up strategy parameters, trading costs, benchmark and sub-portfolios.

    Everything is stored on the platform-global ``g`` object so that the
    per-bar handlers can read it later.

    Args:
        context: strategy context object supplied by the backtest platform.
    """
    g.tc = 2  # rebalance every `tc` bars

    # Framework boilerplate below; normally left untouched.
    g.N = 20  # number of stocks to hold
    g.t = 0  # bar counter, wraps at g.tc
    g.weight_list = [1]  # factor weights

    log.set_level('order', 'error')
    set_option('use_real_price', True)  # trade on real (unadjusted) prices
    set_slippage(FixedSlippage(0))  # zero slippage
    # zero commission
    set_commission(PerTrade(buy_cost=0.0, sell_cost=0.0, min_cost=0))

    # Benchmark is CSI 300 ('000905.XSHG', CSI 500, was the earlier choice).
    set_benchmark('000300.XSHG')

    # Stock universe: CSI 300 constituents.  A whole-market alternative was
    # get_index_stocks('000001.XSHG') + get_index_stocks('399106.XSHE').
    g.stockrange = get_index_stocks('000300.XSHG')

    # One third of the starting cash goes to stocks, two thirds to futures.
    stock_cash = context.portfolio.starting_cash / 3
    g.init_cash = stock_cash
    stock_account = SubPortfolioConfig(cash=stock_cash, type='stock')
    futures_account = SubPortfolioConfig(cash=stock_cash * 2, type='futures')
    set_subportfolios([stock_account, futures_account])

    g.futures_rate = 0.01  # 1% margin rate, i.e. 100x leverage
    set_option('futures_margin_rate', g.futures_rate)

    # g.orignalValue is refreshed whenever g.calDays wraps back to 0.
    g.orignalValue = 0
    g.TotalDays = 30
    g.calDays = 0

    # Return thresholds used to size the futures position.
    # Original author's sketch:  -20   -8    -1    0    1    8   20
    #   buy all | buy 2/3 | buy 1/3 | sell 1/6 | buy 1/6 | ...
    g.levels = [0.01, 0.032, 0.055]
    g.betaName = 'IF9999.CCFX'
## 每根日线运行一次
def handle_data(context, data):
    """Per-bar entry point: rebalance whenever the bar counter is at zero.

    Args:
        context: strategy context object supplied by the platform.
        data: per-bar market data object (not used here).
    """
    rebalance_due = (g.t == 0)
    if rebalance_due:
        # Refresh the tradable universe, score it, then trade into the picks.
        g.all_stocks = pickStock(context, g.stockrange)
        previous_day = str(context.previous_date)
        picks = get_all_cleaned_factor_ranked(
            g.all_stocks, g.weight_list, previous_day)
        rebalance_position(context, picks)

    # Advance the counter on every bar; it wraps at g.tc.
    g.t = (g.t + 1) % g.tc
def get_all_cleaned_factor_ranked(stocks, weight_list, previousStr):
    """Return the ``g.N`` stocks with the highest ``my_alpha2`` score.

    Args:
        stocks: list of security codes to score.
        weight_list: factor weights (not used by the current scoring).
        previousStr: previous trading date as a string (not used here).

    Returns:
        list: up to ``g.N`` codes from ``stocks``, best score first.
    """
    scores = my_alpha2(stocks)  # my_apha1(stocks) is the alternative factor
    # argsort of the negated scores gives positions in descending order.
    best_positions = np.argsort(-scores)[:g.N]
    return [stocks[pos] for pos in best_positions]
'''
factor1
'''
def single_alpha1(stock):
    """Compute the raw factor-1 score for one stock from 26 closing prices.

    For each of the five most recent days the candidate value is the squared
    standard deviation of the preceding returns window when that day's return
    is negative, otherwise the squared close.  The score is ``5 - argmax`` of
    those five candidates (index 0 being the most recent day).

    Args:
        stock: security code passed to ``get_price``.

    Returns:
        Integer-like score in the range 1..5.
    """
    close = get_price(stock, count=26, fields=['close'])['close']
    prices = close.values

    # Positional day-over-day returns (25 values).  Subtracting two pandas
    # slices would align on the index and produce zeros/NaN instead.
    returns = pd.Series((prices[1:] - prices[:-1]) / prices[:-1])

    value = [0] * 5
    for i in range(5):
        if returns.iloc[-1 - i] < 0:
            # Same positional window the original expressed with `.ix`.
            value[i] = returns.iloc[-20 - i:-1 - i].std() ** 2
        else:
            value[i] = prices[-1 - i] ** 2
    return 5 - np.array(value).argmax()
def my_apha1(stocks):
    """Min-max scale the factor-1 scores of ``stocks`` into [0, 1].

    Args:
        stocks: iterable of security codes.

    Returns:
        numpy.ndarray: one float per stock.  Empty input gives an empty
        array; if every score is identical the result is all zeros.
    """
    # Float dtype keeps the division below a true division on every
    # interpreter version.
    scores = np.array([single_alpha1(stock) for stock in stocks], dtype=float)
    if scores.size == 0:
        return scores
    low = scores.min()
    high = scores.max()
    if high == low:
        # No spread to scale by; avoid 0/0.
        return np.zeros_like(scores)
    return (scores - low) / (high - low)
'''
END factor1
'''
'''
factor2
'''
def single_alpha2(stock):
    """Build the two raw six-point series used by factor 2 for one stock.

    Args:
        stock: security code passed to ``get_price``.

    Returns:
        tuple: ``(delta_volume, rate)`` where ``delta_volume`` is the 2-bar
        difference of log volume and ``rate`` is ``(close - open) / open``
        with the first two bars dropped, so both cover the same bars.
    """
    bars = get_price(stock, count=8, fields=['open', 'close', 'volume'])
    log_volume = np.log(bars['volume'].tolist())
    intraday_return = (bars['close'] - bars['open']) / bars['open']
    return log_volume[2:] - log_volume[:-2], np.array(intraday_return)[2:]
def my_alpha2(stocks):
    """Compute the factor-2 score for every stock.

    Implements
    ``-1 * correlation(rank(delta(log(volume), 2)), rank((close - open) / open), 6)``
    where ``rank`` is the cross-sectional scaling done by ``doubleListRank``.

    Args:
        stocks: iterable of security codes.

    Returns:
        numpy.ndarray: one float score per stock, in input order.
    """
    per_stock = [single_alpha2(stock) for stock in stocks]

    # Regroup from per-stock pairs into (all volume deltas, all returns).
    by_series = list(zip(*per_stock))
    scaled = [doubleListRank(series) for series in by_series]

    # Back to one [volume_series, return_series] pair per stock.
    pairs = [list(pair) for pair in zip(*scaled)]

    # Materialise the results before handing them to numpy: wrapping a lazy
    # map object would give a 0-d object array instead of a numeric vector.
    return np.array([Neg_correlation(pair) for pair in pairs])
def Neg_correlation(adlist):
    """Return the negated Pearson correlation of ``adlist[0]`` and ``adlist[1]``.

    Args:
        adlist: pair of equal-length numeric sequences.

    Returns:
        float: ``-r``.  NaN when the correlation is undefined because a
        series has fewer than two points or is constant.
    """
    x = np.asarray(adlist[0], dtype=float)
    y = np.asarray(adlist[1], dtype=float)
    # Pearson r is undefined for these inputs; answer NaN directly instead of
    # relying on the library to raise or warn.
    if x.size < 2 or y.size < 2:
        return np.nan
    if x.min() == x.max() or y.min() == y.max():
        return np.nan
    return -stats.pearsonr(x, y)[0]
def doubleListRank(ddlist):
    """Scale a table of series column by column with ``rank``.

    ``ddlist`` is treated as rows of equal length.  Each column (the values
    at one position across all rows) is passed through ``rank``; the scaled
    columns are then transposed back to the original row layout.

    Args:
        ddlist: sequence of equal-length numeric sequences.

    Returns:
        list: one list per input row holding the scaled values.
    """
    scaled_columns = [rank(column) for column in zip(*ddlist)]
    return [list(row) for row in zip(*scaled_columns)]
def rank(alist):
    """Min-max scale ``alist`` into the range [0, 1].

    Despite the name this is a linear rescaling, not an ordinal rank.

    Args:
        alist: sequence of numbers.

    Returns:
        numpy.ndarray: ``(x - min) / (max - min)`` for every element.  Empty
        input gives an empty array; if all values are equal the result is all
        zeros.  A NaN anywhere in the input makes the whole result NaN.
    """
    values = np.asarray(alist, dtype=float)
    if values.size == 0:
        return values
    low = values.min()
    span = values.max() - low
    if span == 0:
        # Every value is identical: avoid 0/0.
        return np.zeros_like(values)
    return (values - low) / span
# (-1 * correlation(rank(delta(log(volume), 2)), rank(((close - open)/ open)), 6))
# 6天以来的相关系数() 两个数值分别是
# rank(delta(log(volume), 2))
'''
END factor2
'''
'''
初步筛选股票
'''
# pick stocks
def pickStock(context, stocks):
    """Return the tradable universe for the current bar.

    Applies ``filter_specials`` and then drops stocks whose start date is
    fewer than 60 days before the current date.  The size of the result is
    also stored in ``g.lenth``.

    Args:
        context: strategy context; ``context.current_dt`` supplies the date.
        stocks: candidate security codes.

    Returns:
        list: codes that passed both filters, in their original order.
    """
    today = context.current_dt.date()
    # Build a new list rather than removing from the one being iterated;
    # removing in place makes the loop skip the element after each removal.
    universe = [
        stock for stock in filter_specials(stocks, context)
        if (today - get_security_info(stock).start_date).days >= 60
    ]
    g.lenth = len(universe)
    return universe
# 过滤停牌的股票
def set_feasible_stocks(stock_list):
    """Drop the stocks that are currently suspended from trading.

    Args:
        stock_list: iterable of security codes.

    Returns:
        list: the codes whose current-data entry is not flagged ``paused``.
    """
    snapshot = get_current_data()
    tradable = []
    for code in stock_list:
        if snapshot[code].paused:
            continue
        tradable.append(code)
    return tradable
# 将多因子的dataframe进行排序,并且将有空值的行去掉
def rank_stock(all_factor, weight_list):
    """Combine per-factor ranks into one weighted score and pick the top rows.

    Args:
        all_factor: DataFrame with one row per stock and one column per
            factor.
        weight_list: weights, one per factor column, in column order.

    Returns:
        pandas.Index: the first ``g.N`` row labels after sorting by the
        weighted rank sum in the direction given by ``g.ascending``.
    """
    n_factors = len(all_factor.columns)
    score = all_factor.iloc[:, 0].rank() * weight_list[0]
    for j in range(1, n_factors):
        score = score + all_factor.iloc[:, j].rank() * weight_list[j]

    # NOTE(review): g.ascending is not assigned in the visible initialize();
    # confirm it is set before this function is used.
    # DataFrame.sort was removed from pandas; sort_values is its replacement.
    ordered = score.to_frame('rank').sort_values('rank', ascending=g.ascending)
    return ordered.index[:g.N]
#过滤退市,停牌,ST
def filter_specials(stock_list, context):
    """Filter out suspended, ST / delisting-flagged and limit-open stocks.

    Args:
        stock_list: iterable of security codes.
        context: strategy context (not used here).

    Returns:
        list: codes that are not paused, not ST, whose name contains none of
        'ST', '*' or '退', and whose day open lies strictly between the low
        and high price limits.
    """
    curr_data = get_current_data()

    def is_ordinary(code):
        info = curr_data[code]
        if info.paused or info.is_st:
            return False
        if 'ST' in info.name or '*' in info.name or '退' in info.name:
            return False
        # Opening at a price limit is excluded.
        return info.low_limit < info.day_open < info.high_limit

    return [code for code in stock_list if is_ordinary(code)]
'''
END 初步筛选股票
'''
'''
调仓
'''
def rebalance_position(context, stocks_list):
    """Move the stock sub-portfolio into an equal-weight basket of targets.

    Sells every current holding that is not a target, lets
    ``rebalance_beta`` adjust the futures position, then sets each target to
    an equal share of the stock sub-portfolio's value.

    Args:
        context: strategy context; sub-portfolio 0 holds the stocks.
        stocks_list: security codes to hold after rebalancing.
    """
    current_holding = context.subportfolios[0].positions.keys()
    stocks_to_sell = list(set(current_holding) - set(stocks_list))

    # Sell what is no longer wanted.
    bulk_orders(stocks_to_sell, 0)

    # Value is read before the futures adjustment, as in the original flow.
    total_value = context.subportfolios[0].total_value
    rebalance_beta(context)

    # Nothing to buy: skip the equal-weight split, which would divide by 0.
    if len(stocks_list) == 0:
        return
    bulk_orders(stocks_list, total_value / len(stocks_list))
def rebalance_beta(context):
    """Resize the index-futures position from the stock portfolio's return.

    When ``g.calDays`` is 0 the stock sub-portfolio's value is stored as the
    new reference in ``g.orignalValue``.  On other calls the return since
    that reference is bucketed by ``g.levels`` and ``bete_order`` is called
    with a signed fraction of the futures buying power.  ``g.calDays`` then
    advances, wrapping at ``g.TotalDays``.

    Args:
        context: strategy context; sub-portfolio 0 is stocks, 1 is futures.
    """
    if g.calDays == 0:
        g.orignalValue = context.subportfolios[0].total_value
    else:
        total_value = context.subportfolios[0].total_value
        rate = (total_value - g.orignalValue) / g.orignalValue
        # Notional that the transferable futures cash can support.
        money = context.subportfolios[1].transferable_cash / g.futures_rate

        low = g.levels[0]
        mid = g.levels[1]
        high = g.levels[2]

        # Buckets from the most negative return upwards.  Each bound is
        # spelled out so a NaN rate matches nothing and places no order.
        if rate < -high:
            bete_order(money)
        elif -high <= rate < -mid:
            bete_order(money * 2.0 / 3)
        elif -mid <= rate <= -low:
            bete_order(money / 3)
        elif -low < rate < 0:
            bete_order(-money / 6.0)
        elif 0 <= rate < low:
            bete_order(money / 6.0)
        elif low <= rate <= mid:
            # Lower bound is inclusive so rate == low no longer falls into a
            # gap between buckets (mirrors the handling of rate == -low).
            bete_order(-money / 3)
        elif mid < rate <= high:
            bete_order(-money * 2.0 / 3)
        elif rate > high:
            bete_order(-money)

    # Return of the stock sub-portfolio relative to its initial cash.
    print((context.subportfolios[0].total_value - g.init_cash) / g.init_cash)
    g.calDays = (g.calDays + 1) % g.TotalDays
# 批量买卖股票
def bulk_orders(stocks_list, target_value):
    """Set every listed stock to the same target value in sub-portfolio 0.

    Args:
        stocks_list: iterable of security codes.
        target_value: position value to target for each code (0 closes it).
    """
    for code in stocks_list:
        order_target_value(code, target_value, pindex=0)
def bete_order(money):
    """Target a position in ``g.betaName`` sized from a money amount.

    The amount is ``money`` divided by the contract's last price, truncated
    toward zero, and is placed in sub-portfolio 1.

    Args:
        money: signed notional to convert into a target amount.
    """
    last_price = get_current_data()[g.betaName].last_price
    contracts = int(money / last_price)
    # NOTE(review): the original comment here said "go short", but the order
    # is placed with side='long' -- confirm which one is intended.
    # NOTE(review): no contract multiplier is applied to the price -- confirm.
    order_target(g.betaName, contracts, side='long', pindex=1)
'''
END 调仓
'''
'''
调仓初版
'''
##获得卖出信号,并执行卖出操作
# 输入:context, data,已排序股票列表stock_sort-list类型
# 输出:none
def order_stock_sell(context, data, stock_sort):
    """Close every held position that is not among the selected stocks.

    Args:
        context: strategy context; ``context.portfolio.positions`` is read.
        data: per-bar market data object (not used here).
        stock_sort: collection of selected security codes to keep.
    """
    for held in context.portfolio.positions:
        if held in stock_sort:
            continue  # still selected, keep it
        order_target_value(held, 0)
# 获得买入信号,并执行买入操作
# 输入:context, data,已排序股票列表stock_sort-list类型
# 输出:none
def order_stock_buy(context, data, stock_sort):
    """Set each selected stock to the per-stock target value.

    Args:
        context: strategy context (not used here).
        data: per-bar market data object (not used here).
        stock_sort: iterable of selected security codes.
    """
    # NOTE(review): g.everyStock is not assigned in the visible initialize();
    # confirm it is set before this function is used.
    for code in stock_sort:
        order_target_value(code, g.everyStock)
'''
END 调仓初版
'''
上一篇: TypeScript