Skip to content

Instantly share code, notes, and snippets.

@jcausey-astate
Created December 19, 2018 16:41
Show Gist options
  • Save jcausey-astate/2878ac9f359c782d523ba69e838b026b to your computer and use it in GitHub Desktop.
Save jcausey-astate/2878ac9f359c782d523ba69e838b026b to your computer and use it in GitHub Desktop.
"""
Encodes SNP format (TSV) into a numeric matrix (CSV) format.
See docstring in `main()` for more info, or run with --help.
"""
import click
import sys, os, csv, json
@click.command()
@click.argument("input_file", type=click.File("r"))
@click.argument("output_file", required=False, type=click.Path())
@click.option(
"--alleles-column",
default=3,
type=int,
help="Column number (counting from 1) containing the ref/alt allele specification.",
)
@click.option(
"--alleles-mapping",
help="Mapping dictionary in JSON format specifying how to assign values to the reference,"
"alternate, heterozygous, and missing alleles.",
)
@click.option(
"--sample-start-column",
default=6,
type=int,
help="Column number (counting from 1) containing the first sample allele.",
)
def main(input_file, output_file, alleles_column, alleles_mapping, sample_start_column):
"""
Encodes SNP text format (TSV) from INPUT_FILE into a matrix-based
format with numeric values representing the genotypes, which is
written to OUTPUT_FILE. If no OUTPUT_FILE is supplied, one will
be created with the same name as INPUT_FILE and a ".csv" extension.
The sample names from the original file are preserved as column headings,
and the reference IDs (first column) are preserved as row headings.
This program assumes header looks something like this (no need to match
header names exactly):
\b
rs# ltName alleles chrom pos sample1 sample2 ... sampleN
What is important is that alleles is at the 3rd column (by
default), or that you supply the --alleles-col argument.
Also, the samples should start in the 6th column, or you can
supply the --alleles-start-col argument to specify a different
column number.
\b
Encodes as follows, by default:
Homozygous, Ref allele: 1
Homozygous, Alt allele: 2
Heterozygous: 3
Missing value: 0
Ref allele is considered the first listed in the `alleles`
column; alt is the second.
To change the encoding, pass a JSON-encoded dictionary to the
--alleles-mapping argument, with the keys:
["reference", "alternate", "heterozygous", "missing"], and
a value assigned to each one. For example:
{"reference": -1, "alternate": 1, "heterozygous": 0, "missing": 2}
"""
REF, ALT, HET, MISS = (1, 2, 3, 0)
if output_file is None:
output_file = "{}.csv".format(os.path.splitext(input_file.name)[0])
if alleles_mapping is not None:
alleles_mapping = json.loads(alleles_mapping)
REF = alleles_mapping.get("reference", REF)
ALT = alleles_mapping.get("alternate", ALT)
HET = alleles_mapping.get("heterozygous", HET)
MISS = alleles_mapping.get("missing", MISS)
alleles_column -= 1 # The column is human-friendly, so we must make it 0-indexed.
sample_start_column -= 1 # Same thing is true for sample start col.
header = None
with open(output_file, "w") as fout:
writer = csv.writer(fout, dialect="excel")
for idx, line in enumerate(input_file):
if header is None:
header = line.split("\t")[sample_start_column:]
header = [h.strip() for h in header]
header = ["rs#"] + header
writer.writerow(header)
continue
line = line.rstrip("\n").rstrip("\r").split("\t")
alleles = line[alleles_column]
line_hdr = line[0].strip()
line = line[sample_start_column:]
line_ref = alleles[0]
try:
line_alt = alleles.split("/")[1]
except IndexError:
# allele specification like "C" not "C/G". There is no alt.
line_alt = "NONE"
allele_subst = {line_ref: REF, line_alt: ALT, "H": HET, "-": MISS, "": MISS}
output_line = []
try:
output_line = [allele_subst[str(a)] for a in line]
except KeyError as bad_allele:
print(
"\n\nERROR: Unexpected allele found on input line {}: {}.\nExpected one of {} (ref) or {} (alt), or 'H' or '-'.\n\n".format(
idx + 1, bad_allele, line_ref, line_alt
),
file=sys.stderr,
)
sys.exit(1)
output_line = [line_hdr] + output_line
writer.writerow(output_line)
if input_file is not sys.stdin:
input_file.close()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment