forked from quantopian/research_public
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pairs_trading_template.py
166 lines (118 loc) · 5.87 KB
/
pairs_trading_template.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import numpy as np
import statsmodels.api as sm
import pandas as pd
import quantopian.experimental.optimize as opt
import quantopian.algorithm as algo
def initialize(context):
# Quantopian backtester specific variables
set_slippage(slippage.FixedSlippage(spread=0))
set_commission(commission.PerTrade(cost=1))
set_symbol_lookup_date('2014-01-01')
context.stock_pairs = [(symbol('ABGB'), symbol('FSLR')),
(symbol('CSUN'), symbol('ASTI'))]
context.stocks = symbols('ABGB', 'FSLR', 'CSUN', 'ASTI')
context.num_pairs = len(context.stock_pairs)
# strategy specific variables
context.lookback = 20 # used for regression
context.z_window = 20 # used for zscore calculation, must be <= lookback
context.target_weights = pd.Series(index=context.stocks, data=0.25)
context.spread = np.ndarray((context.num_pairs, 0))
context.inLong = [False] * context.num_pairs
context.inShort = [False] * context.num_pairs
# Only do work 30 minutes before close
schedule_function(func=check_pair_status, date_rule=date_rules.every_day(), time_rule=time_rules.market_close(minutes=30))
# Will be called on every trade event for the securities you specify.
def handle_data(context, data):
# Our work is now scheduled in check_pair_status
pass
def check_pair_status(context, data):
prices = data.history(context.stocks, 'price', 35, '1d').iloc[-context.lookback::]
new_spreads = np.ndarray((context.num_pairs, 1))
for i in range(context.num_pairs):
(stock_y, stock_x) = context.stock_pairs[i]
Y = prices[stock_y]
X = prices[stock_x]
# Comment explaining try block
try:
hedge = hedge_ratio(Y, X, add_const=True)
except ValueError as e:
log.debug(e)
return
context.target_weights = get_current_portfolio_weights(context, data)
new_spreads[i, :] = Y[-1] - hedge * X[-1]
if context.spread.shape[1] > context.z_window:
# Keep only the z-score lookback period
spreads = context.spread[i, -context.z_window:]
zscore = (spreads[-1] - spreads.mean()) / spreads.std()
if context.inShort[i] and zscore < 0.0:
context.target_weights[stock_y] = 0
context.target_weights[stock_x] = 0
context.inShort[i] = False
context.inLong[i] = False
record(X_pct=0, Y_pct=0)
allocate(context, data)
return
if context.inLong[i] and zscore > 0.0:
context.target_weights[stock_y] = 0
context.target_weights[stock_x] = 0
context.inShort[i] = False
context.inLong[i] = False
record(X_pct=0, Y_pct=0)
allocate(context, data)
return
if zscore < -1.0 and (not context.inLong[i]):
# Only trade if NOT already in a trade
y_target_shares = 1
X_target_shares = -hedge
context.inLong[i] = True
context.inShort[i] = False
(y_target_pct, x_target_pct) = computeHoldingsPct(y_target_shares,X_target_shares, Y[-1], X[-1])
context.target_weights[stock_y] = y_target_pct * (1.0/context.num_pairs)
context.target_weights[stock_x] = x_target_pct * (1.0/context.num_pairs)
record(Y_pct=y_target_pct, X_pct=x_target_pct)
allocate(context, data)
return
if zscore > 1.0 and (not context.inShort[i]):
# Only trade if NOT already in a trade
y_target_shares = -1
X_target_shares = hedge
context.inShort[i] = True
context.inLong[i] = False
(y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] )
context.target_weights[stock_y] = y_target_pct * (1.0/context.num_pairs)
context.target_weights[stock_x] = x_target_pct * (1.0/context.num_pairs)
record(Y_pct=y_target_pct, X_pct=x_target_pct)
allocate(context, data)
return
context.spread = np.hstack([context.spread, new_spreads])
def hedge_ratio(Y, X, add_const=True):
if add_const:
X = sm.add_constant(X)
model = sm.OLS(Y, X).fit()
return model.params[1]
model = sm.OLS(Y, X).fit()
return model.params.values
def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
yDol = yShares * yPrice
xDol = xShares * xPrice
notionalDol = abs(yDol) + abs(xDol)
y_target_pct = yDol / notionalDol
x_target_pct = xDol / notionalDol
return (y_target_pct, x_target_pct)
def get_current_portfolio_weights(context, data):
positions = context.portfolio.positions
positions_index = pd.Index(positions)
share_counts = pd.Series(
index=positions_index,
data=[positions[asset].amount for asset in positions]
)
current_prices = data.current(positions_index, 'price')
current_weights = share_counts * current_prices / context.portfolio.portfolio_value
return current_weights.reindex(positions_index.union(context.stocks), fill_value=0.0)
def allocate(context, data):
# Set objective to match target weights as closely as possible, given constraints
objective = opt.TargetPortfolioWeights(context.target_weights)
# Define constraints
constraints = []
constraints.append(opt.MaxGrossLeverage(1.0))
algo.order_optimal_portfolio(objective=objective, constraints=constraints, universe=context.stocks)