Skip to content

Instantly share code, notes, and snippets.

@ShujiaHuang
Last active June 19, 2023 09:43
Show Gist options
  • Save ShujiaHuang/d53c366ce860f2f8d1d05ac4ed100846 to your computer and use it in GitHub Desktop.
Save ShujiaHuang/d53c366ce860f2f8d1d05ac4ed100846 to your computer and use it in GitHub Desktop.
def merge_region(position_region, delta=1):
"""Merge a batch of sorted region
Parameters
----------
``position_region``: a list like, required
A regions (2D) array, format like: [[start1,end1], [start2,end2], ...]
``delta``: Integer, optinal
Example
-------
>>> merge_region([[1,1], [2,3], [4,6], [4,5], [8, 20], [9, 12]])
... [[1, 6], [8, 20]]
"""
# sorted
position_region.sort(key=lambda x: x[0])
m_region = []
pre_pos, start, end = None, None, None
flag = False
for s, e in position_region:
if s > e:
sys.stderr.write("[ERROR]Your region start > end. It is not allow when call Merge function\n")
sys.exit(1)
if pre_pos is None:
# The light is on => Get the region!
if flag:
m_region.append([start, end])
start, end = s, e
flag = True
else:
if pre_pos > s:
sys.stderr.write("[ERROR] The array hasn't been sorted.\n")
sys.exit(1)
if delta + end >= s:
end = e if e > end else end
else:
m_region.append([start, end])
start, end = s, e
pre_pos = s
if flag:
m_region.append([start, end])
return m_region
"""Merge
Author: Shujia Huang
Date: 2021-12-17 09:59:04
"""
import sys
from datetime import datetime
class BedGenerator(object):
def __init__(self):
self.chromosome = ""
self.start = -1
self.last = -1
self.depth = -1
def add_position(self, chromo, pos, depth):
if self.chromosome == "":
self.chromosome = chromo
self.start = pos
self.last = pos
self.depth = depth
elif (chromo == self.chromosome) and (pos == self.last + 1) and (depth == self.depth):
# Merge
self.last = pos
else:
# Output: bed format with 1-base
print(self.chromosome, self.start, self.last, self.depth, sep="\t")
# reset the value
self.chromosome = chromo
self.start = pos
self.last = pos
self.depth = depth
def out_last_pos(self):
if self.last != -1:
print(self.chromosome, self.start, self.last, self.depth, sep="\t")
if __name__ == "__main__":
START_TIME = datetime.now()
bed = BedGenerator()
n = 0
for line in sys.stdin: # $ samtools depth --reference ref.fa in.cram | python merge_samtools_depth.py > out.bed
"""
Input:
chr1 10004 1
chr1 10005 1
chr1 10006 1
chr1 10007 2
chr1 10008 2
chr1 10009 2
chr1 10010 3
Output:
chr1 10004 10006 1
chr1 10007 10009 2
chr1 10010 10010 3
"""
col = line.strip().split()
chrom, pos, depth = col[0], int(col[1]), int(col[2])
bed.add_position(chrom, pos, depth)
n += 1
if n % 1000000 == 0:
sys.stderr.write(f"[INFO] Parsing {n} lines and hit: {chrom} - {pos}\n")
sys.stderr.write(f"[INFO] Parsed total {n} lines and hit the end position: {chrom} - {pos}\n")
bed.out_last_pos()
elapsed_time = datetime.now() - START_TIME
sys.stderr.write(f"\n** process done, {elapsed_time.seconds} seconds elapsed **\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment