Create BED with ENSEMBL genes
Script para gerar todas as CDSs de determinada assembly do genoma humano.
Essas coordenadas são usadas para o calculo de cobertura das regiões alvo
do exoma.
from pyensembl import EnsemblRelease
CHROMOSOMES = list(map(str, list(range(1, 24)))) + ["X", "Y", "MT"]
def write_lines(name, lines):
with open(name, "w+") as f:
for l in lines:
def retrieve_all_coding_sequences(database: EnsemblRelease, list_of_transcripts):
all_cds = {}
failed = 0
for p in list_of_transcripts:
all_cds.setdefault(p, []).append(data.db.query_loci("transcript_id", p, "CDS"))
except ValueError:
failed += 1
print(f"Existem {failed} transcritos que não possuem CDS")
return all_cds
def produce_values_for_cds(database: EnsemblRelease, all_cds):
lines = []
for k, v in all_cds.items():
gene = database.gene_name_of_transcript_id(k)
flat_list = [item for sublist in v for item in sublist]
for l in flat_list:
if l.contig not in CHROMOSOMES:
if l.start == l.end:
lines.append([l.contig, l.start, l.end, gene])
return lines
def generate_bed_file(bed_values, assembly):
bed_lines = []
with_chr = True if assembly == "GRCh38" else False
for contig, start, end, gene in bed_values:
if with_chr:
if contig == "MT":
contig = "chrM"
contig = f"chr{contig}"
write_lines(f"all_cds.{assembly}.bed", bed_lines)
if __name__ == "__main__":
import sys
assembly = sys.argv[1]
if assembly not in ("GRCh37", "GRCh38"):
raise ValueError(f"Sem suporte para assembly {assembly}")
# release 75: GRCh37 | 93: GRCh38
data = EnsemblRelease(93)
d = data.transcript_ids()
all_cds = retrieve_all_coding_sequences(data, d)
bed_information = produce_values_for_cds(data, all_cds)
generate_bed_file(bed_information, assembly)
# ex: bedtools sort -i cds_genes.grch37.bed | bedtools merge -c 4 -o distinct > cds_genes.grch37.merged.bed
Script para gerar um bed com todos os genes de ensembl (processed_transcripts ou
Essas coordenadas são usadas para anotar as targets dos kits cadastrados. Ex:
bedtools intersect -a kit.bed -b all_genes_sorted.GRCh37.bed -wb -loj | awk '{print $1"\t"$2"\t"$3"\t"$8}' > kit.GRCh37.bed
bedtools merge -delim "," -o collapse -c 4 -i kit.GRCh37.bed > kit_final.GRCh37.bed
from pyensembl import EnsemblRelease
CHROMOSOMES = list(map(str, list(range(1, 24)))) + ["X", "Y", "MT"]
def write_lines(name, lines):
with open(name, "w+") as f:
for l in lines:
def generate_bed_file(bed_values, assembly):
bed values é uma lista de tupla, com contig, start, end e symbol
bed_lines = []
with_chr = True if assembly == "GRCh38" else False
for contig, start, end, gene in bed_values:
if with_chr:
if contig == "MT":
contig = "chrM"
contig = f"chr{contig}"
write_lines(f"all_genes.{assembly}.bed", bed_lines)
def subset_genes(genes):
bed_values = []
for gene in genes:
if gene.biotype == "protein_coding":
bed_values.append((gene.contig, gene.start, gene.end, gene.gene_name))
if gene.biotype != "processed_transcript":
bed_values.append((gene.contig, gene.start, gene.end, gene.gene_name))
return bed_values
if __name__ == "__main__":
import sys
assemblies = {"GRCh37": 75, "GRCh38": 93}
assembly = sys.argv[1]
if assembly not in assemblies.keys():
raise ValueError(f"Sem suporte para assembly {assembly}")
# release 75: GRCh37 | 93: GRCh38
data = EnsemblRelease(assemblies[assembly])
genes = data.genes()
bed_values = subset_genes(genes)
generate_bed_file(bed_values, assembly)
# ex: bedtools sort -i cds_genes.grch37.bed | bedtools merge -c 4 -o distinct > cds_genes.grch37.merged.bed
Script para split de beds grandes em bed menores para paralelizar algumas etapas
dos pipelines em WDL.
- Exibe quantas partes serão produzidas com o valor padrão (2 000 000)
python scripts/ ~/temp/bed_v3/AMCP3_ENSG_TARGETS_FINAL.bed AMCP3_FIXED dry
- Executa a divisão
python scripts/ ~/temp/bed_v3/AMCP3_ENSG_TARGETS_FINAL.bed AMCP3_FIXED run
def parse_bed(path):
rows = []
with open(path) as f:
for l in f:
chrom, start, end, info = l.strip().split("\t")[:4]
except IndexError:
raise IndexError("Verifique se seu bed possui 4 colunas.")
rows.append([chrom, int(start), int(end), info])
return rows
def define_window_size(rows, desired_parts):
total_size = 0
for r in rows:
__, start, end, __ = r
total_size += end - start
return total_size / int(desired_parts)
def parse_line(line):
chrom, start, end, info = line.strip().split("\t")[:4]
return chrom, int(start), int(end), info
def write_lines(prefix, chunks):
for k, v in chunks.items():
with open(f"{prefix}_{k}.bed", "a+") as f:
for l in v:
def report(elapsed, parts_sizes, size, chunk_number):
for k, v in parts_sizes.items():
print(f"Part {k}: {sum(v)} bp")
print(f"Total bed size: {size}")
print(f"Total parts: {chunk_number}")
print(f"Performed in {round(elapsed, 3)}s")
def split_bed(bedf, out_prefix, dry, remove_chrom_prefix=False, desired_parts=5):
chunk_size = 0
chunk_number = 1
part_size = {}
chunks = {}
size = 0
rows = parse_bed(bedf)
window_size = define_window_size(rows, desired_parts)
for r in rows:
chrom, start, end, info = r
if remove_chrom_prefix:
chrom = chrom.replace("chr", "")
size += end - start
chunk_size += end - start
new_line = f"{chrom}\t{start}\t{end}\t{info}\n"
chunks.setdefault(chunk_number, []).append(new_line)
part_size.setdefault(str(chunk_number), []).append((end - start))
if chunk_size >= window_size:
chunk_size = 0
chunk_number += 1
return chunks, size, chunk_number, part_size
if __name__ == "__main__":
import argparse
import time
parser = argparse.ArgumentParser()
parser.add_argument("--bed", help="path to bed file", required=True)
parser.add_argument("--prefix", help="String to prefix output chunks", required=True)
parser.add_argument("--parts", help="Number of parts to be splitted", required=True)
parser.add_argument("--remove_chrom", help="Remove 'chr' prefix of chromosomes column", action="store_true")
parser.add_argument("--dry", help="flag to specify if is to perform dry run", action="store_true")
args = parser.parse_args()
started_at = time.time()
# window_size = 2000000 # Mude aqui caso queira mais ou menos pedaços
chunks, size, chunk_number, parts_sizes = split_bed(
args.bed, args.prefix, args.dry, remove_chrom_prefix=args.remove_chrom,
if not args.dry:
write_lines(args.prefix, chunks)
elapsed = time.time() - started_at
report(elapsed, parts_sizes, size, chunk_number)
