Skip to content

Instantly share code, notes, and snippets.

@lmtani
Last active February 7, 2023 22:51
Show Gist options
  • Save lmtani/88fe7b25083b4feb814b266a1262460d to your computer and use it in GitHub Desktop.
Save lmtani/88fe7b25083b4feb814b266a1262460d to your computer and use it in GitHub Desktop.
Create BED with ENSEMBL genes
"""
Script para gerar todas as CDSs de determinada assembly do genoma humano.
Essas coordenadas são usadas para o calculo de cobertura das regiões alvo
do exoma.
"""
from pyensembl import EnsemblRelease
CHROMOSOMES = list(map(str, list(range(1, 24)))) + ["X", "Y", "MT"]
def write_lines(name, lines):
with open(name, "w+") as f:
for l in lines:
f.write(l)
def retrieve_all_coding_sequences(database: EnsemblRelease, list_of_transcripts):
all_cds = {}
failed = 0
for p in list_of_transcripts:
try:
all_cds.setdefault(p, []).append(data.db.query_loci("transcript_id", p, "CDS"))
except ValueError:
failed += 1
print(f"Existem {failed} transcritos que não possuem CDS")
return all_cds
def produce_values_for_cds(database: EnsemblRelease, all_cds):
lines = []
for k, v in all_cds.items():
gene = database.gene_name_of_transcript_id(k)
flat_list = [item for sublist in v for item in sublist]
for l in flat_list:
if l.contig not in CHROMOSOMES:
continue
if l.start == l.end:
continue
lines.append([l.contig, l.start, l.end, gene])
return lines
def generate_bed_file(bed_values, assembly):
bed_lines = []
with_chr = True if assembly == "GRCh38" else False
for contig, start, end, gene in bed_values:
if with_chr:
if contig == "MT":
contig = "chrM"
else:
contig = f"chr{contig}"
bed_lines.append(f"{contig}\t{start}\t{end}\t{gene}\n")
write_lines(f"all_cds.{assembly}.bed", bed_lines)
if __name__ == "__main__":
import sys
assembly = sys.argv[1]
if assembly not in ("GRCh37", "GRCh38"):
raise ValueError(f"Sem suporte para assembly {assembly}")
# release 75: GRCh37 | 93: GRCh38
data = EnsemblRelease(93)
d = data.transcript_ids()
all_cds = retrieve_all_coding_sequences(data, d)
bed_information = produce_values_for_cds(data, all_cds)
generate_bed_file(bed_information, assembly)
# ex: bedtools sort -i cds_genes.grch37.bed | bedtools merge -c 4 -o distinct > cds_genes.grch37.merged.bed
"""
Script para gerar um bed com todos os genes de ensembl (processed_transcripts ou
protein_coding).
Essas coordenadas são usadas para anotar as targets dos kits cadastrados. Ex:
bedtools intersect -a kit.bed -b all_genes_sorted.GRCh37.bed -wb -loj | awk '{print $1"\t"$2"\t"$3"\t"$8}' > kit.GRCh37.bed
bedtools merge -delim "," -o collapse -c 4 -i kit.GRCh37.bed > kit_final.GRCh37.bed
"""
from pyensembl import EnsemblRelease
CHROMOSOMES = list(map(str, list(range(1, 24)))) + ["X", "Y", "MT"]
def write_lines(name, lines):
with open(name, "w+") as f:
for l in lines:
f.write(l)
def generate_bed_file(bed_values, assembly):
"""
bed values é uma lista de tupla, com contig, start, end e symbol
"""
bed_lines = []
with_chr = True if assembly == "GRCh38" else False
for contig, start, end, gene in bed_values:
if with_chr:
if contig == "MT":
contig = "chrM"
else:
contig = f"chr{contig}"
bed_lines.append(f"{contig}\t{start}\t{end}\t{gene}\n")
write_lines(f"all_genes.{assembly}.bed", bed_lines)
def subset_genes(genes):
bed_values = []
for gene in genes:
if gene.biotype == "protein_coding":
bed_values.append((gene.contig, gene.start, gene.end, gene.gene_name))
continue
if gene.biotype != "processed_transcript":
bed_values.append((gene.contig, gene.start, gene.end, gene.gene_name))
continue
return bed_values
if __name__ == "__main__":
import sys
assemblies = {"GRCh37": 75, "GRCh38": 93}
assembly = sys.argv[1]
if assembly not in assemblies.keys():
raise ValueError(f"Sem suporte para assembly {assembly}")
# release 75: GRCh37 | 93: GRCh38
data = EnsemblRelease(assemblies[assembly])
genes = data.genes()
bed_values = subset_genes(genes)
generate_bed_file(bed_values, assembly)
# ex: bedtools sort -i cds_genes.grch37.bed | bedtools merge -c 4 -o distinct > cds_genes.grch37.merged.bed
"""
Script para split de beds grandes em bed menores para paralelizar algumas etapas
dos pipelines em WDL.
Ex:
- Exibe quantas partes serão produzidas com o valor padrão (2 000 000)
python scripts/split_bed.py ~/temp/bed_v3/AMCP3_ENSG_TARGETS_FINAL.bed AMCP3_FIXED dry
- Executa a divisão
python scripts/split_bed.py ~/temp/bed_v3/AMCP3_ENSG_TARGETS_FINAL.bed AMCP3_FIXED run
"""
def parse_bed(path):
rows = []
with open(path) as f:
for l in f:
try:
chrom, start, end, info = l.strip().split("\t")[:4]
except IndexError:
raise IndexError("Verifique se seu bed possui 4 colunas.")
rows.append([chrom, int(start), int(end), info])
return rows
def define_window_size(rows, desired_parts):
total_size = 0
for r in rows:
__, start, end, __ = r
total_size += end - start
return total_size / int(desired_parts)
def parse_line(line):
chrom, start, end, info = line.strip().split("\t")[:4]
return chrom, int(start), int(end), info
def write_lines(prefix, chunks):
for k, v in chunks.items():
with open(f"{prefix}_{k}.bed", "a+") as f:
for l in v:
f.write(l)
def report(elapsed, parts_sizes, size, chunk_number):
print("-----")
for k, v in parts_sizes.items():
print(f"Part {k}: {sum(v)} bp")
print("-----")
print(f"Total bed size: {size}")
print(f"Total parts: {chunk_number}")
print(f"Performed in {round(elapsed, 3)}s")
def split_bed(bedf, out_prefix, dry, remove_chrom_prefix=False, desired_parts=5):
chunk_size = 0
chunk_number = 1
part_size = {}
chunks = {}
size = 0
rows = parse_bed(bedf)
window_size = define_window_size(rows, desired_parts)
for r in rows:
chrom, start, end, info = r
if remove_chrom_prefix:
chrom = chrom.replace("chr", "")
size += end - start
chunk_size += end - start
new_line = f"{chrom}\t{start}\t{end}\t{info}\n"
chunks.setdefault(chunk_number, []).append(new_line)
part_size.setdefault(str(chunk_number), []).append((end - start))
if chunk_size >= window_size:
chunk_size = 0
chunk_number += 1
return chunks, size, chunk_number, part_size
if __name__ == "__main__":
import argparse
import time
parser = argparse.ArgumentParser()
parser.add_argument("--bed", help="path to bed file", required=True)
parser.add_argument("--prefix", help="String to prefix output chunks", required=True)
parser.add_argument("--parts", help="Number of parts to be splitted", required=True)
parser.add_argument("--remove_chrom", help="Remove 'chr' prefix of chromosomes column", action="store_true")
parser.add_argument("--dry", help="flag to specify if is to perform dry run", action="store_true")
args = parser.parse_args()
started_at = time.time()
# window_size = 2000000 # Mude aqui caso queira mais ou menos pedaços
chunks, size, chunk_number, parts_sizes = split_bed(
args.bed, args.prefix, args.dry, remove_chrom_prefix=args.remove_chrom, desired_parts=args.parts
)
if not args.dry:
write_lines(args.prefix, chunks)
elapsed = time.time() - started_at
report(elapsed, parts_sizes, size, chunk_number)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment