|
#!/usr/bin/env python |
|
import argparse |
|
import gzip |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
import os |
|
|
|
|
|
def compress_block(block: bytes) -> float: |
|
compressed = gzip.compress(block) |
|
ratio = ( |
|
len(block) / len(compressed) if compressed else float("inf") |
|
) # Prevent division by zero |
|
return ratio |
|
|
|
|
|
def is_gzipped(file_path: str) -> bool: |
|
with open(file_path, "rb") as f: |
|
return f.read(2) == b"\x1f\x8b" |
|
|
|
|
|
def calculate_compression_ratios(file_path: str, block_size: int) -> np.ndarray: |
|
compression_ratios = [] |
|
|
|
open_func = gzip.open if is_gzipped(file_path) else open |
|
|
|
with open_func(file_path, "rb") as f: |
|
while True: |
|
block = f.read(block_size) |
|
if not block: |
|
break |
|
ratio = compress_block(block) |
|
compression_ratios.append(ratio) |
|
|
|
# Convert the list of ratios to a 1D array |
|
ratios_array = np.array(compression_ratios) |
|
return ratios_array |
|
|
|
|
|
def log_transform_ratios(ratios_array: np.ndarray) -> np.ndarray: |
|
# Add a small constant to avoid log(0) |
|
small_constant = 1e-5 |
|
adjusted_ratios = ( |
|
ratios_array + small_constant - min(ratios_array) + 1 |
|
) # Ensure all values are positive |
|
log_ratios = np.log(adjusted_ratios) |
|
return log_ratios |
|
|
|
|
|
def z_score_standardize_ratios(ratios_array: np.ndarray) -> np.ndarray: |
|
mean = np.mean(ratios_array) |
|
std = np.std(ratios_array) |
|
standardized_ratios = (ratios_array - mean) / std |
|
return standardized_ratios |
|
|
|
|
|
def generate_heatmap( |
|
ratios_array: np.ndarray, output_image: str, cmap: str, transform: str = None |
|
) -> None: |
|
plt.figure(figsize=(10, 5)) |
|
plt.imshow(ratios_array.reshape(1, -1), cmap=cmap, aspect="auto") |
|
plt.colorbar() |
|
plt.title("Heatmap of Compression Ratios") |
|
plt.xlabel("Block Number") |
|
ylabel = "Compression Ratio" |
|
if transform == "log": |
|
ylabel = "Log of Compression Ratio" |
|
elif transform == "zscore": |
|
ylabel = "Z-Score of Compression Ratio" |
|
plt.ylabel(ylabel) |
|
plt.savefig(output_image) |
|
plt.close() |
|
|
|
|
|
def save_ratios_tsv( |
|
ratios_array: np.ndarray, output_filename: str, block_size: int |
|
) -> None: |
|
tsv_file_name = os.path.join( |
|
os.getcwd(), output_filename + "_compression_ratios.tsv.gz" |
|
) |
|
|
|
# Save the compression ratios as a gzip compressed TSV file |
|
with gzip.open(tsv_file_name, "wt") as f: |
|
f.write(f"#block_size={block_size}\n") |
|
for i, ratio in enumerate(ratios_array): |
|
f.write(f"{i}\t{ratio}\n") |
|
|
|
print(f"Compression ratios saved as: {tsv_file_name}") |
|
|
|
|
|
if __name__ == "__main__": |
|
parser = argparse.ArgumentParser( |
|
description="Generate a heatmap and TSV of compression ratios for a file." |
|
) |
|
parser.add_argument("file", type=str, help="File to compress and analyze") |
|
parser.add_argument( |
|
"-o", |
|
"--output", |
|
type=str, |
|
help="Output image filename (PNG format)", |
|
default=None, |
|
) |
|
parser.add_argument( |
|
"-b", |
|
"--block_size", |
|
type=int, |
|
default=1024, |
|
help="Block size for compression (default: 1024)", |
|
) |
|
parser.add_argument( |
|
"-w", |
|
"--window_size", |
|
type=int, |
|
default=1, |
|
help="Window size to average compression ratios (default: 1)", |
|
) |
|
parser.add_argument( |
|
"-c", |
|
"--cmap", |
|
type=str, |
|
default="rainbow", |
|
help="Matplotlib colormap for the heatmap (default: rainbow)", |
|
) |
|
parser.add_argument( |
|
"-t", |
|
"--transform", |
|
type=str, |
|
default=None, |
|
help='Apply a transformation to the ratios (options: "log", "zscore")', |
|
) |
|
|
|
args = parser.parse_args() |
|
|
|
file_path = args.file |
|
block_size = args.block_size |
|
window_size = args.window_size |
|
cmap = args.cmap |
|
|
|
base_filename = os.path.splitext(os.path.basename(file_path))[0] |
|
|
|
if args.output: |
|
output_image = args.output |
|
else: |
|
output_image = ( |
|
f"{os.path.splitext(base_filename)[0]}_compression_ratio_heatmap.png" |
|
) |
|
|
|
ratios_array = calculate_compression_ratios(file_path, block_size) |
|
|
|
output_tsv_filename = ( |
|
base_filename |
|
if args.output is None |
|
else os.path.splitext(os.path.basename(args.output))[0] |
|
) |
|
|
|
save_ratios_tsv(ratios_array, output_tsv_filename, block_size) |
|
|
|
if args.transform == "log": |
|
ratios_array = log_transform_ratios(ratios_array) |
|
elif args.transform == "zscore": |
|
ratios_array = z_score_standardize_ratios(ratios_array) |
|
|
|
if window_size > 1: |
|
ratios_array = np.convolve( |
|
ratios_array, np.ones(window_size) / window_size, mode="valid" |
|
) |
|
|
|
generate_heatmap(ratios_array, output_image, cmap, args.transform) |
|
print(f"Heatmap saved as: {output_image}") |
Example:
SRR11794587_1
SRR11794587_2