Created
November 2, 2023 18:49
-
-
Save C-Saunders/db35389cb8eb4dd27e7fb7e45527f82c to your computer and use it in GitHub Desktop.
Split a file into chunks with a specified number of lines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from io import TextIOWrapper | |
from pathlib import Path | |
from typing import Optional, Tuple | |
def split_file(*, input_path: Path, output_file_prefix: str, output_file_suffix: Optional[str], lines_per_chunk: int, has_header: bool = True) -> list[Path]:
    """Split a text file into numbered chunk files written next to the input.

    Chunk files are created in ``input_path.parent`` and named
    ``{prefix}_{n}{ext}`` or, when a suffix is given,
    ``{prefix}_{n}_{suffix}{ext}``, where ``n`` counts from 1 and ``ext`` is
    the extension of ``input_path``.

    Args:
        input_path: File to split.
        output_file_prefix: Leading part of every chunk file name.
        output_file_suffix: Optional trailing part of every chunk file name.
        lines_per_chunk: Maximum number of lines per chunk file. When
            ``has_header`` is true the repeated header line counts towards
            this limit.
        has_header: When true, the first line of the input is written at the
            top of every chunk. When false, the first line is ordinary data
            and only appears in the first chunk.

    Returns:
        The paths of the chunk files, in the order they were written. An
        empty input still produces one (empty) chunk file.

    Raises:
        ValueError: If ``lines_per_chunk`` is not positive.
    """
    if lines_per_chunk <= 0:
        raise ValueError(f'Lines per chunk must be positive, got {lines_per_chunk}')

    # A chunk that repeats the header must still have room for one data line;
    # otherwise a limit of 1 would be used up by the header alone.
    chunk_limit = max(lines_per_chunk, 2) if has_header else lines_per_chunk

    def build_filename(chunk_number: int) -> Path:
        # Compose "<prefix>_<n>[_<suffix>]<ext>" beside the input file.
        stem = f'{output_file_prefix}_{chunk_number}'
        if output_file_suffix is not None:
            stem = f'{stem}_{output_file_suffix}'
        return input_path.parent.joinpath(f'{stem}{input_path.suffix}')

    def start_chunk(chunk_number: int) -> Tuple[TextIOWrapper, int]:
        # Open the next chunk file and return it with the number of lines
        # already written to it (1 if the first input line was emitted).
        chunk_path = build_filename(chunk_number)
        output_file_paths.append(chunk_path)
        chunk = open(chunk_path, 'w')
        try:
            if has_header or chunk_number == 1:
                chunk.write(first_line)
                return (chunk, 1)
            return (chunk, 0)
        except BaseException:
            # Do not leak the handle if the initial write fails.
            chunk.close()
            raise

    output_file_paths: list[Path] = []
    with open(input_path, 'rt') as source:
        first_line = source.readline()
        chunk_number = 1
        output, line_count = start_chunk(chunk_number)
        try:
            for line in source:
                # ">=" rather than "==" so the counter can never step past
                # the limit and stop rolling over.
                if line_count >= chunk_limit:
                    output.close()
                    chunk_number += 1
                    output, line_count = start_chunk(chunk_number)
                output.write(line)
                line_count += 1
        finally:
            # Always close (and flush) the chunk currently being written,
            # including the final one.
            output.close()
    return output_file_paths
if __name__ == "__main__":
    # Command-line entry point: split the given file and print the path of
    # every chunk file that was written, one per line.
    parser = argparse.ArgumentParser()
    parser.add_argument("input_path", type=Path, help="file to split")
    parser.add_argument("--output-file-prefix", type=str, default="output",
                        help="leading part of each chunk file name")
    parser.add_argument("--output-file-suffix", type=str, default=None,
                        help="optional trailing part of each chunk file name")
    parser.add_argument("--chunk-size", type=int, default=1_000_000,
                        help="maximum number of lines per chunk file")
    parser.add_argument("--no-header", action="store_true", default=False,
                        help="treat the first line as data instead of repeating it in every chunk")
    args = parser.parse_args()

    input_path = args.input_path
    chunk_size = args.chunk_size

    # parser.error() prints the usage line plus the message to stderr and
    # exits with status 2, instead of showing the user a traceback.
    if input_path.is_dir():
        parser.error(f'Input path {input_path} is a directory')
    if not input_path.exists():
        parser.error(f'Input path {input_path} does not exist')
    if chunk_size <= 0:
        parser.error(f'Lines per chunk must be positive, got {chunk_size}')

    for output_path in split_file(
        input_path=input_path,
        output_file_prefix=args.output_file_prefix,
        output_file_suffix=args.output_file_suffix,
        lines_per_chunk=chunk_size,
        has_header=not args.no_header,
    ):
        print(output_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment