Convert JSON fields to CSV using multiprocessing along with a map-reduce architecture. - No memory issues - Handles enormous conversions easily (100 GB files) - Lightning fast - Works fine on Linux - Hasn't been tested on Windows.
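For illustration, assuming a newline-delimited JSON input whose lines carry the placeholder keys json_field_1 and json_field_2 used in the script below, each JSON line becomes one CSV row under the header column_1,column_2:

Input JSON line: {"json_field_1": "foo", "json_field_2": "bar"}
Output CSV row:  foo,bar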
#author: gr8_adakron.
#python: 3.6 (necessary for f-strings)

from subprocess import PIPE, Popen
from multiprocessing import Pool
import multiprocessing as mp
import pandas as pd
import re
import sys
import os
import time
import json


class JSONtoCSV():
    def __init__(self, input_file, output_file, model_name="dummy", date="dummy"):
        super(JSONtoCSV, self).__init__()
        #..> Basic constant variables.
        self.cwd = os.getcwd()
        self.number_of_cores = int(mp.cpu_count())
        self.split_threshold = 15000
        self.minimum_split = 5000

        #..> I/O constant variables.
        self.input_file = input_file
        self.output_file = output_file
        self.model_name = model_name
        self.date = date
        self.output_column_order = "column_1,column_2"

        #..> Map-reduce constant variables.
        self.output_segments_directory = "output_segments" + str(int(round(time.time() * 1000)))
        self.json_child = f"{self.output_segments_directory}/JSON_segments"
        self.csv_child = f"{self.output_segments_directory}/CSV_segments"
        self.create_directories()
    def deleted_directories(self):
        self.cmdline(f"rm -rf {self.output_segments_directory}")
    def create_directories(self):
        self.cmdline(f"mkdir {self.output_segments_directory}")
        self.cmdline(f"mkdir {self.json_child}")
        self.cmdline(f"mkdir {self.csv_child}")
        os.system(f'echo "{self.output_column_order}" > {self.output_file}')
        self.output_column_order_list = self.output_column_order.split(',')
    def cmdline(self, command):
        process = Popen(
            args=command,
            stdout=PIPE,
            shell=True
        )
        return process.communicate()[0]
    def check_for_fields(self, single_json, field_name):
        # Return the field value with whitespace normalised, or "N.A." when the
        # key is missing or its value is blank.
        if(field_name in single_json.keys()):
            if(re.search(r"^\s*$", str(single_json[field_name]).lower())):
                return "N.A."
            return_var = str(single_json[field_name])
            return_var = return_var.replace('\n', ' ').replace('\r', '').replace('\t', '')
            return_var = re.sub(r"\s+", r" ", return_var)
        else:
            return_var = "N.A."
        return return_var
    def single_file_executer(self, input_segment):
        """
        ========================================= IMPORTANT CHANGES ========================================
        Main conversion of JSON to CSV:
        Replace json_field_1, json_field_2 and so on with the JSON keys you want to
        appear in the output CSV, and replace column_1, column_2 and so on with your
        naming convention for the output CSV columns (the dictionary appended below).
        Order the CSV columns to your convenience via self.output_column_order in __init__.
        """
        input_full_path = f"{self.cwd}/{self.json_child}/{input_segment}"
        output_full_path = f"{self.cwd}/{self.csv_child}/{input_segment}.csv"
        single_csv_df_list = []
        for index, line in enumerate(open(input_full_path, 'r', encoding="iso-8859-1")):
            try:
                stripped_single_line = line.strip()
                single_json_line = json.loads(stripped_single_line)
                single_csv_df_list.append({
                    "column_1": self.check_for_fields(single_json_line, 'json_field_1'),
                    "column_2": self.check_for_fields(single_json_line, 'json_field_2'),
                })
            except Exception:
                print(" ~> Error: Skipping index: %s of file: %s ." % (str(index), str(input_segment)))
        single_csv_df = pd.DataFrame(single_csv_df_list)
        single_csv_order_column_df = single_csv_df[self.output_column_order_list]
        single_csv_order_column_df.to_csv(output_full_path, index=False, header=False)
    def parallel_files_execution(self, filename_list):
        # Map step: hand each JSON segment to its own worker process; the context
        # manager closes the pool once the map is done.
        with Pool(processes=self.number_of_cores) as pool:
            pool.map(self.single_file_executer, filename_list)
    def give_line_number(self, file_name):
        line_count = f"wc -l {file_name}"
        line_no = self.cmdline(line_count)
        line_no_int = int(re.findall(r"\d+", str(line_no))[0])
        return line_no_int
    def segment_processing(self):
        all_json = os.listdir(self.json_child)
        self.parallel_files_execution(all_json)
    def concatenate_output(self):
        # Reduce step: append every CSV segment to the output file, which already
        # holds the header row written by create_directories().
        csv_output_seg_list = os.listdir(self.csv_child)
        for segment_file in csv_output_seg_list:
            os.system(f'cat {self.csv_child}/{segment_file} >> {self.output_file}')
    def post_stats_printing(self, input_line_no, output_line_no, start_time):
        # The output count includes the header row, hence the -1.
        missing_lines = input_line_no - (output_line_no - 1)
        end_time = (int(time.time()) - int(start_time))
        print("\n ================================== STATISTICS =============================\n")
        print(" > Input JSON file : %d lines." % (input_line_no))
        print(" > Output CSV file : %d lines." % (output_line_no))
        print(" > Skipped : %d lines.\n" % (missing_lines))
        print(' >> Completed, in %s min (i.e : %d sec)\n' % (format(end_time / 60, '.2f'), end_time))
    def supervisor(self, start_time):
        print(self.input_file)
        line_no = self.give_line_number(self.input_file)
        # Split the input into segments of minimum_split lines when it is large
        # enough, otherwise keep it as a single segment.
        if(line_no > self.split_threshold):
            split_value = self.minimum_split
        else:
            split_value = line_no
        self.cmdline(f"split -l {split_value} {self.input_file} {self.json_child}/segment_")
        self.segment_processing()
        self.concatenate_output()
        output_line_no = self.give_line_number(self.output_file)
        self.post_stats_printing(line_no, output_line_no, start_time)
if __name__ == "__main__":
    input_file = sys.argv[1]
    output_file = sys.argv[2]
    start_time = time.time()
    converter_worker = JSONtoCSV(input_file, output_file)
    converter_worker.supervisor(start_time)
    converter_worker.deleted_directories()
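Usage is via the command line, with the input JSON and output CSV paths as the two positional arguments read from sys.argv; for example, assuming the gist is saved as json_to_csv.py (the filename is your choice):

python3 json_to_csv.py input.json output.csv

The temporary segment directories are created in the current working directory and removed by deleted_directories() once the conversion finishes.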