@jassinm
Created May 4, 2012 20:29
import csv
import multiprocessing as mp
import os
import sys

import gridfs
import pymongo

from test import sanitize, size_chunks, CsvChunkReader


def upload_to_mongo(csvreader, header):
    # Each worker opens its own connection: pymongo connections are not
    # fork-safe, so they should be created after mp.Process() forks rather
    # than shared from the parent. (`header` is handed to every worker but
    # is not used here.)
    mongo_conn = pymongo.Connection()
    mongo_db = mongo_conn['test']
    docs = mongo_db['docs']
    fs = gridfs.GridFS(mongo_db)
    for r in sanitize(csvreader):
        # Rows that point at a file on disk get the file stored in GridFS,
        # with the resulting id kept on the document.
        if 'path' in r:
            with open(r['path']) as fh:
                fsid = fs.put(fh, filename=os.path.basename(r['path']))
            r['fsid'] = fsid
        docs.save(r)


def run(filepath):
    number_of_workers = 16
    file_size = os.path.getsize(filepath)

    # Split the file into one byte range per worker.
    with open(filepath, 'r') as fh:
        chunks = size_chunks(fh, file_size, num_chunks=number_of_workers)

    # Read the header row once so it can be passed to every worker.
    with open(filepath, 'r') as fh:
        csvreader = csv.reader(fh)
        header = csvreader.next()

    # Fan out: each process reads and uploads its own slice of the file.
    jobs = []
    for workerid, chunk in enumerate(chunks):
        start, end = chunk
        csv_chunk_reader = CsvChunkReader(open(filepath, 'r'), start, end)
        t = mp.Process(target=upload_to_mongo, args=(csv_chunk_reader, header))
        t.start()
        jobs.append(t)

    for job in jobs:
        job.join()


if __name__ == '__main__':
    run(sys.argv[1])
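
The script imports sanitize, size_chunks, and CsvChunkReader from a companion test module that is not included in the gist. Below is a minimal sketch of what the two chunking helpers might look like, inferred only from how run() and upload_to_mongo() call them: size_chunks is assumed to return newline-aligned (start, end) byte ranges, and CsvChunkReader to yield csv rows between two byte offsets. sanitize, which evidently turns raw rows into dicts carrying a 'path' key, depends on the column layout and is left out.

# Hypothetical sketch of two helpers from the `test` module; the gist does
# not include them, so the behavior below is an assumption.
import csv


def size_chunks(fh, file_size, num_chunks):
    # Split a file into (start, end) byte ranges aligned to line breaks,
    # so that no csv row is split across two workers.
    chunk_size = file_size // num_chunks
    chunks = []
    start = 0
    while start < file_size:
        fh.seek(min(start + chunk_size, file_size))
        fh.readline()  # advance to the end of the current line
        end = min(fh.tell(), file_size)
        chunks.append((start, end))
        start = end
    return chunks


class CsvChunkReader(object):
    # Iterate csv rows between the byte offsets start and end; assumes
    # rows contain no embedded newlines.
    def __init__(self, fh, start, end):
        fh.seek(start)
        self.fh = fh
        self.end = end

    def __iter__(self):
        return self

    def next(self):  # Python 2 iterator protocol, matching the gist
        if self.fh.tell() >= self.end:
            raise StopIteration
        line = self.fh.readline()
        if not line:
            raise StopIteration
        return csv.reader([line]).next()

Parsing one line at a time via csv.reader([line]) keeps fh.tell() accurate, which the end-of-chunk check relies on; iterating the file object directly would let the read-ahead buffer run past the chunk boundary.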