Last active
August 30, 2019 13:55
-
-
Save Nanguage/e646becdd4f5887c8818be8035c5f090 to your computer and use it in GitHub Desktop.
code for processing(coarsen, normalize_use_control) bigwig file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
from typing import Iterable, Tuple, Optional, Union, Callable, TypeVar | |
from functools import partial | |
from contextlib import ContextDecorator | |
from tqdm.auto import tqdm | |
Bin = Tuple[str, int, int] | |
Bigwig = "pyBigWig.bigWigFile" | |
BinValue = Tuple[Bin, float] | |
def get_binsize(bw:Bigwig) -> int: | |
chr_ = list(bw.chroms().keys())[0] | |
bins = bw.intervals(chr_) | |
s,e,v = bins[0] | |
return e - s | |
def get_bins(chrom:str, length:int, binsize:int) -> Iterable[Bin]: | |
nbins = (length + binsize - 1) // binsize | |
for i in range(nbins): | |
s = i * binsize | |
e = min(length, s + binsize) | |
yield chrom, s, e | |
def is_main_chrom(chrom_name:str) -> bool: | |
return bool(re.match("chr[0-9]+$", chrom_name)) | |
T_bin = TypeVar('Bin|BinValue', Bin, BinValue) | |
def filter_chrom(func:Callable, bins:Iterable[T_bin]) -> Iterable[T_bin]: | |
def f_(t): | |
if isinstance(t[1], float): | |
return func(t_[0][0]) | |
else: | |
return func(t[0]) | |
return filter(f_, bins) | |
def stats_bw(bw:Bigwig, bins:Iterable[Bin]) -> Iterable[BinValue]: | |
for c, s, e in bins: | |
v = bw.stats(c, s, e)[0] | |
yield (c, s, e), v | |
class open_bw(object): | |
def __init__(self, path:str, mode:str='r', | |
temp:Optional["open_bw"]=None): | |
self.mode = mode | |
if 'w' in mode: | |
import os | |
if os.path.exists(path): os.remove(path) | |
import pyBigWig as pbw | |
self.bw = pbw.open(path, mode) | |
if 'w' in mode and temp: | |
self.bw.addHeader(list(temp.chrom2len.items())) | |
@property | |
def chrom2len(self) -> Optional[dict]: | |
if 'w' not in self.mode: return self.bw.chroms() | |
def write(self, bin_vals:Iterable[BinValue]): | |
if 'w' not in self.mode: raise IOError(f"{self} is not writable") | |
for (c, s, e), v in bin_vals: | |
recs = [c], [s], [e], [v] | |
self.bw.addEntries(*recs) | |
def read_bins(self, binsize:int) -> Iterable[Bin]: | |
if 'r' not in self.mode: raise IOError(f"{self} is not readable") | |
for chrom, length in self.chrom2len.items(): | |
for bin_ in get_bins(chrom, length, binsize): | |
yield bin_ | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
self.bw.close() | |
def devide_bins(bin1:Iterable[BinValue], bin2:Iterable[BinValue]) -> Iterable[BinValue]: | |
for (b1, v1), (b2, v2) in zip(bin1, bin2): | |
assert b1 == b2 | |
if v1 and (v2 is not None): | |
new_v = v2 / v1 | |
yield b1, new_v | |
# ------------------------------------------------------------ | |
def coarsen(path:str, output:str, binsize:int): | |
"""Coarsen(make binsize bigger) a bigwig""" | |
with open_bw(path, 'r') as fi: | |
cur_bsize = get_binsize(fi.bw) | |
assert binsize >= cur_bsize | |
with open_bw(output, 'w', temp=fi) as fo: | |
bins = filter_chrom(is_main_chrom, fi.read_bins(binsize)) | |
f_ = partial(filter, lambda t: t[1] is not None) | |
new_vals = f_(stats_bw(fi.bw, bins)) | |
fo.write(new_vals) | |
def normalize_use_control(path_in:str, path_control:str, path_out:str, binsize:int): | |
"""Normalize bigwig with a control.""" | |
with open_bw(path_in, 'r') as fi, open_bw(path_control, 'r') as fc: | |
assert get_binsize(fi.bw) <= binsize | |
f_ = partial(filter_chrom, is_main_chrom) | |
bins_in = stats_bw(fi.bw, f_(fi.read_bins(binsize))) | |
bins_c = stats_bw(fc.bw, f_(fc.read_bins(binsize))) | |
with open_bw(path_out, 'w', temp=fi) as fo: | |
new_bins = devide_bins(bins_c, bins_in) | |
fo.write(new_bins) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment