Skip to content

Instantly share code, notes, and snippets.

@alexiswl
Last active May 31, 2018 02:05
Show Gist options
  • Save alexiswl/b52884c82e7afd28aec7bcda20b6f5fa to your computer and use it in GitHub Desktop.
Save alexiswl/b52884c82e7afd28aec7bcda20b6f5fa to your computer and use it in GitHub Desktop.
Minimap2 CS String Pandas Implementation: Querying genomic alignments through Pandas Dataframes
#!/usr/bin/env python3
import pandas as pd
import pysam
import re
import numpy as np
def cs_to_pd():
"""
Takes in an aligned segment that contains a cs tag.
Return a pandas DataFrame with the following columns.
Read, Reference, Read_Pos, Ref_Pos
"""
cs_string = get_tag_cs()
refer_start = 0
refer_end = 0
read_index = 0
ref_index = 0
keys = "[=+-*][ACGTacgt]+"
rows = []
for key in re.match(keys, cs_string):
if key.startswith("="): # A set of matches!
contiguous_ref_start = refer_start + ref_index
contiguous_ref_end = refer_start + ref_index + len(key[1:])
contiguous_read_start = read_index
contiguous_read_end = read_index + len(key[1:])
for nuc in key[1:]:
rows.append([nuc, nuc, refer_start+ref_index, read_index,
contiguous_read_start, contiguous_read_end,
contiguous_ref_start, contiguous_ref_end])
read_index += 1
ref_index += 1
elif key.startswith("-"): # A deletion
contiguous_ref_start = refer_start + ref_index
contiguous_ref_end = refer_start + ref_index + len(key[1:])
contiguous_read_start = None
contiguous_read_end = None
for nuc in key[1:]:
rows.append([None, nuc, refer_start+ref_index, ref_index,
contiguous_read_start, contiguous_read_end,
contiguous_ref_start, contiguous_ref_end])
ref_index += 1
elif key.startswith("+"): # An insertion
contiguous_ref_start = None
contiguous_ref_end = None
contiguous_read_start = read_index
contiguous_read_end = read_index + len(key[1:])
for nuc in key[1:]:
rows.append([nuc, None, refer_start+ref_index, read_index,
contiguous_read_start, contiguous_read_end,
contiguous_ref_start, contiguous_ref_end] )
read_index += 1
elif key.startswith("*"): # A substitution
rows.append([key[2], key[1], refer_start+ref_index, read_index,
contiguous_read_start, contiguous_read_end,
contiguous_ref_start, contiguous_ref_end])
read_index += 1
ref_index += 1
return pd.DataFrame(rows, columns=["ReadNuc", "RefNuc", "ReadPos", "RefPos",
"ReadStartContiguous", "RefStartContiguous",
"ReadEndContiguous", "RefEndContiguous"])
def get_insertions(cs_pd, boundaries=0):
insertions = cs_pd.query("RefNuc==None")[["ReadStartContiguous", "ReadEndContiguous"]].drop_duplicates().index.tolist()
insertions_w_bound = []
for i in insertions:
start = max(i - boundaries, 0)
end = min(i + boundaries, cs_pd.shape()[0])
steps = end - start + 1
insertions_w_bound.append(np.linspace(start, end, num=steps))
return cs_pd.iloc(insertions_w_bound)
def get_mismatches(cs_pd, boundaries=0):
mismatches = cs_pd.query("ReadNuc != None & RefNuc != None & ReadNuc != RefNuc").index
mismatches_w_bound = []
for i in mismatches:
start = max(i - boundaries, 0)
end = min(i + boundaries, cs_pd.shape()[0])
steps = end - start + 1
mismatches.append(np.linspace(start, end, num=steps))
def get_deletions(cs_pd, boundaries=0):
deletions = cs_pd.query("ReadNuc==None")[["ReadStartContiguous", "ReadEndContiguous"]].drop_duplicates().index.tolist()
deletions_w_bound = []
for i in deletions:
start = max(i - boundaries, 0)
end = min(i + boundaries, cs_pd.shape()[0])
steps = end - start + 1
deletions_w_bound.append(np.linspace(start, end, num=steps))
return cs_pd[deletions_w_bound]
def get_pattern(cs_pd, pattern, boundaries=0, reference=None, read=None):
if reference is not None:
ref_series = cs_pd.query("RefNuc != None")['RefNuc']
match_index_s = ref_series[pd.DataFrame([ref_series.shift(-i) == p
for i, p in enumerate(pattern)]).all()].index.tolist()
match_index_e = ref_series[pd.DataFrame([ref_series.shift(+i) == p
for i, p in enumerate(reversed(pattern))]).all()].index.tolist()
matches = [cs_pd[max(start - boundaries, 0):min(stop + boundaries, cs_pd.shape()[0])]
for start, stop in zip(match_index_s, match_index_e)]
return matches
def get_homopolymers(cs_pd, boundaries=0):
pattern = "AAAA"
get_pattern()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment