Skip to content

Instantly share code, notes, and snippets.

@bristena-op
Last active May 10, 2018 12:19
Show Gist options
  • Save bristena-op/f4d2de8110e1b3d33d2c86dd8f77b913 to your computer and use it in GitHub Desktop.
Save bristena-op/f4d2de8110e1b3d33d2c86dd8f77b913 to your computer and use it in GitHub Desktop.
# Some of the code taken from http://alimanfoo.github.io/2016/06/21/understanding-pbwt.html;
X1 = [[0, 1, 0, 1, 0, 1, 2],
[1, 1, 0, 0, 0, 1, 3],
[1, 1, 1, 1, 1, 1, 4],
[0, 1, 1, 1, 1, 0, 5],
[0, 0, 0, 0, 0, 0, 6],
[1, 0, 0, 0, 1, 0, 7],
[1, 1, 0, 0, 0, 1, 8],
[0, 1, 0, 1, 1, 0, 9]]
X = [[0, 1, 0, 1, 0, 1],
[1, 1, 0, 0, 0, 1],
[1, 1, 1, 1, 1, 1],
[0, 1, 1, 1, 1, 0],
[0, 0, 0, 0, 0, 0],
[1, 0, 0, 0, 1, 0],
[1, 1, 0, 0, 0, 1],
[0, 1, 0, 1, 1, 0]]
Y = [[0,0,0,1],
[1,0,0,1],
[1,1,1,1],
[0,0,0,0]
]
y = [1,1,1,1]
# y= [0,0,0,0]
Z = [[0,1,1],
[0,0,1]]
import numpy as np
def build_ppa(X):
X = np.array(X)
# N is the length of each haplotype
# M is the number of haplotypes
N, M = X.transpose().shape[:2]
ppa = np.zeros((N,M), dtype = "u4")
for i in range(M):
ppa[0,i] = i
for k in range(N):
a = list()
b = list()
u = v = 0
for index in ppa[k]:
if X[index, k] ==0:
a.append(index)
u+=1
elif X[index, k] == 1:
b.append(index)
v+=1
else:
break
# print(a,b)
ppa[k+1] = a+b
# print(ppa)
return ppa
def build_ppa_div(X):
X = np.array(X)
# N is the length of each haplotype
# M is the number of haplotypes
N, M = X.transpose().shape[:2]
ppa = np.zeros((N+1, M), dtype="u4")
div = np.zeros((N+1, M+1), dtype="u4")
w0 = np.zeros((N, M), dtype="u4")
w1 = np.zeros((N, M), dtype="u4")
for i in range(M):
ppa[0, i] = i
for k in range(N):
a = list()
b = list()
d = list()
e = list()
u = v = 0
p = q = k+1
for index, match_start in zip(ppa[k], div[k]):
if match_start > p:
p = match_start
if match_start > q:
q = match_start
if X[index, k] == 0:
a.append(index)
u += 1
d.append(p)
p = 0
w0[k,i] = u
elif X[index, k] == 1:
b.append(index)
v += 1
e.append(q)
q = 0
w1[k,i] = v
else:
break
# print(a,b)
# print (ppa)
w1[k] += u -1
if a and b:
ppa[k + 1] = a + b
div[k + 1] = d + e +[0]
# print("ppa",ppa)
# print("div", div)
return ppa, div ,w0, w1
def report_long_match(X,L):
X = np.array(X)
N, M = X.transpose().shape[:2]
ppa = np.zeros((N+1, M), dtype="u4")
div = np.zeros((N+1, M), dtype="u4")
for i in range(M):
ppa[0, i] = i
for k in range(N):
a = list()
b = list()
d = list()
e = list()
u = v = 0
p = q = k + 1
ma = list()
mb = list()
for index, match_start in zip(ppa[k], div[k]):
if match_start > k - L:
if ma and mb:
yield k, ma, mb
ma = list()
mb = list()
if match_start > p:
p = match_start
if match_start > q:
q = match_start
if X[index, k] == 0:
a.append(index)
u += 1
d.append(p)
p = 0
ma.append(index)
elif X[index, k] == 1:
b.append(index)
v += 1
e.append(q)
q = 0
mb.append(index)
else:
break
if ma and mb:
yield k, ma, mb
if a and b:
ppa[k + 1] = a + b
div[k + 1] = d + e
# print(ppa, div)
def report_long_matches(X,L):
X = np.array(X)
# N is the length of each haplotype
# M is the number of haplotypes
N, M = X.transpose().shape[:2]
ppa = np.zeros((N+1, M), dtype="u4")
div = np.zeros((N+1, M), dtype="u4")
for i in range(M):
ppa[0, i] = i
for k in range(N):
a = list()
b = list()
d = list()
e = list()
u = v = 0
p = q = k + 1
for index, match_start in zip(ppa[k], div[k]):
if match_start > p:
p = match_start
if match_start > q:
q = match_start
if X[index, k] == 0:
a.append(index)
u += 1
d.append(p)
p = 0
elif X[index, k] == 1:
b.append(index)
v += 1
e.append(q)
q = 0
else:
break
if a and b:
ppa[k + 1] = a + b
div[k + 1] = d + e
print(ppa)
print(div)
for k in range(N):
a = np.zeros(M, dtype = 'u4')
b = np.zeros(M, dtype = 'u4')
u = v = 0
# ma = list()
# mb = list()
for index, match_start in zip(ppa[k], div[k]):
if match_start>k-L:
if u > 0 and v > 0:
for i in range(u):
for j in range(v):
print("Match from ", a[i], 'to', b[j], 'ending at', k)
u = v = 0
if X[index,k] == 0:
a[u] = index
u+=1
elif X[index, k] == 1:
b[v] = index
v+=1
else:
if u > 0 and v > 0:
for i in range(u):
for j in range(v):
print("Match from ", a[i], 'to', b[j], 'ending at', k)
if u > 0 and v > 0:
for i in range(u):
for j in range(v):
print("Match from ", a[i], 'to', b[j], 'ending at', k)
def report_set_max_match(X):
X = np.array(X)
# N is the length of each haplotype
# M is the number of haplotypes
N, M = X.transpose().shape[:2]
ppa, div = build_ppa_div(X)
max_match =[]
l = 0
print(ppa, div)
for k in range(N+1):
div[k,0] = k + 1
div[k,M] = k + 1
for i in range(M-1):
m = i-1
n = i+1
if div[k, i] <= div[k,i+1]:
while div[k,m]<= div[k,i]:
if k!=N:
if X[ppa[k,m], k] == X[ppa[k,i],k]:
break
m -= 1
if div[k,i] >= div[k,i+1]:
while div[k,n] <= div[k,i+1]:
if k != N:
if X[ppa[k,n],k] == X[ppa[k,i], k] and k!=N:
break
n +=1
for j in range(m+1, i):
if div[k,i]<k :
if k - div[k,i] > l:
l = k - div[k,i]
max_match = [(ppa[k,i], ppa[k,j], div[k,i], k)]
elif k - div[k,i] == l:
max_match.append((ppa[k,i], ppa[k,j], div[k,i], k))
print("Match from ", ppa[k,i], 'to', ppa[k,j],"starting at", div[k,i], 'ending at', k)
for j in range(i+1, n):
if div[k,i+1] <k :
if k - div[k,i+1] > l:
l = k - div[k,i+1]
max_match = [(ppa[k,i], ppa[k,j], div[k,i+1], k)]
elif k - div[k,i] == l:
max_match.append((ppa[k,i], ppa[k,j], div[k,i+1], k))
print("Match from ", ppa[k, i], 'to', ppa[k, j], "starting at", div[k, i+1], 'ending at', k)
for item in max_match:
print("Match from ", item[0], 'to',item[1], "starting at", item[2], 'ending at', item[3])
return max_match
def report_set_max_match_with_thresh(X, L):
X = np.array(X)
# N is the length of each haplotype
# M is the number of haplotypes
N, M = X.transpose().shape[:2]
ppa, div = build_ppa_div(X)
max_match =[]
print(ppa, div)
for k in range(N+1):
div[k,0] = k + 1
div[k,M] = k + 1
for i in range(M-1):
m = i-1
n = i+1
if div[k, i] <= div[k,i+1]:
while div[k,m]<= div[k,i]:
if k!=N:
if X[ppa[k,m], k] == X[ppa[k,i],k]:
break
m -= 1
if div[k,i] >= div[k,i+1]:
while div[k,n] <= div[k,i+1]:
if k != N:
if X[ppa[k,n],k] == X[ppa[k,i], k] and k!=N:
break
n +=1
for j in range(m+1, i):
if div[k,i]<k:
if k - div[k,i] >= L:
max_match.append((ppa[k,i], ppa[k,j], div[k,i], k))
print("Match from ", ppa[k,i], 'to', ppa[k,j],"starting at", div[k,i], 'ending at', k)
for j in range(i+1, n):
if div[k,i+1] <k:
if k - div[k,i+1] >= L:
max_match.append((ppa[k,i], ppa[k,j], div[k,i+1], k))
print("Match from ", ppa[k, i], 'to', ppa[k, j], "starting at", div[k, i+1], 'ending at', k)
return max_match
def maximal_matches_with_z(z,X):
X = np.array(X)
# N is the length of each haplotype
# M is the number of haplotypes
N, M = X.transpose().shape[:2]
ppa, div, w0, w1 = build_ppa_div(X)
e = 0
f = 0
g = 1
e1 =0
l=2
for k in range(N):
div[k,0] = k+1
div[k,M] = k+1
if k > 0:
if z[k] == 0:
f1 = w0[k,f]
g1 = w0[k,g]
else:
f1 = w1[k,f]
g1 = w1[k,f]
if f1 < g1:
e1 = e
else:
for i in range(f, g):
print("Match to", ppa[k, i], "from", e, "to", k)
e1 = div[k+1, f1] - 1
if z[e1] == 0 and f1 != 0 :
f1 = g1 - 1
# print(f1,g1,e1)
while e1>0 and z[e1-1] == X[ppa[k+1,f1], e1-1] :
e1 -=1
while div[k+1, f1] <= e1 :
f1 -= 1
# print(f1,g1,e1)
else:
g1 = f1 + 1
# print(f1,g1,e1)
while e1>0 and z[e1 - 1] == X[ppa[k+1,f1], e1-1]:
e1 -= 1
while g1 < M and div[k+1, g1] <= e1 :
g1 += 1
# print(f1,g1,e1)
f = f1
g = g1
e = e1
print(f,g,e)
return f,g,e
if __name__ == '__main__':
build_ppa_div(Y)
# build_ppa(Y)
# for match in report_long_match(Y, 2):
# print(match)
# report_long_matches(Y, 2)
# report_set_max_match(Y)
# report_set_max_match_with_thresh(X,3)
maximal_matches_with_z(y,Y)
@bristena-op
Copy link
Author

init

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment