import collections
import math
import time, calendar
from itertools import islice
import numpy as np
import pandas as pd
import scipy.stats as ss
import statsmodels.api as sm
from sklearn.decomposition import PCA
try:
    import Quandl  # only needed by QuandlCurve below
except ImportError:
    Quandl = None
calc_returns = lambda x: np.log(x / x.shift(1))[1:]
scale = lambda x: (x - x.mean()) / x.std()
def OLSreg(y, Xmat):
    return sm.OLS(y, sm.add_constant(Xmat, prepend=True)).fit()
def coc(F, D, S, t):
    """
    F = S * exp(r*t) - D, so r = ln((F+D)/S)/t
    where F - futures price
          S - spot price
          r - cost of carry
          t - time to expiry
          D - dividends to be paid during the life of the futures contract, or storage cost
    """
    return np.log((F+D)/S)/t
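# Illustrative check with made-up numbers: a 105 future against 100 spot,
# 2.0 of dividends/storage and one year to expiry implies
# coc(105., 2., 100., 1.) = ln(107/100) ~ 0.0677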
def future_price(S, r, u, q, y, T):
    """
    F = S * exp((r+u-q-y)*T)
    future_price(10, .04, .01, .01, .01, 1) = 10.30454
    r = interest rate
    u = storage cost
    q = income
    y = convenience yield
    """
    return S * np.exp((r+u-q-y)*T)
def info(seqn):
    """
    Shannon's entropy formula over a sequence of counts.
    """
    bits = 0
    s = float(sum(seqn))
    for i in seqn:
        p = i / s
        bits -= p * math.log(p, 2)
    return bits
def irate_parity():
    """
    R  : T-year dollar par swap rate with fixed flows paid quarterly.
    f  : T-year foreign par swap rate with fixed flows paid quarterly.
    Yz : T-year dollar zero-coupon swap rate.
    Fz : T-year foreign zero-coupon swap rate.
    Lu : 3-month dollar LIBOR at time t.
    Lf : 3-month foreign LIBOR at time t.
    bu : a swap of the overnight dollar default-free rate is fair against 3-month dollar LIBOR minus bu.
    bf : a swap of the overnight foreign default-free rate is fair against 3-month foreign LIBOR minus bf.
    S  : spot rate
    X  : a swap of 3-month dollar LIBOR plus X is fair against 3-month foreign LIBOR.
    Lehman paper:
    "Interest Rate Parity, Money Market Basis Swaps, and Cross-Currency Basis Swaps"
    """
    S = 109
    Yz = .042
    T = 3. / 12
    Fz = .04
    X = .046
    f = .049
    R = .0399
    bu = .0334
    bf = .0394
    num = (1 + Yz)**T
    dnum = (1 + Fz)**T
    onum = num - 1
    odnum = R * num
    xnum = dnum - 1   # used only in the commented alternative below
    xdnum = R * dnum  # used only in the commented alternative below
    F = S * (num / dnum) * (1 + X * (onum / odnum))
    # alternative form from the paper, using the basis-swap spreads:
    # top = num * (1 - bu * (onum / odnum))
    # bot = dnum * (1 - bf * (xnum / xdnum))
    # F = S * top / bot
    return F
def normalize_vec(arr):
    """
    http://blog.wolfire.com/2009/07/linear-algebra-for-game-developers-part-2/
    """
    return arr / np.linalg.norm(arr)
def local_to_utc(t):
    """
    Make sure that the dst flag is -1 -- this tells mktime to take daylight
    saving time into account.
    local_to_utc(time.localtime())
    http://docs.python.org/library/time.html#time.struct_time
    """
    secs = time.mktime(t)
    return time.gmtime(secs)
def utc_to_local(t):
    secs = calendar.timegm(t)
    return time.localtime(secs)
def ortho_poly_fit(x, degree=1):
    # Python port of R's poly(): orthogonal polynomial basis via QR decomposition.
    n = degree + 1
    x = np.asarray(x).flatten()
    if degree >= len(np.unique(x)):
        raise ValueError("'degree' must be less than number of unique points")
    xbar = np.mean(x)
    x = x - xbar
    X = np.fliplr(np.vander(x, n))
    q, r = np.linalg.qr(X)
    z = np.diag(np.diag(r))
    raw = np.dot(q, z)
    norm2 = np.sum(raw**2, axis=0)
    alpha = (np.sum((raw**2) * np.reshape(x, (-1, 1)), axis=0) / norm2 + xbar)[:degree]
    Z = raw / np.sqrt(norm2)
    return Z, norm2, alpha
def ortho_poly_predict(x, alpha, norm2, degree=1):
    x = np.asarray(x).flatten()
    n = degree + 1
    Z = np.empty((len(x), n))
    Z[:, 0] = 1
    if degree > 0:
        Z[:, 1] = x - alpha[0]
    if degree > 1:
        for i in np.arange(1, degree):
            Z[:, i+1] = (x - alpha[i]) * Z[:, i] - (norm2[i] / norm2[i-1]) * Z[:, i-1]
    Z /= np.sqrt(norm2)
    return Z
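# Example usage (mirrors R's poly(); the values here are arbitrary): fit an
# orthogonal-polynomial basis on one grid, then evaluate it on another.
# Z, norm2, alpha = ortho_poly_fit(np.arange(10.), degree=3)
# Znew = ortho_poly_predict(np.linspace(0., 9., 5), alpha, norm2, degree=3)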
def normalize(nparr):
    """
    Rescale values to the range 0..1.
    """
    lo = min(nparr)
    hi = max(nparr)
    diff = hi - lo
    return [float(i - lo) / diff for i in nparr]
def forward_rate(spotOne, lenOne, spotTwo, lenTwo):
    """
    From a 2yr spot and a 1.5yr spot, returns the 6-month rate 1.5 years
    from now (semiannual compounding), e.g.:
    spotOne = .06
    lenOne  = 2.
    spotTwo = .05
    lenTwo  = 1.5
    """
    a = (1 + spotOne/2)**(lenOne*2)
    b = (1 + spotTwo/2)**(lenTwo*2)
    return (a/b - 1) * 2
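# With the docstring's numbers: forward_rate(.06, 2., .05, 1.5) ~ 0.0903,
# i.e. a 9.03% semiannually compounded 6-month rate 1.5 years forward.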
def forward_rate2(spotOne, lenOne, spotTwo, lenTwo):
    """
    Forward rate (annual compounding) over the gap between two spot rates:
    0.2680433 = forward_rate2(.05, 1.8, .07, 2.)
    """
    a = (1 + spotTwo)**lenTwo
    b = (1 + spotOne)**lenOne
    delta = lenTwo - lenOne
    c = a / b
    recip = 1 / delta
    return c**recip - 1
def takepoint(risk, reward):
    """
    * Double/Pass puts us at -4 -3, or 43% match equity
    * Double/Take/Lose gives -4 -1, or 19% match equity
    * Double/Take/Win wins the match, or 100% match equity
    We'd be risking 24% (43-19) to gain 57% (100-43).
    The formula for the raw take point is risk/(risk+gain),
    so here it's 24/(24+57) or 24/81, just under 30%.
    """
    return risk / (risk + reward)
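# The docstring's worked example: takepoint(24., 57.) = 24/81 ~ 0.2963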
def ppp(a=1.95, b=3.73, AB_ex=6.78):
    """
    Burgernomics PPP calculation.
    a = price of good in foreign land
    b = price of good in domestic land
    AB_ex = a-per-b exchange rate
    returns: fair value of currency
    """
    return a / b * AB_ex
def tokyoGold2USDoz(yengram, USDJPY):
    # 1 ounce = 28.3495231 grams; convert a yen/gram quote to USD/ounce
    grams = 28.3495231
    return yengram * grams / USDJPY
def hedge_ratio(a, b):
    """
    Minimum-variance hedge ratio from two log-return series.
    """
    astd = a.std(ddof=1)
    bstd = b.std(ddof=1)
    c = np.corrcoef([a, b])
    return bstd / astd * c[1, 0]
def hurst(x):
    """
    Takes log returns.
    The Hurst exponent is a measure of "trendiness": if it drifts back toward
    0.5, the series is losing trendiness and behaves as a random walk again.
    (hurst > 0.5 -> positive autocorrelation, i.e. trends;
     hurst < 0.5 -> negative autocorrelation, i.e. mean-reverting behavior)
    """
    N = len(x)
    R = np.zeros(N, float)
    S = np.zeros(N, float)
    for n in range(1, N):
        avg = x[:n].mean()
        X = np.zeros(n, float)
        for t in range(1, n):
            X[t] = X[t-1] + (x[t-1] - avg)
        R[n] = X.max() - X.min()
        S[n] = np.sqrt(1./n * ((x[:n] - avg)**2).sum())
    Z = R[3:] / S[3:]
    M = np.zeros((N-3, 2), float)
    M[:, 0] = np.log(np.arange(3, N))
    M[:, 1] = np.log(Z)
    H = ss.linregress(M[:, 0], M[:, 1])
    return M, H[0]
def simplereturns(prices):
    # http://matplotlib.sourceforge.net/examples/misc/longshort.html
    prices = np.asarray(prices, dtype=float)
    return np.diff(prices) / prices[:-1]
def logreturns(prices):
    prices = np.asarray(prices, dtype=float)
    return np.log(prices[1:]) - np.log(prices[:-1])
def window(iterable, window_len=2, window_step=1):
    itr = iter(iterable)
    step_range = range(window_step)
    current_window = collections.deque(islice(itr, window_len))
    while window_len == len(current_window):
        yield current_window
        for idx in step_range:
            current_window.popleft()
        current_window.extend(islice(itr, window_step))
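# Example: three-wide windows stepping by one. The generator yields the same
# deque object each time, so copy it if you need to keep the windows:
# [tuple(w) for w in window(range(5), 3)] -> [(0, 1, 2), (1, 2, 3), (2, 3, 4)]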
def ang(u, v):
    """
    ang([-1,-1], [1,-1]) = 90
    So if a time series goes 1, 2, 1, the vectors (taken from the 2) are
    [-1,-1] and [1,-1]; 2 is the vertex.
    """
    c = np.dot(u, v) / np.linalg.norm(u) / np.linalg.norm(v)
    return math.degrees(np.arccos(c))
def A(dx, dy):
    # angle of the vector (dx, dy) in degrees
    return math.degrees(math.atan2(dy, dx))
def cheapdeliver():
    """
    1 full point in 10y is (very roughly) around 14bps.
    1 full point in the ultra-long is (very roughly) around 5bps.
    The formula to use is simple: fwd price of the CTD = futures price * conversion factor.
    Once you have the fwd price, you can calculate the yield easily (e.g. using the YIELD
    function in Excel). Do that for two futures prices and you have your rough answer (rough,
    because it glosses over quite a few complexities inherent in pricing bond futures).
    """
    pass
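# A minimal sketch of the forward-price step described above; the name and
# signature are illustrative (not from the original gist), and the
# price-to-yield conversion (Excel's YIELD) is deliberately left out.
def ctd_forward_price(futures_price, conversion_factor):
    """Forward price of the cheapest-to-deliver implied by a futures price."""
    return futures_price * conversion_factor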
def weightedmid(b, bqty, a, aqty):
    """
    Weighted mid price.
    http://www.personal.psu.edu/qxc2/research/JofFuturesMarkets2.pdf
    """
    return (a*bqty + b*aqty) / (bqty + aqty)
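# Example: bid 99 x 10, ask 101 x 30 gives (101*10 + 99*30) / 40 = 99.5,
# so the weighted mid leans toward the bid when the ask side is larger.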
def entropy(*args):
    """
    Joint Shannon entropy of one or more aligned sequences, e.g.
    a = [0, 1, 1, 0, 1, 0, 0, 0, 1, 0] and b = [1, 1, 1, 0, 0, 0, 1, 0, 1, 0]
    """
    xy = list(zip(*args))
    # empirical probability of each distinct symbol (tuple of aligned values)
    proba = [float(xy.count(c)) / len(xy) for c in set(xy)]
    return -sum(p * np.log2(p) for p in proba)
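# With the docstring's sequences, entropy(a, b) treats each (a_i, b_i) pair as
# one symbol; the pair probabilities are 0.4, 0.3, 0.2, 0.1, giving ~1.846 bits.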
def cagr(endval, begval, yrs):
    return (float(endval) / float(begval))**(1. / yrs) - 1
def convenienceYield(r, T, F, S):
    """
    http://en.wikipedia.org/wiki/Convenience_yield
    inventories low  -> S > F -> c > r (backwardation)
    inventories high -> S < F -> c < r (contango)
    r = .035
    T = .5 (6 months)
    F = 1300.0
    S = 1371.0
    """
    recipT = 1 / float(T)
    return r + recipT * (1 - (F/S))
def convenienceYieldContinuous(r, T, F, S):
    """
    Same inputs as convenienceYield, with continuous compounding.
    """
    recipT = 1 / float(T)
    return r - recipT * np.log(F/S)
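# With the docstring's inputs (spot above the 6-month future, i.e. backwardation):
# convenienceYield(.035, .5, 1300., 1371.)           ~ 0.1386
# convenienceYieldContinuous(.035, .5, 1300., 1371.) ~ 0.1414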
class QuandlCurve(object):
    def __init__(self, quandlcode, nummonths, columnnum):
        """ qc = QuandlCurve('OFDP/FUTURE_CL', 5, 4)
        QUANDLTOKEN is expected to be defined at module level.
        """
        self.QUANDLTOKEN = QUANDLTOKEN
        self.nummonths = nummonths
        curve = [u'%s%s.%s' % (quandlcode, k, columnnum) for k in range(1, nummonths+1)]
        self.df = Quandl.get(curve, authtoken=self.QUANDLTOKEN)
    def plot(self):
        # three snapshots of the curve: 20 days ago, 10 days ago, and the latest
        e = pd.concat([self.df[-20:-19].T, self.df[-10:-9].T, self.df[-1:].T], axis=1)
        e.plot(style=['k--', 'r+-', 'b-.'])
"""
https://www.quandl.com/OWF/documentation/methodology
Describing the Skew Curve
A volatility skew curve shows the implied volatilities of options at various strike prices. Skew curves are sometimes calculated by fitting a smooth curve through a number of market-observed quotes for at-the-money and out-of-the-money options.
The OWF database provides a smooth volatility skew curve using this sixth-degree polynomial model:
IV = AtM + Beta1*x + Beta2*x^2 + Beta3*x^3 + Beta4*x^4 + Beta5*x^5 + Beta6*x^6
where:
IV = implied volatility at a given strike price
AtM = at-the-money implied volatility
x = "moneyness" of the strike = ln (strike price / futures price)
Beta1 to Beta6 = model coefficients
"""
def worktogether(*args):
    # Combined time to finish when each worker alone takes args[i] time units.
    # http://www.algebra.com/algebra/homework/Rate-of-work-word-problems/HOW-TO-Solve-Rate-of-Work-Problems.lesson
    return np.reciprocal(sum([np.reciprocal(float(a)) for a in args]))
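# Example: workers finishing a job alone in 2 and 3 hours finish it together
# in 1/(1/2 + 1/3) = 1.2 hours: worktogether(2, 3) -> 1.2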
def bootstrap_resample(X, n=None):
    """ Bootstrap resample an array_like
    Parameters
    ----------
    X : array_like
      data to resample
    n : int, optional
      length of resampled array, equal to len(X) if n is None
    Results
    -------
    returns X_resample
    """
    if isinstance(X, pd.Series):
        X = X.copy()
        X.index = range(len(X.index))
    if n is None:
        n = len(X)
    resample_i = np.floor(np.random.rand(n) * len(X)).astype(int)
    X_resample = np.array(X[resample_i])  # TODO: write a test demonstrating why array() is important
    return X_resample
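# Example: the mean of a bootstrap resample should be close to the sample mean.
# x = np.random.randn(1000)
# bootstrap_resample(x).mean()  # ~ x.mean()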
def profitfactor(avgwinday, avglosday, numwindays, numlosdays):
    return avgwinday / avglosday * numwindays / numlosdays
def QuoteImbalance(sharesAddedBestbid, sharesCxlBestbid, sharesAddedBestask, sharesCxlBestask):
    return sharesAddedBestbid - sharesCxlBestbid - sharesAddedBestask + sharesCxlBestask
def TradeImbalance(sharesTradedAboveMid, sharesTradedBelowMid):
    return sharesTradedAboveMid - sharesTradedBelowMid
def loglossfun(act, pred):
    """
    loglossfun(np.array([1,1,0,1,0]), np.array([.75,.64,.26,.62,.42]))
    """
    epsilon = 1e-15
    pred = np.clip(pred, epsilon, 1 - epsilon)  # avoid log(0)
    ll = sum(act * np.log(pred) + (1 - act) * np.log(1 - pred))
    return ll * -1.0 / len(act)
def logloss(y, yhat):
    """
    logloss(np.array([1,1,0,1,0]), np.array([.75,.64,.26,.62,.42]))
    """
    score = -sum(map(lambda yi, yhi: yi * np.log(yhi) + (1 - yi) * np.log(1 - yhi), y, yhat)) / len(y)
    return score
def moneyness(strike, price):
    return np.log(strike / price)
def beta(dfpctchg, seriespctchg):
    """ dfpctchg : 2-column pct-change dataframe
    seriespctchg : series of second column
    In [32]: p.tail()
    Out[32]:
                Adj_Close    Settle
    Date
    2014-12-09   0.002320  0.012182
    2014-12-10  -0.016204 -0.044076
    2014-12-11  -0.020392 -0.015860
    2014-12-12  -0.031225 -0.035056
    2014-12-15  -0.016529 -0.031336
    In [31]: p.cov() / p.Settle.var()
    Out[31]:
               Adj_Close    Settle
    Adj_Close   0.687536  0.479343
    Settle      0.479343  1.000000
    beta: 0.479343
    """
    return dfpctchg.cov() / seriespctchg.var()
def make_pca_index(data, scale_data=True, **kwargs):
    '''
    Compute the covariance matrix of a set of stock data (equal to the
    correlation matrix once the data are scaled) and return the first
    principal component.
    By default, the data are scaled to have mean zero and variance one
    before performing the PCA.
    '''
    components = kwargs.get('components', 1)
    if scale_data:
        data_std = data.apply(scale)
    else:
        data_std = data
    corrs = np.asarray(data_std.cov())
    pca = PCA(n_components=components).fit(corrs)
    mkt_index = -scale(pca.transform(data_std))
    return mkt_index
"""
Checking how much variance the components explain, e.g.:
pca_fit = PCA(n_components=len(returns.columns)).fit(returns.corr())
pca_fit.explained_variance_ratio_.cumsum()
"""
if __name__ == "__main__":
    from matplotlib.pyplot import plot
    r = np.random.randint(10, high=60, size=100)
    print(r)
    print(simplereturns(r))
    print(hurst(simplereturns(r)))
    a = np.random.randint(10, high=60, size=100)
    b = np.random.randint(10, high=60, size=100)
    print(hedge_ratio(logreturns(a), logreturns(b)))
    print(normalize(r))
    rk = [
        [50,    51.23, 67.69, 68.97, 80.92],
        [48.77, 50,    59.90, 66.86, 74.34],
        [32.31, 40.10, 50,    57.14, 64.77],
        [31.03, 33.14, 42.86, 50,    57.74],
        [19.08, 25.66, 35.23, 42.26, 50],
    ]
    met = np.array(rk)
    score = [5, 4, 3, 2, 1]
    plot(rk[0], score)
    plot(met[:, 0], score)
    plot(rk[1], score)
    plot(met[:, 1], score)
    plot(rk[2], score)
    plot(met[:, 2], score)
    plot(rk[3], score)
    plot(met[:, 3], score)