Skip to content

Instantly share code, notes, and snippets.

@TTTPOB
Created January 18, 2022 02:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TTTPOB/d0a3e4fad9108a52b48ff6e6de57eaf6 to your computer and use it in GitHub Desktop.
Save TTTPOB/d0a3e4fad9108a52b48ff6e6de57eaf6 to your computer and use it in GitHub Desktop.
convert gtf provided by gencode to bed12 file that suits pyGenomeTracks(well, it can handle gtf files but absurdly slow)
#!/usr/bin/env python3
from typing import Dict, List
import pandas as pd
import click
import numpy as np
from pathlib import Path
from tqdm import tqdm
import subprocess
def count_file_line(filename: str) -> int:
"""
Count the number of lines in a file.
using wc -l
"""
lines = subprocess.check_output("wc -l {}".format(filename), shell=True).split()[0]
lines = int(lines)
return lines
def parse_gtf_line(line: str) -> dict:
"""
Parse a line from a GTF file.
"""
fields = line.split("\t")
return {
"seqnames": fields[0],
"source": fields[1],
"feature": fields[2],
"start": int(fields[3]),
"end": int(fields[4]),
"score": fields[5],
"strand": fields[6],
"frame": fields[7],
"attribute": fields[8],
}
def parse_gtf_attribute(attribute: str) -> dict:
"""
Parse the attribute field from a GTF line.
"""
fields = [field.strip(" ") for field in attribute.split(";") if field != ""]
attribute_dict = {}
for field in fields:
key, value = field.split(" ")
value = value.replace('"', "")
attribute_dict[key] = value
return attribute_dict
def parse_gtf(gtf_path: str) -> pd.DataFrame:
"""
Parse a GTF file.
"""
gtf_line_list = []
linecount = count_file_line(gtf_path)
with open(gtf_path, "r") as f:
for line in tqdm(f, total=linecount):
if line.startswith("#"):
continue
line_dict = parse_gtf_line(line.strip())
attribute_dict = parse_gtf_attribute(line_dict["attribute"])
line_dict.pop("attribute")
line_dict.update(attribute_dict)
gtf_line_list.append(line_dict)
return gtf_line_list
def concat_parsed_lines(parsed_lines: List[Dict]) -> pd.DataFrame:
"""
Concatenate a list of parsed lines.
"""
df = pd.DataFrame(parsed_lines)
return df
def write_to_file(df: pd.DataFrame, filename: str):
"""
Write a dataframe to a file.
"""
df.to_csv(filename, sep="\t", index=False)
def trim_unused_column(df: pd.DataFrame) -> pd.DataFrame:
used_column = ["seqnames", "start", "end", "gene_name", "score", "strand"]
return df[used_column]
def remove_transcript(df: pd.DataFrame):
without_transcript = df.query("feature != 'transcript'")
return without_transcript
def gtf_df_to_bed12(df: pd.DataFrame) -> pd.DataFrame:
without_transcript = remove_transcript(df)
gene_row_list = []
for gene_name, group in tqdm(without_transcript.groupby("gene_name")):
# gene seqnames, start, end, and strand
group = group.sort_values("start")
gene_row = group.loc[group["feature"] == "gene", :]
seqname = gene_row.seqnames.values[0]
start = gene_row.start.values[0]
end = gene_row.end.values[0]
strand = gene_row.strand.values[0]
# print(seqname, start, end, strand)
thick_start = start
thick_end = start
score = 0
item_rgb = "0,0,0"
exons = group.loc[group["feature"].values == "exon", :]
block_count = exons.shape[0]
block_starts_array = exons["start"].values - start
block_ends_array = exons["end"].values - start
block_size_array = block_ends_array - block_starts_array
# no, we cannot force them to be unique
# block_size_array = np.unique(block_size_array)
# block_starts_array = np.unique(block_starts_array)
block_sizes = ",".join(map(str, block_size_array))
block_starts = ",".join(map(str, block_starts_array))
result_gene_row = {
"seqname": seqname,
"start": start,
"end": end,
"gene_name": gene_name,
"score": score,
"strand": strand,
"thick_start": thick_start,
"thick_end": thick_end,
"item_rgb": item_rgb,
"block_count": block_count,
"block_sizes": block_sizes,
"block_starts": block_starts,
}
gene_row_list.append(result_gene_row)
bed12_df = pd.DataFrame(gene_row_list).sort_values(["seqname", "start"])
return bed12_df
def write_without_colnames(df: pd.DataFrame, filename: str):
df.to_csv(filename, sep="\t", index=False, header=False)
@click.command()
@click.option("-i", "--gtf", help="path to gtf csv file")
@click.option("-o", "--output-bed12", help="path to output bed12 file")
def main(gtf, output_bed12):
click.echo("parsing gtf lines", err=True)
lines = parse_gtf(gtf)
click.echo("making dataframe", err=True)
df = concat_parsed_lines(lines)
click.echo("converting to bed12", err=True)
df = gtf_df_to_bed12(df)
click.echo("writing bed12", err=True)
df = write_without_colnames(df, output_bed12)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment