Skip to content

Instantly share code, notes, and snippets.

@SiarheyUchukhlebau
Created July 16, 2024 10:58
Show Gist options
  • Save SiarheyUchukhlebau/30f7e6d1a3b904c3cf54fb5eca7bcc40 to your computer and use it in GitHub Desktop.
Save SiarheyUchukhlebau/30f7e6d1a3b904c3cf54fb5eca7bcc40 to your computer and use it in GitHub Desktop.
import os
import sys
def _iter_products(lines):
    """Yield ``(rows, byte_size)`` for each product found in *lines*.

    A line whose first comma-separated column is non-empty starts a new
    product; following lines with an empty first column are appended to it.
    Lines seen before the first product start are yielded as a group of
    their own so that no input line is dropped.
    """
    rows = []
    size = 0
    for line in lines:
        if rows and line.split(',', 1)[0] != '':
            yield rows, size
            rows, size = [], 0
        rows.append(line)
        size += len(line.encode('utf-8'))
    if rows:
        yield rows, size


def _write_chunk(output_dir, index, header, rows):
    """Write *header* followed by *rows* to ``export_chunk_<index>.csv``."""
    output_file = os.path.join(output_dir, f'export_chunk_{index}.csv')
    # newline='' writes the lines exactly as they were read (no translation).
    with open(output_file, 'w', encoding='utf-8', newline='') as output:
        output.write(header)
        output.writelines(rows)
    print(f"Created file: {output_file}")


def split_file(input_file, output_dir, chunk_size_mb=100):
    """Split a CSV export into smaller CSV files without breaking up a product.

    The first line of *input_file* is treated as the header and repeated at
    the top of every output file. Rows are grouped into products: a row with
    a non-empty first column starts a product, and the rows after it whose
    first column is empty belong to the same product. Products are packed
    into ``export_chunk_1.csv``, ``export_chunk_2.csv``, ... so that a
    product is never split across two files.

    NOTE: rows are detected per physical line, so a quoted CSV field that
    contains an embedded line break is not recognised as a single row.

    Args:
        input_file: Path of the UTF-8 encoded CSV file to split.
        output_dir: Directory that receives the chunk files; created if it
            does not exist.
        chunk_size_mb: Maximum size of one output file in MiB, header
            included. A single product larger than the limit is still
            written whole, in a file of its own.

    Returns:
        None. Nothing is written when the input has no rows after the header.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_dir, exist_ok=True)

    chunk_size = chunk_size_mb * 1024 * 1024
    file_count = 1

    # newline='' keeps the original line endings, so the byte sizes counted
    # below match what ends up on disk.
    with open(input_file, 'r', encoding='utf-8', newline='') as source:
        header = source.readline()
        # Every output file repeats the header, so it counts against the limit.
        header_size = len(header.encode('utf-8'))

        current_chunk = []
        current_chunk_size = header_size

        for rows, size in _iter_products(source):
            # Only roll over when the chunk already holds something;
            # otherwise an oversized product would leave a header-only file.
            if current_chunk and current_chunk_size + size > chunk_size:
                _write_chunk(output_dir, file_count, header, current_chunk)
                file_count += 1
                current_chunk = []
                current_chunk_size = header_size
            current_chunk.extend(rows)
            current_chunk_size += size

        if current_chunk:
            _write_chunk(output_dir, file_count, header, current_chunk)
def main(argv=None):
    """Run the splitter from the command line.

    Args:
        argv: Full argument vector including the program name; defaults to
            ``sys.argv``. Expected form:
            ``<prog> <input_file_path> <output_directory> [chunk_size_mb]``.

    Exits with status 1 after printing the usage line when the argument
    count is wrong or when ``chunk_size_mb`` is not a positive integer.
    """
    args = sys.argv if argv is None else argv
    usage = "Usage: python split_file.py <input_file_path> <output_directory> [chunk_size_mb]"

    if len(args) < 3 or len(args) > 4:
        print(usage)
        sys.exit(1)

    chunk_size_mb = 100
    if len(args) == 4:
        # Reject bad sizes up front instead of crashing with a traceback
        # (non-numeric) or silently producing nonsense chunks (<= 0).
        try:
            chunk_size_mb = int(args[3])
        except ValueError:
            chunk_size_mb = None
        if chunk_size_mb is None or chunk_size_mb <= 0:
            print(f"Error: chunk_size_mb must be a positive integer, got {args[3]!r}")
            print(usage)
            sys.exit(1)

    split_file(args[1], args[2], chunk_size_mb)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment