Created
October 28, 2022 09:57
-
-
Save kpalin/b906c66c1113fb7980cb0af2f5c41c61 to your computer and use it in GitHub Desktop.
Plot sequence read alignments with respect to read (instead to genome)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
Created at Mon Mar 14 15:05:11 2016 by Kimmo Palin <kpalin@merit.ltdk.helsinki.fi> | |
""" | |
def main(): | |
import argparse | |
parser = argparse.ArgumentParser(description="Plot chimeric reads") | |
parser.add_argument( | |
"input", | |
nargs=1, | |
help="Input bam file for plotting [default:%(default)s]", | |
) | |
parser.add_argument( | |
"-r", | |
"--region", | |
default=None, | |
help="Region of primary alignments to plot[default:%(default)s]", | |
) | |
parser.add_argument( | |
"-o", | |
"--outbase", | |
default=None, | |
help="Output base for figures. Files will be named 'outbase.qname.png' [default:%(default)s]", | |
) | |
parser.add_argument( | |
"--also_singlemaps", | |
default=False, | |
action="store_true", | |
help="Output also boring plots with single alignment [default:%(default)s]", | |
) | |
parser.add_argument( | |
"--mapq", | |
default=-1, | |
type=int, | |
help="Minimum mapping quality aligment to use [default:%(default)s]", | |
) | |
parser.add_argument( | |
"-V", | |
"--verbose", | |
default=False, | |
action="store_true", | |
help="Be more verbose with output [and log to a file] [default:%(default)s]", | |
) | |
args = parser.parse_args() | |
import logging | |
if args.verbose: | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s:%(funcName)s:%(levelname)s:%(message)s", | |
) | |
import matplotlib | |
matplotlib.use("agg") | |
return args | |
from collections import namedtuple | |
SAtype = namedtuple( | |
"SupplementaryAlignment", ["rname", "pos", "strand", "CIGAR", "mapQ", "NM"] | |
) | |
def parseSupplementaryAlignments(SAtag: str) -> SAtype: | |
r = [] | |
for SA in SAtag.rstrip(";").split(";"): | |
p = SA.split(",") | |
p[1] = int(p[1]) - 1 # Zero based coordinates | |
p[4] = int(p[4]) | |
p[5] = int(p[5]) | |
r.append(SAtype(*p)) | |
return r | |
def plot_chimera(samFile, aln, ax=None, max_read_name_len=24, MQlimit=-1): | |
"Plot chimeric sam read alignment" | |
import pysam | |
assert ( | |
not aln.is_supplementary | |
), "Can't yet plot supplementary alignments, please find the primary bam record" | |
if ax is None: | |
import matplotlib.pyplot as plt | |
ax = plt.gca() | |
colors = [ | |
(0.0, 0.0, 0.0), | |
(0.2980392156862745, 0.4470588235294118, 0.6901960784313725), | |
(0.3333333333333333, 0.6588235294117647, 0.40784313725490196), | |
(0.7686274509803922, 0.3058823529411765, 0.3215686274509804), | |
(0.5058823529411764, 0.4470588235294118, 0.6980392156862745), | |
(0.8, 0.7254901960784313, 0.4549019607843137), | |
(0.39215686274509803, 0.7098039215686275, 0.803921568627451), | |
] | |
mappedChrPos = {} | |
prevArrows = [] | |
def plotAln(tmpAln, ax, ypos, refname): | |
"Plot alignment tmpAln on axes ax, height ypos and reference name refname. Return label." | |
import logging as log | |
if tmpAln.is_reverse: | |
sStart = seqLen - tmpAln.qend | |
sEnd = seqLen - tmpAln.qstart | |
x = sEnd | |
dx = sStart - sEnd | |
else: | |
sStart, sEnd = tmpAln.qstart, tmpAln.qend | |
x = sStart | |
dx = sEnd - sStart | |
ax.hlines(ypos, sStart, sEnd, color=colors[ypos % len(colors)]) | |
ax.arrow( | |
x, | |
ypos, | |
dx, | |
0, | |
width=0.1, | |
head_width=0.3, | |
head_length=0.05 * seqLen, | |
length_includes_head=True, | |
color=colors[ypos % len(colors)], | |
) | |
rLabel = "%s:%d-%d MQ:%d" % ( | |
refname, | |
tmpAln.reference_start + 1, | |
tmpAln.reference_end, | |
tmpAln.mapq, | |
) | |
arrowData = ( | |
refname, | |
tmpAln.reference_start, | |
tmpAln.reference_end, | |
x, | |
ypos, | |
dx, | |
set(tmpAln.get_reference_positions()), | |
) | |
# Shading the alignment overlaps on reference sequence. | |
for oldArrow in prevArrows: | |
if ( | |
oldArrow[0] == arrowData[0] | |
and oldArrow[1] < arrowData[2] | |
and arrowData[1] < oldArrow[2] | |
): | |
from matplotlib.patches import Polygon | |
_, oldRefStart, oldRefEnd, oldX, oldY, oldDX, oldRefPos = oldArrow | |
sharedPos = oldRefPos.intersection(arrowData[6]) | |
try: | |
shareRefMin, shareRefMax = min(sharedPos), max(sharedPos) | |
except ValueError: | |
shareRefMin = max(oldRefStart, arrowData[1]) | |
shareRefMax = min(oldRefEnd, arrowData[2]) | |
log.info( | |
"No shared positions on {}:{}-{}".format( | |
oldArrow[0], shareRefMin, shareRefMax | |
) | |
) | |
# print "Sharing region:",shareRefMin,shareRefMax | |
refStartGapOld = shareRefMin - oldRefStart | |
refEndGapOld = oldRefEnd - shareRefMax | |
if oldDX > 0: | |
oldVertices = [ | |
[oldX + refStartGapOld, oldY], | |
[oldX + oldDX - refEndGapOld, oldY], | |
] | |
else: | |
oldVertices = [ | |
[oldX - refStartGapOld, oldY], | |
[oldX + oldDX + refEndGapOld, oldY], | |
] | |
refStartGapNew = shareRefMin - tmpAln.reference_start | |
refEndGapNew = tmpAln.reference_end - shareRefMax | |
if dx > 0: | |
newVertices = [ | |
[x + refStartGapNew, ypos], | |
[x + dx - refEndGapNew, ypos], | |
] | |
else: | |
newVertices = [ | |
[x - refStartGapNew, ypos], | |
[x + dx + refEndGapNew, ypos], | |
] | |
log.info(str([oldDX, oldVertices])) | |
log.info(str([dx, newVertices])) | |
if ( | |
dx * oldDX > 0 | |
): # Flip the shading back if both are in reverse WHY??? | |
newVertices = newVertices[::-1] | |
vertices = newVertices + oldVertices | |
log.info(str(vertices)) | |
p = Polygon( | |
vertices, | |
alpha=0.4, | |
color=colors[oldArrow[4] % len(colors)], | |
fill=True, | |
) | |
ax.add_patch(p) | |
pass | |
prevArrows.append(arrowData) | |
return rLabel | |
lineLabels = [] | |
seqLen = len(aln.seq) | |
ax.hlines(0, 0, seqLen, color=colors[0]) | |
lineLabels.append(aln.qname[:max_read_name_len]) | |
lineLabels.append(plotAln(aln, ax, 1, samFile.getrname(aln.reference_id))) | |
if aln.has_tag("SA"): | |
SAs = parseSupplementaryAlignments(aln.get_tag("SA")) | |
else: | |
SAs = [] | |
SAs = [x for x in SAs if x.mapQ >= MQlimit] | |
import logging as log | |
for ypos, SA in enumerate(SAs): | |
a = pysam.AlignedSegment() | |
a.flag = 0 | |
a.flag = 16 if SA.strand == "-" else 0 | |
a.seq = aln.seq | |
a.cigarstring = SA.CIGAR | |
a.reference_start = SA.pos # pysam is 0 based | |
a.mapq = SA.mapQ | |
log.info(SA) | |
log.info(a) | |
lineLabels.append(plotAln(a, ax, ypos + 2, SA.rname)) | |
ax.set_ylim(-0.5, 1.5 + len(SAs)) | |
ax.set_xlim(-0.1 * seqLen, 1.1 * seqLen) | |
ax.set_yticks(list(range(len(lineLabels)))) | |
ax.set_yticklabels(lineLabels) | |
import logging as log | |
log.info("\n".join("{}\t{}".format(lineLabels[0], x) for x in lineLabels[1:])) | |
# ax.legend() | |
def plot_all(inbam, outbase, region=None, also_singletons=False, MQlimit=-1): | |
""" | |
Arguments: | |
- `inbam`: | |
- `outbase`: | |
""" | |
import matplotlib.pyplot as plt | |
import pysam | |
import logging as log | |
log.info("Reading {}".format(inbam)) | |
bam = pysam.Samfile(inbam) | |
alnIt = iter(bam) if region is None else bam.fetch(region=region) | |
out_fmt = outbase + ".{}.png" if outbase is not None else "{}.png" | |
for r in alnIt: | |
if ( | |
r.seq is not None | |
and not r.is_supplementary | |
and (also_singletons or r.has_tag("SA")) | |
): | |
log.info("Plotting {}".format(r.qname)) | |
fig = plt.figure() | |
ax = fig.gca() | |
try: | |
plot_chimera(bam, r, ax, MQlimit=MQlimit) | |
plt.tight_layout() | |
fig.savefig(out_fmt.format(r.qname)) | |
plt.close(fig) | |
except ValueError as e: | |
log.warning(str(e)) | |
if __name__ == "__main__": | |
args = main() | |
plot_all(args.input[0], args.outbase, args.region, args.also_singlemaps, args.mapq) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment