Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@p7k
Last active May 23, 2018 17:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save p7k/ef55af29411cc85a8bdf07cdc7916cfc to your computer and use it in GitHub Desktop.
Save p7k/ef55af29411cc85a8bdf07cdc7916cfc to your computer and use it in GitHub Desktop.
"""Translate a Tandem Repeat Finder file into a bed file containing
annotated repeat intervals.
> This file is a text file which contains the same information, in the same
order, as the summary table file, plus consensus pattern and repeat sequences
http://tandem.bu.edu/trf/trf.definitions.html#table
"""
from __future__ import division
import argparse
import collections
import functools
import re
import attr
@attr.s(slots=True, frozen=True)
class DATRecord(object):
"""
example:
32781 32798 9 2.0 9 100 0 18 44 33 22 0 1.53 ACCAGACAG ACCAGACAGACCAGACAG
"""
start = attr.ib(convert=int)
stop = attr.ib(convert=int)
repeat_period_size = attr.ib(convert=int)
number_of_copies = attr.ib(convert=float)
consensus_pattern_size = attr.ib(convert=int)
percent_matches = attr.ib(convert=int)
percent_indels = attr.ib(convert=int)
alignment_score = attr.ib(convert=int)
percent_composition_A = attr.ib(convert=int)
percent_composition_C = attr.ib(convert=int)
percent_composition_G, = attr.ib(convert=int)
percent_composition_T = attr.ib(convert=int)
entropy = attr.ib(convert=float)
consensus_pattern = attr.ib()
repeat_seq = attr.id()
def zero_based_exclusive(self):
return attr.assoc(self, start=(self.start - 1))
# Sequence: 1 dna:chromosome chromosome:GRCh37:1:1:249250621:1
# Sequence: NM_001316362.1_PRKRA_from_6_ssto_hap7
SEQUENCE_NAME_RE = re.compile(r'Sequence:\s+(?P<name>\S+)')
BedRecord = collections.namedtuple('BedRecord', (
'contig', 'start', 'stop', 'unit', 'count'))
DAT_FILTERS = dict(
filter_min_match=lambda val, dat: int(dat.percent_matches) >= val,
filter_max_indel=lambda val, dat: int(dat.percent_indels) <= val,
filter_min_count=lambda val, dat: float(dat.number_of_copies) >= val)
def parse_dat_records(stream):
sequence_name = None
for line in stream:
try:
dat_record = DATRecord(line.split())
assert sequence_name is not None, 'sequence name must be set'
yield (sequence_name, dat_record)
except TypeError:
try:
sequence_name = SEQUENCE_NAME_RE.match(line).group('name')
except AttributeError:
continue
def trf_dat_to_bed(in_dat, out_bed, filters):
"""Post process TandemRepeatFinder output DAT stream into a BED stream.
Args:
in_dat (io.stream): input dat descriptor.
out_bed (io.stream): output bed descriptor.
filters (list<func(dat_record>)): dat record filter unary functions.
"""
# skip empty lines
non_empty_lines = (line for line in in_dat if line != '')
# parse and filter dat records
dat_records = parse_dat_records(non_empty_lines)
dat_records_filtered = (
(seq_name, dat_rec) for seq_name, dat_rec in dat_records
if all(_filter(dat_rec) for _filter in filters))
# prepare bed records
bed_records = (BedRecord(
contig=seq_name,
# TRF is 1 - based
start=int(dat_rec.start) - 1,
stop=dat_rec.stop,
unit=dat_rec.consensus_pattern,
count=float(dat_rec.number_of_copies)
) for seq_name, dat_rec in dat_records_filtered)
# write bed records
for bed_record in bed_records:
out_bed.write(
'{contig}\t{start}\t{stop}\tunit={unit};count={count}\n'.format(
**bed_record._asdict()))
def parse_args():
parser = argparse.ArgumentParser(
description=trf_dat_to_bed.__doc__.split('\n')[0],
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'in_dat', type=argparse.FileType('r'), help='Input TRF data file.')
parser.add_argument(
'out_bed', type=argparse.FileType('w'), help='Output BED file.')
def filter_type(_type):
return lambda val: None if val == 'off' else _type(val)
filter_group = parser.add_argument_group(
title='filters', description='("off" turns off the filter)')
filter_group.add_argument(
'--filter-min-match', type=filter_type(int), default=99,
help='Exclude records below match percent.')
filter_group.add_argument(
'--filter-max-indel', type=filter_type(int), default=0,
help='Exclude records above indel percent.')
filter_group.add_argument(
'--filter-min-count', type=filter_type(float), default=2.0,
help='Exclude records below count value.')
# prepare filter functions
raw_args = parser.parse_args()
filters = tuple(
# partial application -> unary
functools.partial(DAT_FILTERS[arg_name], arg_value)
for arg_name, arg_value in vars(raw_args).iteritems()
if arg_name.startswith('filter') and arg_value is not None)
args = argparse.Namespace(
filters=filters, **{
name: value for name, value in vars(raw_args).iteritems()
if not name.startswith('filter')})
return args
if __name__ == '__main__':
trf_dat_to_bed(**vars(parse_args()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment