Skip to content

Instantly share code, notes, and snippets.

@dceoy
Last active May 19, 2020 17:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dceoy/236cc9147094fa00c0dab797d54700c1 to your computer and use it in GitHub Desktop.
Save dceoy/236cc9147094fa00c0dab797d54700c1 to your computer and use it in GitHub Desktop.
[Python] Extract lowercase regions in a FASTA file and write them into a BED file
#!/usr/bin/env python
import argparse
import bz2
import gzip
import logging
import os
from pathlib import Path
from pprint import pformat
import numpy as np
import pandas as pd
__version__ = 'v0.0.1'
def main():
args = _parse_options()
_set_log_config(args=args)
logger = logging.getLogger(__name__)
logger.debug('args:' + os.linesep + pformat(vars(args)))
logger.info(f'Write BED:\t{args.bed_path}')
with open(Path(args.bed_path).resolve(), 'w') as f:
for chrom, seq in _generate_chrom_seq(fa_path=args.fa_path):
logger.info(f'chrom:\t{chrom}')
df = _detect_lowercase_bed_df(chrom, seq)
logger.info(f'df:{os.linesep}{df}')
df.to_csv(f, sep='\t', header=None, index=None)
print(f'{chrom}:\t{df.shape[0]} regions', flush=True)
def _detect_lowercase_bed_df(chrom, seq):
a_diff = np.diff(
np.array([0] + [s.islower() for s in seq] + [0]).astype(int)
)
a_start = np.where(a_diff == 1)[0]
a_end = np.where(a_diff == -1)[0]
return pd.DataFrame({
'chrom': chrom, 'chromStart': a_start,
'chromEnd': np.append(a_end, len(seq))[:len(a_start)]
})
def _generate_chrom_seq(fa_path):
logger = logging.getLogger(__name__)
logger.info(f'Read FASTA:\t{fa_path}')
with _open_readable_file(path=fa_path) as f:
chrom = None
seq = ''
for s in f:
if s.startswith('>'):
prev_chrom = chrom
prev_seq = seq
chrom = s.strip()[1:]
seq = ''
if prev_chrom and prev_seq:
yield prev_chrom, prev_seq
elif s:
seq += s.strip()
yield chrom, seq
def _open_readable_file(path):
p = Path(path).resolve()
if path.endswith('.gz'):
return gzip.open(p, mode='rt')
elif path.endswith('.bz2'):
return bz2.open(p, mode='rt')
else:
return open(p, mode='r')
def _set_log_config(args):
if args.debug:
lv = logging.DEBUG
elif args.info:
lv = logging.INFO
else:
lv = logging.WARNING
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S', level=lv
)
def _parse_options():
parser = argparse.ArgumentParser(
prog=__file__,
description=(
'Extract lowercase regions in a FASTA and write them into a BED'
)
)
parser.add_argument(
'fa_path', type=str, help='Path to an input FASTA file'
)
parser.add_argument(
'bed_path', type=str, help='Path to an output BED file'
)
parser.add_argument(
'--version', action='version', version=f'%(prog)s\t{__version__}'
)
log_parser = parser.add_mutually_exclusive_group()
log_parser.add_argument(
'--debug', dest='debug', action='store_true',
help='Log with DEBUG level'
)
log_parser.add_argument(
'--info', dest='info', action='store_true',
help='Log with INFO level'
)
return parser.parse_args()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment