Upload Huge CSV Data using Multiprocessing
import multiprocessing
import os

import pandas as pd
import pymongo

'''
Article - https://truptishetty.medium.com/how-to-upload-huge-csv-data-faster-28f27114cb22
'''
class CSVData:
    def __init__(self, server_name, port, db_name, file_name):
        self.server_name = server_name
        self.port = port
        self.db_name = db_name
        self.file_name = file_name
        self.chunk_size = 100000
        # Change it according to the number of cores available
        self.number_of_pool = 4
    def read_csv(self):
        if not os.path.exists(self.file_name):
            return
        header = ["all", "headers", "or", "read", "from", "csv"]
        pool = multiprocessing.Pool(self.number_of_pool)
        reader = pd.read_csv(self.file_name, names=header,
                             chunksize=self.chunk_size,
                             low_memory=False, skiprows=1)
        for df in reader:
            # Do any data cleaning or manipulation if required here.
            # We have to pass the Mongo connection details instead of a
            # connection object, because Python doesn't share variables
            # between processes.
            pool.apply_async(upload_data,
                             args=(df, header, self.server_name,
                                   self.port, self.db_name))
        pool.close()
        pool.join()
def upload_data(data, header, server_name, port, db_name):
    # Runs in a worker process: open a fresh connection, bulk-insert
    # the chunk, and close the connection.
    conn = None
    try:
        conn = pymongo.MongoClient(server_name, port)
        db = conn[db_name]
        # insert() is deprecated in pymongo; insert_many() bulk-inserts
        # the chunk ('collection' here is a placeholder collection name)
        db.collection.insert_many(data.to_dict('records'))
    except Exception as ex:
        print("Error while inserting CSV data - ", repr(ex))
    finally:
        # conn stays None if MongoClient itself failed
        if conn is not None:
            conn.close()
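
# A minimal usage sketch, not part of the original gist: the host, port,
# database name, and CSV path below are hypothetical placeholders. The
# __main__ guard is needed so worker processes spawned by multiprocessing
# don't re-run this code when they import the module.
if __name__ == '__main__':
    csv_data = CSVData('localhost', 27017, 'my_database', 'huge_data.csv')
    csv_data.read_csv()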