Last active
February 25, 2024 12:17
-
-
Save FirasSadiyah/2854cf51579928a159ad8d28b623bd71 to your computer and use it in GitHub Desktop.
python-exception-handling.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
class EmptyDataFrameException(Exception):
    """Signal that a processing step was given a DataFrame with no data."""
# Sample genome variants, one record per row: (chrom, start, end, ref, alt).
vcf_df = pd.DataFrame(
    [
        ("chr1", 1000, 1010, "A", "T"),
        ("chr2", 2000, 2010, "C", "G"),
        ("chr3", 3000, 3010, "G", "C"),
        ("chr4", 4000, 4010, "T", "A"),
    ],
    columns=["chrom", "start", "end", "ref", "alt"],
)
def filter_by_region(df, roi_chrom="chr1", roi_start=0, roi_end=5000):
    """Return the rows of ``df`` that overlap a region of interest.

    A row is kept when its "chrom" equals ``roi_chrom``, its "start" is
    strictly less than ``roi_end`` and its "end" is strictly greater than
    ``roi_start``.

    Args:
        df: DataFrame with "chrom", "start" and "end" columns.
        roi_chrom: Chromosome name to keep.
        roi_start: Lower bound of the region (exclusive).
        roi_end: Upper bound of the region (exclusive).

    Returns:
        A new DataFrame (a copy) holding the matching rows. It is empty
        when no row overlaps the region.
    """
    on_chrom = df["chrom"] == roi_chrom
    overlaps = (df["start"] < roi_end) & (df["end"] > roi_start)
    # Return a copy so that columns added later never write into the caller's frame.
    return df[on_chrom & overlaps].copy()
def calculate_vaf(df):
    """Add a "vaf" column to ``df`` and return the same DataFrame.

    For each row the value is the number of comma-separated entries in
    "alt" divided by the combined string length of "ref" and "alt".
    The column is written onto ``df`` itself rather than onto a copy.

    Args:
        df: DataFrame with string columns "ref" and "alt".

    Returns:
        ``df``, with the "vaf" column set.

    Raises:
        EmptyDataFrameException: If ``df`` is empty.
    """
    # Fail fast: there is nothing to compute on an empty frame.
    if df.empty:
        msg = "calculating variant allele frequency resulted in an empty dataframe"
        raise EmptyDataFrameException(msg)

    alt_allele_count = df["alt"].str.count(",") + 1
    total_length = df["ref"].str.len() + df["alt"].str.len()
    df.loc[:, "vaf"] = alt_allele_count / total_length
    return df
def calculate_allele_frequency(df, allele, pop_size):
    """Print the frequency of ``allele`` among the variants in ``df``.

    The allele count is the total number of occurrences of ``allele`` in
    the "ref" and "alt" columns; the frequency is that count divided by
    ``pop_size``. Invalid input is reported on stdout instead of raised.

    Args:
        df: DataFrame with string columns "ref" and "alt".
        allele: The nucleotide to count; must be "A", "C", "G" or "T".
        pop_size: Population size used as the denominator; must be positive.

    Returns:
        None. The result or the error message is printed.
    """
    valid_alleles = ("A", "C", "G", "T")

    # Rejecting zero here also guarantees the division below cannot divide by zero.
    if pop_size <= 0:
        print("The population size must be positive.")
        return
    if allele not in valid_alleles:
        print(f"The allele {allele} is not valid. It must be one of A, C, G, or T.")
        return

    try:
        ref_hits = df["ref"].str.count(allele).sum()
        alt_hits = df["alt"].str.count(allele).sum()
    except KeyError as err:
        # A KeyError here means a column is absent, not that the allele is bad.
        print(f"The dataframe is missing the required column {err}.")
        return

    allele_freq = (ref_hits + alt_hits) / pop_size
    print(f"The allele frequency of {allele} is {allele_freq:.4f}.")
    print("The calculation was successful.")
def process_vcf(df, output_path="output.csv"):
    """Run the variant pipeline on ``df`` and save the result as CSV.

    Steps: filter to the region of interest, add the "vaf" column, print
    the allele frequency of "A" for a population of 1000, then print the
    resulting DataFrame. An EmptyDataFrameException stops the remaining
    steps and is reported on stdout. The CSV is written whether or not an
    exception occurred, with whatever ``df`` holds at that point.

    Args:
        df: DataFrame of variants to process.
        output_path: Destination of the CSV file.

    Returns:
        The DataFrame that was written to ``output_path``.
    """
    try:
        df = filter_by_region(df)
        df = calculate_vaf(df)
        calculate_allele_frequency(df, "A", 1000)
        # ... more mini functions
        print(df)
    except EmptyDataFrameException as e:
        print(f"Exiting the program due to {e}")
    finally:
        # Runs on success, on the handled exception, and before any other
        # exception propagates, so partial results are always saved.
        df.to_csv(output_path, index=False)
        print(f"The DataFrame was saved to {output_path}")
    # Return the frame on every handled path, not only after an exception.
    return df
# Script entry: run the pipeline on the sample variants defined in this file.
process_vcf(df=vcf_df)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment