Last active
August 17, 2018 11:28
-
-
Save gr8Adakron/1f5341742b8c9fe0e6a0f591dd1e9789 to your computer and use it in GitHub Desktop.
Convert JSON fields to CSV using multiprocessing with a map-reduce architecture. - No memory issues - Easy handling of enormous conversions (100 GB files) - Lightning fast - Works fine on Linux - Hasn't been tested on Windows.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#author: gr8_adakron.
#python: 3.6 (necessary for fstrings)
from multiprocessing import Pool
from subprocess import PIPE, Popen
import json
import multiprocessing as mp
import os
import random
import re
import shutil
import string
import sys
import time

import pandas as pd
class JSONtoCSV():
    """Convert a newline-delimited JSON file to a CSV file.

    Map-reduce style pipeline: ``split`` the input into line-based segments,
    convert each segment to CSV in a separate worker process, then append the
    per-segment CSVs to the final output file.  Only one segment is held in
    memory per worker, so arbitrarily large inputs can be processed.

    NOTE: relies on the POSIX tools ``split`` and ``wc`` through a shell, so
    it only works on Unix-like systems (as the gist description says).
    """

    def __init__(self, input_file, output_file, model_name="dummy", date="dummy"):
        """Set up constants and create the temporary working directories.

        Args:
            input_file:  path to the newline-delimited JSON input file.
            output_file: path the final CSV is written to (overwritten).
            model_name:  free-form tag; not used by the pipeline itself.
            date:        free-form tag; not used by the pipeline itself.
        """
        super(JSONtoCSV, self).__init__()
        #..> Basic constant variables.
        self.cwd = os.getcwd()
        self.number_of_cores = int(mp.cpu_count())
        self.split_threshold = 15000  # above this line count the input is split
        self.minimum_split = 5000     # lines per segment when splitting
        #..> I/O constant variables.
        self.input_file = input_file
        self.output_file = output_file
        self.model_name = model_name
        self.date = date
        # Column order of the final CSV -- edit to match your JSON fields.
        self.output_column_order = "column_1,column_2"
        #..> Map-reduce constant variables (millisecond timestamp keeps the
        #    temporary directory unique per run).
        self.output_segments_directory = "output_segments" + str(int(round(time.time() * 1000)))
        self.json_child = f"{self.output_segments_directory}/JSON_segments"
        self.csv_child = f"{self.output_segments_directory}/CSV_segments"
        self.create_directories()

    def deleted_directories(self):
        """Remove the temporary segment directories created for this run."""
        # shutil.rmtree instead of shelling out to "rm -rf": no quoting or
        # injection problems if the working directory contains odd characters.
        shutil.rmtree(self.output_segments_directory, ignore_errors=True)

    def create_directories(self):
        """Create the temporary segment directories and write the CSV header."""
        # os.makedirs instead of shell "mkdir": portable and injection-safe.
        os.makedirs(self.json_child)
        os.makedirs(self.csv_child)
        # Write the header row with open() instead of 'echo ... > file' so the
        # output path never passes through a shell.
        with open(self.output_file, "w") as header_fh:
            header_fh.write(self.output_column_order + "\n")
        self.output_column_order_list = self.output_column_order.split(',')

    def cmdline(self, command):
        """Run *command* through the shell and return its stdout as bytes.

        WARNING: uses ``shell=True`` -- never pass untrusted input in
        *command* (kept for the POSIX ``split``/``wc`` calls below).
        """
        process = Popen(args=command, stdout=PIPE, shell=True)
        return process.communicate()[0]

    def check_for_fields(self, single_json, field_name):
        """Return a cleaned string value for *field_name* of *single_json*.

        Missing keys and blank/whitespace-only values both yield the sentinel
        "N.A." (the original returned "N.A" for missing keys -- a typo, now
        unified).  Newlines/tabs are stripped and whitespace runs collapsed to
        one space so the value fits safely into a single CSV cell.
        """
        if field_name not in single_json:
            return "N.A."
        value = str(single_json[field_name])
        if re.search(r"^\s*$", value.lower()):
            return "N.A."
        value = value.replace('\n', ' ').replace('\r', '').replace('\t', '')
        return re.sub(r"\s+", " ", value)

    def single_file_executer(self, input_segment):
        """Convert one JSON segment file into one CSV segment file.

        ========================== IMPORTANT CHANGES ==========================
        Replace json_field_1, json_field_2, ... with the JSON keys you want in
        the output CSV, and column_1, column_2, ... with your CSV column
        names.  Keep self.output_column_order in __init__ in sync.  (The
        original used dict keys "columns_1"/"columns_2" that did not match
        output_column_order, which raised KeyError on the column reordering
        below -- fixed.)
        """
        input_full_path = f"{self.cwd}/{self.json_child}/{input_segment}"
        output_full_path = f"{self.cwd}/{self.csv_child}/{input_segment}.csv"
        rows = []
        # Context manager closes the segment file even if a line blows up.
        with open(input_full_path, 'r', encoding="iso-8859-1") as segment_fh:
            for index, line in enumerate(segment_fh):
                try:
                    single_json_line = json.loads(line.strip())
                    rows.append({
                        "column_1": self.check_for_fields(single_json_line, 'json_field_1'),
                        "column_2": self.check_for_fields(single_json_line, 'json_field_2'),
                    })
                except Exception as exc:  # best-effort: skip bad lines, keep going
                    print(" ~> Error: Skipping index: %s of file: %s (%s)." % (index, input_segment, exc))
        # columns= keeps the frame well-formed (right columns, right order)
        # even when every line in the segment was skipped.
        segment_df = pd.DataFrame(rows, columns=self.output_column_order_list)
        segment_df.to_csv(output_full_path, index=False, header=False)

    def parallel_files_execution(self, filename_list):
        """Convert every segment in *filename_list* using a process pool.

        One worker per CPU core; the with-block joins and tears down the pool
        (the original's extra pool.close() and unused jobs list are removed).
        """
        with Pool(processes=self.number_of_cores) as pool:
            pool.map(self.single_file_executer, filename_list)

    def give_line_number(self, file_name):
        """Return the number of lines in *file_name* using ``wc -l``."""
        wc_output = self.cmdline(f"wc -l {file_name}")
        # "wc -l" prints "<count> <name>"; the first token is the count.
        return int(wc_output.split()[0])

    def segment_processing(self):
        """Dispatch every JSON segment file to the worker pool."""
        self.parallel_files_execution(os.listdir(self.json_child))

    def concatenate_output(self):
        """Append every CSV segment to the final output file, in order.

        BUG FIX: the original ran ``cat {output} {segment} >> {output}``,
        which re-appended the entire output file to itself on every
        iteration, duplicating earlier rows exponentially.  Only the segment
        must be appended.  sorted() makes the segment order deterministic
        (split names segments segment_aa, segment_ab, ...).
        """
        with open(self.output_file, 'ab') as out_fh:
            for segment_file in sorted(os.listdir(self.csv_child)):
                with open(f"{self.csv_child}/{segment_file}", 'rb') as seg_fh:
                    shutil.copyfileobj(seg_fh, out_fh)

    def post_stats_printing(self, input_line_no, output_line_no, start_time):
        """Print input/output line counts, skipped lines, and total runtime.

        *output_line_no* includes the CSV header row, hence the ``- 1``.
        """
        missing_lines = input_line_no - (output_line_no - 1)
        elapsed = int(time.time()) - int(start_time)
        print("\n ================================== STATISTICS =============================\n")
        print(" > Input JSON file : %d lines." % (input_line_no))
        print(" > Output CSV file : %d lines." % (output_line_no))
        print(" > Skipped : %d lines.\n" % (missing_lines))
        print(' >> Completed, in %s min (i.e : %d sec)\n' % (format(elapsed / 60, '.2f'), elapsed))

    def supervisor(self, start_time):
        """Run the full pipeline: split -> convert in parallel -> concatenate."""
        print(self.input_file)
        line_no = self.give_line_number(self.input_file)
        # Large inputs get fixed-size segments; small inputs stay as a single
        # segment.  (The original computed line_no/20 first and immediately
        # overwrote it -- dead code, removed.)  "or 1" guards an empty input,
        # for which "split -l 0" would error out.
        if line_no > self.split_threshold:
            split_value = self.minimum_split
        else:
            split_value = line_no or 1
        self.cmdline(f"split -l {split_value} {self.input_file} {self.json_child}/segment_")
        self.segment_processing()
        self.concatenate_output()
        output_line_no = self.give_line_number(self.output_file)
        self.post_stats_printing(line_no, output_line_no, start_time)
if __name__ == "__main__":
    # CLI entry point: <input ndjson> <output csv>.  Validate argv up front so
    # a missing argument gives a usage message instead of an IndexError.
    if len(sys.argv) < 3:
        sys.exit(f"Usage: python {sys.argv[0]} <input_json> <output_csv>")
    input_file = sys.argv[1]
    output_file = sys.argv[2]
    start_time = time.time()
    converter_worker = JSONtoCSV(input_file, output_file)
    converter_worker.supervisor(start_time)
    # Clean up the temporary segment directories once the run succeeds.
    converter_worker.deleted_directories()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.