@pansapiens · Created May 21, 2024
Convert miRDeep2 counts CSV to nicer TSV tables
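The script reads one or more miRDeep2 result CSV files and writes a single merged TSV table to stdout. A minimal usage sketch, assuming the script is saved as mirdeep2_to_tsv.py and the result files follow the default <prefix>_<sample>_collapsed.csv naming (both names here are illustrative, not fixed):

    python mirdeep2_to_tsv.py output/result_*_collapsed.csv > merged.tsv
    python mirdeep2_to_tsv.py result.csv --sample-id sampleA > sampleA.tsv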
#!/usr/bin/env python
import argparse
import glob
import io
import logging
import re
import sys
from typing import List, Optional

import pandas as pd

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

DEFAULT_SAMPLE_ID_REGEX = r"_([a-zA-Z0-9-]+)_collapsed\.csv$"
def extract_sample_id(
    filename: str,
    sample_id: Optional[str] = None,
    sample_id_regex: Optional[str] = None,
) -> Optional[str]:
    """Return the explicit sample ID if given, else one extracted from the filename."""
    if sample_id:
        return sample_id
    sample_id_regex = sample_id_regex or DEFAULT_SAMPLE_ID_REGEX
    match = re.search(sample_id_regex, filename)
    if match:
        return match.group(1)
    return None
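
# Illustrative example (the filename below is hypothetical): with the default
# regex, extract_sample_id("reads_liver-01_collapsed.csv") returns "liver-01";
# an explicit sample_id argument always takes precedence over the regex.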

def read_section(lines: List[str], start_line: str) -> List[str]:
    """Return the lines following `start_line`, up to the first blank line."""
    start_index = next(
        (i for i, line in enumerate(lines) if line.startswith(start_line)), None
    )
    if start_index is None:
        logging.error(f"Start line '{start_line}' not found in file.")
        return []
    section_lines = []
    for line in lines[start_index + 1 :]:
        if line.strip() == "":
            break
        section_lines.append(line)
    return section_lines

def clean_column_names(df: pd.DataFrame) -> pd.DataFrame:
    # Normalise headers: spaces and hyphens become underscores, all lower case.
    df.columns = df.columns.str.replace(r"[\s-]", "_", regex=True).str.lower()
    return df


def combine_id_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Novel predictions carry a "provisional id" and known miRNAs a "tag id";
    # merge whichever of the two are present into a single mirdeep2_id column.
    id_cols = [c for c in ("provisional_id", "tag_id") if c in df.columns]
    df["mirdeep2_id"] = df[id_cols[0]]
    for col in id_cols[1:]:
        df["mirdeep2_id"] = df["mirdeep2_id"].combine_first(df[col])
    df = df.drop(columns=id_cols)
    return df

def combine_probability_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Each section has its own long "estimated probability that ..." column;
    # merge whichever are present, then split the "90 +/- 2%"-style values
    # into a numeric percentage and its confidence interval.
    prob_cols = df.filter(like="estimated_probability_that").columns
    df["estimated_probability"] = df[prob_cols[0]]
    for col in prob_cols[1:]:
        df["estimated_probability"] = df["estimated_probability"].combine_first(
            df[col]
        )
    prob_values = (
        df["estimated_probability"]
        .str.replace("%", "")
        .str.replace(" ", "")
        .str.split(r"\+/\-", expand=True)
    )
    df["percent_probability"] = pd.to_numeric(prob_values[0], errors="coerce")
    df["percent_probability_ci"] = pd.to_numeric(prob_values[1], errors="coerce")
    df = df.drop(columns=prob_cols)
    df = df.drop(columns=["estimated_probability"])
    return df

def convert_p_value(df: pd.DataFrame) -> pd.DataFrame:
    # Convert the yes/no randfold significance flag to a boolean.
    df["significant_randfold_p_value"] = df["significant_randfold_p_value"].map(
        {"yes": True, "no": False}
    )
    return df


def replace_hyphens_with_na(df: pd.DataFrame) -> pd.DataFrame:
    # miRDeep2 uses "-" as a missing-value placeholder.
    df = df.replace("-", pd.NA)
    return df


def split_precursor_coordinate(df: pd.DataFrame) -> pd.DataFrame:
    # Split "chrom:start..end:strand" coordinates into separate columns.
    coord_data = df["precursor_coordinate"].str.extract(
        r"(?P<chromosome>[^:]+):(?P<start>\d+)\.\.(?P<end>\d+):(?P<strand>[\+\-])"
    )
    df = pd.concat([df, coord_data], axis=1)
    return df

def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    df = clean_column_names(df)
    df = combine_id_columns(df)
    df = combine_probability_columns(df)
    df = convert_p_value(df)
    df = replace_hyphens_with_na(df)
    df = split_precursor_coordinate(df)
    # Reorder columns: sample and miRNA IDs first, probabilities near the front
    cols = list(df.columns)
    cols.insert(0, cols.pop(cols.index("sample_id")))
    cols.insert(1, cols.pop(cols.index("mirdeep2_id")))
    cols.insert(4, cols.pop(cols.index("percent_probability")))
    cols.insert(5, cols.pop(cols.index("percent_probability_ci")))
    df = df[cols]
    return df

def process_file(
    input_file: str,
    sample_id: Optional[str] = None,
    sample_id_regex: Optional[str] = None,
) -> pd.DataFrame:
    with open(input_file, "r") as file:
        lines = file.readlines()

    novel_section = read_section(lines, "novel miRNAs predicted by miRDeep2")
    mature_section = read_section(lines, "mature miRBase miRNAs detected by miRDeep2")
    if not novel_section and not mature_section:
        logging.error(f"Neither section found in file: {input_file}")
        return pd.DataFrame()

    sample_id_value = extract_sample_id(input_file, sample_id, sample_id_regex)

    # Parse whichever sections are present (each is a tab-separated table);
    # parsing only non-empty sections avoids an EmptyDataError when a file
    # contains just one of the two.
    section_dfs = []
    if novel_section:
        novel_df = pd.read_csv(io.StringIO("\n".join(novel_section)), sep="\t")
        novel_df["novel"] = True
        section_dfs.append(novel_df)
    if mature_section:
        mature_df = pd.read_csv(io.StringIO("\n".join(mature_section)), sep="\t")
        mature_df["novel"] = False
        section_dfs.append(mature_df)

    merged_df = pd.concat(section_dfs, ignore_index=True)
    merged_df["sample_id"] = sample_id_value
    processed_df = process_dataframe(merged_df)
    return processed_df

def main(
    input_files: List[str], sample_id: Optional[str], sample_id_regex: Optional[str]
) -> None:
    all_dfs = []
    for input_file in input_files:
        merged_df = process_file(input_file, sample_id, sample_id_regex)
        if not merged_df.empty:
            all_dfs.append(merged_df)
    if all_dfs:
        final_df = pd.concat(all_dfs, ignore_index=True)
        # Write the combined table to stdout as TSV
        final_df.to_csv(sys.stdout, sep="\t", index=False)
    else:
        logging.error("No data to output.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Extract and merge miRNA prediction sections from miRDeep2 output."
    )
    parser.add_argument(
        "input_files",
        nargs="+",
        help="Input file paths from miRDeep2 (glob patterns are supported)",
    )
    parser.add_argument(
        "--sample-id",
        help="Sample ID to use, overriding extraction from the filename",
        required=False,
    )
    parser.add_argument(
        "--sample-id-regex",
        help=f"Regex pattern to extract the sample ID from the filename (default: {DEFAULT_SAMPLE_ID_REGEX})",
        default=DEFAULT_SAMPLE_ID_REGEX,
    )
    args = parser.parse_args()

    # Expand glob patterns ourselves, for shells that don't do it
    expanded_files = []
    for pattern in args.input_files:
        expanded_files.extend(glob.glob(pattern))
    main(expanded_files, args.sample_id, args.sample_id_regex)
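
For reference, this is the input shape the parser expects, reconstructed from the parsing logic above rather than from a real result file (real files carry more columns than shown): blank-line-terminated sections, each starting with its title line and followed by a tab-separated table.

    novel miRNAs predicted by miRDeep2
    provisional id<TAB>estimated probability that ...<TAB>significant randfold p-value<TAB>precursor coordinate
    chr1_123<TAB>90 +/- 2%<TAB>yes<TAB>chr1:100..180:+

    mature miRBase miRNAs detected by miRDeep2
    tag id<TAB>estimated probability that ...<TAB>significant randfold p-value<TAB>precursor coordinate
    ...

The merged output has sample_id and mirdeep2_id as its first two columns, a novel flag marking which section each row came from, and the precursor coordinate additionally split into chromosome, start, end and strand columns.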