Skip to content

Instantly share code, notes, and snippets.

@mvallebr
Last active July 5, 2018 16:20
Show Gist options
  • Save mvallebr/d0f3600a8fc0f79f845a to your computer and use it in GitHub Desktop.
Save mvallebr/d0f3600a8fc0f79f845a to your computer and use it in GitHub Desktop.
Example of processing a CSV using several processes
#!/usr/bin/env python
'''
Created on 21/10/2014
@author: mvalle
'''
import csv
import argparse
import sys
from multiprocessing import Pool, Semaphore
import traceback
def process_row_set(row_set):
    """Handle one batch of spreadsheet rows (up to N rows per call).

    Every row is written to stdout with its fields joined by '|'. After the
    batch, a line of 50 '=' characters is written as a batch separator.

    :param row_set: iterable of rows, each an iterable of strings.
    """
    for fields in row_set:
        line = '|'.join(fields)
        print(line)
    separator = '=' * 50
    print(separator)
    # Flush so this batch's output leaves the process buffer immediately.
    sys.stdout.flush()
def secure_process_row_set(process_method, *kargs):
    """Run ``process_method(*kargs)``, reporting failures instead of raising.

    Used as the pool entry point. Any exception raised by the handler is printed
    to stdout, so it is visible from the worker process and the batch is
    reported as failed. The printed report contains the arguments, the
    ``sys.exc_info()`` tuple and the formatted traceback.

    :param process_method: callable that processes one batch.
    :param kargs: positional arguments forwarded to ``process_method``.
    :returns: False if ``process_method`` completed, True if it raised.
    """
    try:
        # Call the handler we were given (previously this always called
        # process_row_set and ignored process_method).
        process_method(*kargs)
        return False
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit still stop the worker.
        print("Exception in child process for args: %s " % (str(kargs)))
        print('-' * 60)
        print(sys.exc_info())
        print(traceback.format_exc())
        print('-' * 60)
        # Flush, matching the success path, so the report is not left buffered.
        sys.stdout.flush()
        return True
# Throttle shared by process_csv() (acquire) and task_done() (release).
# It stays None until the __main__ section creates it.
workers_semaphore = None


def task_done(result):
    """Pool completion callback: return one slot to ``workers_semaphore``.

    :param result: value produced by the finished task; not inspected here.
    """
    throttle = workers_semaphore
    throttle.release()
def process_csv(input_stream, args):
    """Parse CSV rows from *input_stream* and dispatch them to the pool in batches.

    Rows are collected into lists of ``args.step`` rows; the last list may be
    shorter. Each list is submitted asynchronously as
    ``secure_process_row_set(process_row_set, batch)`` to the module-level
    ``process_pool``, with ``task_done`` as the completion callback.

    Throttling: a slot is taken from ``workers_semaphore`` before every
    submission, and ``task_done`` gives it back. This limits how many batches
    are pending at once to the semaphore's initial count. The function returns
    once every batch has been submitted; it does not wait for them to finish.

    :param input_stream: file-like object yielding CSV text; closed on return.
    :param args: parsed options; ``delimiter``, ``quotechar`` and ``step`` are read.
    """
    step = args.step
    batch = []

    def submit(rows):
        # Every submission, including the trailing partial one, must acquire a
        # slot, because task_done() releases one for every completed batch.
        # NOTE(review): apply_async only runs the callback on success. A task
        # failing outside secure_process_row_set would never release its slot.
        workers_semaphore.acquire()
        process_pool.apply_async(secure_process_row_set,
                                 [process_row_set, rows],
                                 callback=task_done)

    with input_stream as csvfile:
        reader = csv.reader(csvfile, delimiter=args.delimiter,
                            quotechar=args.quotechar)
        for row in reader:
            batch.append(row)
            if len(batch) >= step:
                submit(batch)
                batch = []

    # Dispatch whatever is left over after the last full batch.
    if batch:
        submit(batch)
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, help="Input CSV to be processed (default: stdin)")
    parser.add_argument("-d", "--delimiter", type=str, default='|', help="Character that separates columns")
    parser.add_argument("-q", "--quotechar", type=str, default='"', help="When field contains delimiter character, which character to use as quotes?")
    parser.add_argument("-s", "--step", type=int, default=1000, help="Amount of records to be processed at once each step")
    parser.add_argument("-w", "--workers", type=int, default=1, help="Num of parallel processes that will process input lines in parallel")
    # NOTE(review): with type=bool, any non-empty value (even "False") parses
    # as True. The flag is currently unused. Switching to action="store_true"
    # would fix this, but it changes the command-line syntax.
    parser.add_argument("-v", "--verbose", type=bool, default=False, help="Show detailed output")
    args = parser.parse_args()

    # Honour -i/--input; it was previously parsed and then ignored.
    # process_csv closes whichever stream it is given.
    if args.input:
        if sys.version_info[0] < 3:
            # The Python 2 csv module expects files opened in binary mode.
            input_stream = open(args.input, 'rb')
        else:
            # The Python 3 csv module expects newline='' for correct quoting.
            input_stream = open(args.input, newline='')
    else:
        input_stream = sys.stdin

    # These module-level names are read by process_csv() and task_done().
    # Assigning them here already makes them globals, so no `global` statement
    # is needed. At module level it is a SyntaxError on Python 3.6+.
    process_pool = Pool(processes=args.workers)
    workers_semaphore = Semaphore(args.workers)
    try:
        process_csv(input_stream, args)
    finally:
        # Let submitted batches finish, even if reading the input failed.
        process_pool.close()
        process_pool.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment