Created
August 5, 2020 20:10
-
-
Save rchatterjee/a716105b1455ab5c824d6e2bf14bd71f to your computer and use it in GitHub Desktop.
I often have to process gigantic CSV files (containing hundreds of millions of lines). Despite having a powerful 32-core machine, I could not use its full power, so I created this script: it breaks the CSV into chunks and processes them in parallel.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
from multiprocessing import Pool | |
import itertools | |
import mmap | |
import io | |
def approx_split_file(f, n):
    """Return ``n`` seek offsets that approximately split ``f`` into
    ``n`` equal-sized chunks without breaking any line.

    Does not work on gzip (compressed streams cannot be split by raw
    seek offsets).

    The first offset is always 0; each subsequent offset is the start
    of the line that follows the previous approximate boundary.  The
    caller's file position is restored before returning.
    """
    # f is a file-like object supporting tell/seek/readline.
    old_file_position = f.tell()
    f.seek(0, io.SEEK_END)
    size = f.tell()
    # Integer step: seek() requires an int offset — size/n is a float
    # in Python 3 and would raise TypeError.
    step = size // n
    ret = [0]
    while len(ret) < n:
        # Jump ~step past the previous boundary, then skip to the end
        # of the current (possibly partial) line so no line is split.
        f.seek(ret[-1] + step, io.SEEK_SET)
        f.readline()
        ret.append(f.tell())
    # Restore the caller's position (the original version left the
    # file pointer wherever the last readline() ended).
    f.seek(old_file_position, io.SEEK_SET)
    return ret
class ParallelCSV(object):
    """Process a large CSV file in parallel.

    Splits the file into byte-offset chunks (via ``approx_split_file``)
    and applies a function over its rows in a process pool.
    """

    def __init__(self, fiter, ncpu=-1, chunk_size=None):
        """
        fiter: either a path to a CSV file (opened here) or an
               already-open file-like object.
        ncpu: number of worker processes; any non-positive value means
              "use all available cores".
        chunk_size: NOTE(review): currently unused — presumably intended
              to bound per-worker chunk size; confirm before relying on it.
        """
        if isinstance(fiter, str):
            self.fiter = open(fiter)
        else:
            self.fiter = fiter
        # The original never stored these (and was missing `self`, a
        # SyntaxError); callers need them in apply().
        self.ncpu = ncpu
        self.chunk_size = chunk_size

    def apply(self, func):
        """Apply ``func`` to every line of the file in a process pool.

        Returns a list of results in input order.
        """
        procs = self.ncpu if self.ncpu > 0 else None  # None => cpu_count()
        # Use the imported `Pool` (the module `multiprocessing` itself
        # was never imported, so `multiprocessing.Pool` was a NameError).
        with Pool(procs) as p:
            # Materialize inside the `with`: a lazy imap iterator would
            # be consumed only after the pool is already closed.
            return list(p.imap(func, self.fiter))

    def _split(self, nchunks):
        """Split the file into ``nchunks`` chunks.

        Returns the list of chunk start offsets (see approx_split_file).
        """
        return approx_split_file(self.fiter, nchunks)

    def _get_chunk(self, s, e):
        """Return a file-like object over the slice [s, e) of the file.

        NOTE(review): the original body referenced undefined names
        (`f`, `trainf`, `nline`) around an mmap; a plain seek+read of
        the requested span is the straightforward reconstruction.
        """
        self.fiter.seek(s)
        return io.StringIO(self.fiter.read(e - s))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment