Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@Nanguage
Last active August 30, 2019 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nanguage/e646becdd4f5887c8818be8035c5f090 to your computer and use it in GitHub Desktop.
Save Nanguage/e646becdd4f5887c8818be8035c5f090 to your computer and use it in GitHub Desktop.
code for processing(coarsen, normalize_use_control) bigwig file
import os
import re
from typing import Iterable, Tuple, Optional, Union, Callable, TypeVar
from functools import partial
from contextlib import ContextDecorator
from tqdm.auto import tqdm
Bin = Tuple[str, int, int]
Bigwig = "pyBigWig.bigWigFile"
BinValue = Tuple[Bin, float]
def get_binsize(bw:Bigwig) -> int:
chr_ = list(bw.chroms().keys())[0]
bins = bw.intervals(chr_)
s,e,v = bins[0]
return e - s
def get_bins(chrom:str, length:int, binsize:int) -> Iterable[Bin]:
nbins = (length + binsize - 1) // binsize
for i in range(nbins):
s = i * binsize
e = min(length, s + binsize)
yield chrom, s, e
def is_main_chrom(chrom_name:str) -> bool:
return bool(re.match("chr[0-9]+$", chrom_name))
T_bin = TypeVar('Bin|BinValue', Bin, BinValue)
def filter_chrom(func:Callable, bins:Iterable[T_bin]) -> Iterable[T_bin]:
def f_(t):
if isinstance(t[1], float):
return func(t_[0][0])
else:
return func(t[0])
return filter(f_, bins)
def stats_bw(bw:Bigwig, bins:Iterable[Bin]) -> Iterable[BinValue]:
for c, s, e in bins:
v = bw.stats(c, s, e)[0]
yield (c, s, e), v
class open_bw(object):
def __init__(self, path:str, mode:str='r',
temp:Optional["open_bw"]=None):
self.mode = mode
if 'w' in mode:
import os
if os.path.exists(path): os.remove(path)
import pyBigWig as pbw
self.bw = pbw.open(path, mode)
if 'w' in mode and temp:
self.bw.addHeader(list(temp.chrom2len.items()))
@property
def chrom2len(self) -> Optional[dict]:
if 'w' not in self.mode: return self.bw.chroms()
def write(self, bin_vals:Iterable[BinValue]):
if 'w' not in self.mode: raise IOError(f"{self} is not writable")
for (c, s, e), v in bin_vals:
recs = [c], [s], [e], [v]
self.bw.addEntries(*recs)
def read_bins(self, binsize:int) -> Iterable[Bin]:
if 'r' not in self.mode: raise IOError(f"{self} is not readable")
for chrom, length in self.chrom2len.items():
for bin_ in get_bins(chrom, length, binsize):
yield bin_
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.bw.close()
def devide_bins(bin1:Iterable[BinValue], bin2:Iterable[BinValue]) -> Iterable[BinValue]:
for (b1, v1), (b2, v2) in zip(bin1, bin2):
assert b1 == b2
if v1 and (v2 is not None):
new_v = v2 / v1
yield b1, new_v
# ------------------------------------------------------------
def coarsen(path:str, output:str, binsize:int):
"""Coarsen(make binsize bigger) a bigwig"""
with open_bw(path, 'r') as fi:
cur_bsize = get_binsize(fi.bw)
assert binsize >= cur_bsize
with open_bw(output, 'w', temp=fi) as fo:
bins = filter_chrom(is_main_chrom, fi.read_bins(binsize))
f_ = partial(filter, lambda t: t[1] is not None)
new_vals = f_(stats_bw(fi.bw, bins))
fo.write(new_vals)
def normalize_use_control(path_in:str, path_control:str, path_out:str, binsize:int):
"""Normalize bigwig with a control."""
with open_bw(path_in, 'r') as fi, open_bw(path_control, 'r') as fc:
assert get_binsize(fi.bw) <= binsize
f_ = partial(filter_chrom, is_main_chrom)
bins_in = stats_bw(fi.bw, f_(fi.read_bins(binsize)))
bins_c = stats_bw(fc.bw, f_(fc.read_bins(binsize)))
with open_bw(path_out, 'w', temp=fi) as fo:
new_bins = devide_bins(bins_c, bins_in)
fo.write(new_bins)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment