
@rchatterjee
Created August 5, 2020 20:10
I often have to process gigantic CSV files (containing hundreds of millions of lines). Despite having a powerful 32-core machine, I could not use its full power, so I created this script: it breaks the CSV into chunks and processes them in parallel.
import csv
import io
import itertools
import mmap
import multiprocessing
from multiprocessing import Pool


def approx_split_file(f, n):
    """
    Does not work on gzip.
    Returns n seek locations which approximately split the file into
    n equal chunks, without breaking a line.
    """
    # f is a seekable file-like object (open it in binary mode so that
    # seek/tell offsets are plain byte positions).
    old_file_position = f.tell()
    SEEK_SET, SEEK_CUR, SEEK_END = 0, 1, 2
    f.seek(0, SEEK_END)
    size = f.tell()
    f.seek(old_file_position, SEEK_SET)
    ret = [0]
    while len(ret) < n:
        # Jump roughly one chunk past the previous boundary, then
        # advance to the next newline so no line is split in two.
        i = ret[-1] + size // n
        f.seek(i, SEEK_SET)
        f.readline()
        ret.append(f.tell())
    print("File size: {}\nret: {}".format(size, ret))
    return ret


class ParallelCSV(object):
    def __init__(self, fiter, ncpu=-1, chunk_size=None):
        # fiter is either a path or an already open binary-mode file.
        if isinstance(fiter, str):
            self.fiter = open(fiter, 'rb')
        else:
            self.fiter = fiter
        # ncpu=-1 means "use every available core".
        self.ncpu = ncpu if ncpu > 0 else multiprocessing.cpu_count()
        self.chunk_size = chunk_size

    def apply(self, func):
        """Apply @func to each chunk of the file in parallel and
        return the list of per-chunk results."""
        splits = self._split(self.ncpu)
        # Pair each start offset with the next one; the last chunk
        # runs to the end of the file.
        bounds = zip(splits, splits[1:] + [None])
        chunks = (self._get_chunk(s, e) for s, e in bounds)
        with Pool(self.ncpu) as p:
            # Use map (not imap) so all results are collected before
            # the pool shuts down.
            return p.map(func, chunks)

    def _split(self, nchunks):
        """Split the file into @nchunks chunks.
        Returns the starting byte offset of each chunk.
        """
        approx_splits = approx_split_file(self.fiter, nchunks)
        return approx_splits

    def _get_chunk(self, s, e):
        """Return the raw bytes of the file between offsets @s and @e
        (to the end of the file if @e is None)."""
        # prot=mmap.PROT_READ is Unix-only; use access=mmap.ACCESS_READ
        # on Windows.
        m = mmap.mmap(self.fiter.fileno(), 0, prot=mmap.PROT_READ)
        return m[s:e] if e is not None else m[s:]
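A minimal usage sketch: the path 'data.csv' and the count_lines helper below are placeholders, not part of the class above. Each worker receives one chunk of raw bytes; note that only the first chunk contains the CSV header line.

def count_lines(chunk):
    # Each chunk is a bytes blob made up of whole lines.
    return chunk.count(b'\n')


if __name__ == '__main__':
    pcsv = ParallelCSV('data.csv', ncpu=8)
    per_chunk = pcsv.apply(count_lines)
    # Only the first chunk includes the header line.
    print("Total lines (including header): {}".format(sum(per_chunk)))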