Last active
July 5, 2018 16:20
-
-
Save mvallebr/d0f3600a8fc0f79f845a to your computer and use it in GitHub Desktop.
Example of processing a CSV using several processes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Created on 21/10/2014 | |
@author: mvalle | |
''' | |
import csv | |
import argparse | |
import sys | |
from multiprocessing import Pool, Semaphore | |
import traceback | |
def process_row_set(row_set):
    """Handle one batch of CSV rows (the example per-batch workload).

    Each row is written to stdout as its fields joined with '|', and the
    batch is closed by a line of 50 '=' characters. stdout is flushed
    explicitly afterwards, presumably so each worker process's output shows
    up promptly instead of waiting in its buffer.

    :param row_set: iterable of rows, each a sequence of field strings.
    """
    # Translated from the original Portuguese: this function processes a
    # set of N lines of the spreadsheet.
    for fields in row_set:
        line = '|'.join(fields)
        print(line)
    batch_rule = '=' * 50
    print(batch_rule)
    sys.stdout.flush()
def secure_process_row_set(process_method, *kargs):
    """Run ``process_method(*kargs)``, reporting a failure instead of raising.

    This is the function submitted to the worker pool. If processing a batch
    raises, the arguments, ``sys.exc_info()`` and the formatted traceback
    are printed and True is returned. The task therefore still completes
    normally from the pool's point of view.

    :param process_method: callable that processes one batch.
    :param kargs: positional arguments forwarded to ``process_method``.
    :returns: False on success, True if an exception was caught.
    """
    try:
        # Fix: the original ignored ``process_method`` and always called the
        # global ``process_row_set``.
        process_method(*kargs)
    except Exception:
        # ``Exception`` rather than a bare ``except:`` lets
        # KeyboardInterrupt / SystemExit propagate so workers can be stopped.
        print("Exception in child process for args: %s " % (str(kargs)))
        print('-' * 60)
        print(sys.exc_info())
        print(traceback.format_exc())
        print('-' * 60)
        return True
    return False
# Worker-slot semaphore used by process_csv (acquire) and task_done
# (release). It stays None until the __main__ section assigns a Semaphore.
workers_semaphore = None


def task_done(result):
    """Pool completion callback: give one worker slot back.

    The finished task's return value (*result*) is not inspected; every
    completed batch releases the semaphore exactly once.
    """
    slots = workers_semaphore
    slots.release()
def process_csv(input_stream, args):
    """Read CSV rows from *input_stream* and fan them out to the worker pool.

    Rows are collected into batches of ``args.step`` rows. Every full batch,
    and the final partial batch if one remains, is submitted to the
    module-level ``process_pool`` as
    ``secure_process_row_set(process_row_set, batch)``, with ``task_done`` as
    the completion callback.

    One slot of the module-level ``workers_semaphore`` is acquired before
    each submission, and ``task_done`` releases it when that batch finishes.
    Reading therefore pauses whenever as many batches are outstanding as the
    semaphore's initial count.

    NOTE(review): ``process_pool`` and ``workers_semaphore`` are only created
    in the ``__main__`` section. A caller importing this module must set them
    up first.

    :param input_stream: file-like object of CSV text. It is used as a
        context manager, so it is closed when this function returns.
    :param args: object with ``delimiter``, ``quotechar`` and ``step``
        attributes (the parsed command-line arguments).
    """

    def submit(batch):
        # Wait for a free slot, then queue the batch; task_done frees the
        # slot once the worker is finished with it.
        # NOTE(review): a task that completes without invoking task_done
        # never frees its slot; on Python 3, error_callback=task_done would
        # cover that case.
        workers_semaphore.acquire()
        process_pool.apply_async(secure_process_row_set,
                                 [process_row_set, batch],
                                 callback=task_done)

    # Fix: the original used an undefined name ``step`` (NameError on the
    # first row); the batch size comes from the parsed arguments.
    step = args.step
    row_set = []
    with input_stream as csvfile:
        reader = csv.reader(csvfile, delimiter=args.delimiter,
                            quotechar=args.quotechar)
        for row in reader:
            row_set.append(row)
            if len(row_set) >= step:
                submit(row_set)
                # Rebind instead of clearing in place: the submitted list may
                # still be waiting in the pool's task queue.
                row_set = []
    if row_set:
        # Fix: the trailing partial batch now takes a semaphore slot too, so
        # the release done by task_done is matched by an acquire.
        submit(row_set)
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, help="Input CSV to be processed")
    parser.add_argument("-d", "--delimiter", type=str, default='|', help="Character that separates columns")
    parser.add_argument("-q", "--quotechar", type=str, default='"', help="When field contains delimiter character, which character to use as quotes?")
    parser.add_argument("-s", "--step", type=int, default=1000, help="Amount of records to be processed at once each step")
    parser.add_argument("-w", "--workers", type=int, default=1, help="Num of parallel processes that will process input lines in parallel")
    # NOTE: with type=bool any non-empty value (even "False") parses as True.
    # The flag is not read anywhere yet; kept as-is for CLI compatibility.
    parser.add_argument("-v", "--verbose", type=bool, default=False, help="Show detailed output")
    args = parser.parse_args()

    # Both names are assigned at module scope on purpose: process_csv and
    # task_done read them as module globals. (The former module-level
    # ``global workers_semaphore`` statement was a no-op and is gone.)
    process_pool = Pool(processes=args.workers)
    # One semaphore slot per worker process bounds the queued batches.
    workers_semaphore = Semaphore(args.workers)

    # Fix: --input used to be parsed but ignored. Read the named file when
    # given, otherwise fall back to stdin as before.
    if args.input:
        input_stream = open(args.input)
    else:
        input_stream = sys.stdin
    process_csv(input_stream, args)  # closes input_stream when finished

    # No further tasks will be submitted; wait for queued batches to finish.
    process_pool.close()
    process_pool.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment