It's a Perl script that creates, on the fly, a Berkeley DB with the JSON IDs as the primary key. While forming the output JSON from the input CSV, it looks up each row's extra attributes in the JSON Berkeley DB, much like a VLOOKUP. The difference is that it creates and uses everything with parallel processing (reading as well as writing), and it is lightning…
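For reference, the script takes three positional arguments, so a typical run would look something like this (the file name merge_csv_json.pl is hypothetical; only the three arguments below are actually checked by the script):

    perl merge_csv_json.pl Model_Output.csv Raw_Data_Input_File.json Merged_Output.json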
#Author : gr8_Adakron.
use strict;
use warnings;
use Carp;
use POSIX ":sys_wait_h";
use Data::Dumper;
use Term::ANSIColor;
use Time::HiRes qw(time gettimeofday tv_interval);
use lib '/junk/lib';
use JSON;
use Text::CSV;
use forks::BerkeleyDB;
use BerkeleyDB;
use BerkeleyDB::Hash;
use threads::shared;

use constant DBPATH => 'raw_json_hash.db';    #.... hash database file name.

our $max_processors = 1;                      #.... default to a single processor.
my $json_converter  = JSON->new->allow_nonref;
my $start           = time;
$|++;                                         #.... autoflush STDOUT.

# print backtrace info
sub bt {
    croak "DIED ($.)\n";
}
chomp(my $cpu_count = `grep -c -P '^processor\\s+:' /proc/cpuinfo`);

#-------------> Arguments.
my $input_file         = $ARGV[0] or die "Error : 1st argument is Model_Output.CSV file\n";
my $raw_input_file     = $ARGV[1] or die "Error : 2nd argument is Raw_Data_Input_File.JSON file\n";
our $final_output_file = $ARGV[2] or die "Error : 3rd argument is Merged_Output.JSON file\n";

#-------------> Global variables.
my ($no_of_json_row) = number_of_line_in_input_files($raw_input_file, $input_file);
if ($no_of_json_row > 5000) {
    $max_processors = int($cpu_count);    #.... use every core only for large inputs.
}
my $time_stamp = time;
our $temporary_cache_file           = "CSVtoJSON_cache$time_stamp";
our $db_directory                   = "$temporary_cache_file/databases";
our $csv_split_files_directory      = "$temporary_cache_file/csv_segment";
our $segment_of_csv_name            = "segment";
our $output_json_segment_directory  = "$temporary_cache_file/output_segment";
our $final_output_directory         = "$temporary_cache_file/merged";
our $raw_json_split_files_directory = "$temporary_cache_file/inputs_json_segments";
our $segment_of_raw_json_name       = "raw_json_child";
our $completed_mapped_data          = 0;
my $headers     = `head -1 $input_file`;
my @csv_headers = $headers =~ /([a-z_]+)/ig;
#-------------> Creating segment directories.
my $create_temporary_cache          = `mkdir $temporary_cache_file`;
my $create_csv_split_directory      = `mkdir $csv_split_files_directory`;
my $create_json_split_directory     = `mkdir $output_json_segment_directory`;
my $create_raw_json_split_directory = `mkdir $raw_json_split_files_directory`;
my $create_db_directory             = `mkdir $db_directory`;

#..... Main : starts .....#
#-------------> Mapping Process.
print("\n ~~>> Number of Cores Available : $cpu_count.");
print("\n ~~>> Processing... ");
print("\n >>__________________________________________ Mapping Json... ____________________________________________<<\n\n");
#-------------> Mapping Section.
my @raw_json_sub_file    = splitting_file_process($raw_input_file, $raw_json_split_files_directory, $segment_of_raw_json_name);
my $number_raw_json_file = int($#raw_json_sub_file) + 1;
$SIG{__WARN__} = \&bt;    #..... turn warnings into a fatal backtrace.
for my $iteration (0 .. $#raw_json_sub_file) {
    simultaneous_mapping_hash($raw_json_sub_file[$iteration], $iteration + 1);
}
for (1 .. $number_raw_json_file) { my $pid = wait; }    #..... reap one child per segment.
for my $iteration (0 .. $#raw_json_sub_file) {
    my $row_mapped = `wc -l $raw_json_split_files_directory/$raw_json_sub_file[$iteration].json`;
    $row_mapped =~ /(\d+)/is;
    $row_mapped = int($1);
    $completed_mapped_data = $completed_mapped_data + $row_mapped;
}
my $duration_mapped_seconds = sprintf("%.2f", time - $start);
my $duration_mapped_minutes = sprintf("%.2f", $duration_mapped_seconds / 60);
print colored(sprintf("\n ~~>> Total Rows Mapped : $completed_mapped_data in $duration_mapped_minutes min. (i.e. $duration_mapped_seconds sec)"), 'red');
print("\n\n >>__________________________________________ Matching... _____________________________________________<<\n");
#-------------> Reading CSV and threading.
my $env = get_environment();    #........ DB environment init.
our $db_instance_to_match = formed_db_instance($env);
my @sub_file = splitting_file_process($input_file, $csv_split_files_directory, $segment_of_csv_name);
our $num_of_threads = int($#sub_file) + 1;
print("\n ~~> Segment files created in the segment folder :");
print("\n ==>> [ @sub_file ]");
print("\n -- -------- -------- ---------- --------- ---------");
print("\n ~~> Number of Threads Formed : $num_of_threads");
print("\n -- -------- -------- ---------- --------- ---------");
print("\n ~~>> Removing None, process start... threads running... ");
print("\n -- -------- -------- ---------- --------- ---------\n\n");
my @threads = initThreads();
for my $iteration (0 .. $#threads) {
    if ($sub_file[$iteration] eq 'segmentaa') {    #..... first segment still carries the CSV header.
        my $removing_header_from_csv = `sed -i '1d' $csv_split_files_directory/$sub_file[$iteration]`;
    }
    $threads[$iteration] = threads->create(\&doOperation, $sub_file[$iteration], \@csv_headers);
}
foreach (@threads) {
    $_->join();
}
#-------------> Post-processing.
print("\n\n >>_______________________________ Merging Output Segments to $final_output_file ... _________________________________<<\n\n");
my $number_of_prediction = merged_all_output_segments();    #.... merge all output segments into the output file.
delete_all_temporary_directory();                           #.... delete all temporary folders.

#-------------> Program time calculation.
my $duration_seconds = sprintf("%.2f", time - $start);
my $duration_minutes = sprintf("%.2f", $duration_seconds / 60);
print("\n -- -------- -------- ---------- --------- ---------");
print colored(sprintf("\n ~~>> Total Predictions Transformed : $number_of_prediction. "), 'red');
print("\n -- -------- -------- ---------- --------- ---------");
print("\n ~~>> Completed (rows predicted as \"None\" removed). ");
print colored(sprintf("\n ~~>> Execution time: $duration_minutes minutes (i.e. $duration_seconds sec)\n"), 'red');
print(" ~~>> Successfully Ended, Good Bye!\n");

#..... Main : End .....#
#-----------> Subroutines zone.
sub get_environment {
    BerkeleyDB::Env->new(
        -Home      => "$db_directory",
        -Cachesize => 4 * 1024 * 1024,
        # -ErrFile => $dberr,
        -Flags     => DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB,
    ) or die "Couldn't initialize BerkeleyDB environment: $BerkeleyDB::Error";
}

sub formed_db_instance {
    my $env = $_[0];
    my $db  = BerkeleyDB::Hash->new(
        -Filename => DBPATH,
        -Env      => $env,
        -Flags    => DB_CREATE()
    ) or croak "Failed to open DBPATH: '$!' ($BerkeleyDB::Error)\n";
    return $db;
}

sub initThreads {
    my @initThreads;
    for (my $i = 1; $i <= $num_of_threads; $i++) {
        push(@initThreads, $i);
    }
    return @initThreads;
}
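#..... Note on the environment flags above: DB_INIT_CDB opens the environment
#..... as a Berkeley DB "Concurrent Data Store", which serialises writers
#..... against readers at the database level. That is what lets the forked
#..... mapping children and the matching threads share raw_json_hash.db
#..... without any explicit locking in this script.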
sub mapping_data {
    my $segment_to_be_mapped = $_[0];
    my $db_instance_created  = $_[1];
    my $mapping_index        = 0;
    open my $fh, '<', $segment_to_be_mapped or die "$segment_to_be_mapped: $!";
    while (<$fh>) {
        $mapping_index = $.;
        my $raw_single_json = give_json_object($_);
        $db_instance_created->db_put($raw_single_json->{'item_id'}, $_);    #..... item_id is the primary key.
    }
    close $fh;
    return $mapping_index;
}
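#..... mapping_data() effectively turns the raw JSON file into a persistent
#..... on-disk hash keyed by item_id. A sketch of the round trip (the key and
#..... value here are hypothetical):
#.....     $db->db_put('AB123', '{"item_id":"AB123","title":"..."}');
#.....     my $value;
#.....     $db->db_get('AB123', $value);    # $value now holds the JSON line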
sub doOperation {
    my @arguments     = @_;
    my @input_headers = @{ $_[1] };
    my $input_file    = "$csv_split_files_directory/$arguments[0]";
    my $output_file   = "$output_json_segment_directory/output_$arguments[0].json";
    my $id            = threads->tid();    #......... thread id.
    merging_data($input_file, $output_file, \@input_headers);
    print " ==>> Thread $id, Executing $input_file, DONE! ..................................................ok \n";
    threads->exit();                       #........ exit the thread.
}
sub simultaneous_mapping_hash {
    my $id  = $_[1];
    my $pid = fork;
    croak "Could not fork: '$!'\n" unless (defined($pid));
    return unless ($pid == 0);                     #..... parent returns; child continues below.
    my $env         = get_environment();          #........ DB environment init.
    my $db_instance = formed_db_instance($env);    #........ DB instance init.
    my $one_thread_mapped_total = 0;
    my $input_file = "$raw_json_split_files_directory/$_[0].json";
    my $convert_to_json_extension = `mv $raw_json_split_files_directory/$_[0] $input_file`;
    my $segments_mapped = mapping_data($input_file, $db_instance);
    $one_thread_mapped_total = $one_thread_mapped_total + $segments_mapped;
    print " ==>> Thread $id, Executing $input_file, DONE! = $one_thread_mapped_total .................ok \n";
    exit 0;
}
sub splitting_file_process {
    my $input_file            = $_[0];    #..... input file to be split.
    my $split_files_directory = $_[1];    #..... directory where the segment files are stored.
    my $segment_name          = $_[2];    #..... name prefix for the segments.
    my $number_of_entries_in_your_dataset = `wc -l $input_file`;
    $number_of_entries_in_your_dataset =~ /(\d+)/is;
    $number_of_entries_in_your_dataset = int($1);
    my $number_split = int($number_of_entries_in_your_dataset / $max_processors);
    $number_split = 1 if ($number_split < 1);    #..... guard: `split -l 0` would fail on tiny inputs.
    my $split_Files = `split -l $number_split $input_file $split_files_directory/$segment_name`;
    my @sub_file = ();
    opendir(my $directory, $split_files_directory) or die $!;
    while (my $file = readdir($directory)) {
        if ($file =~ /^\./) { next; }    #..... skip '.' and '..'.
        push @sub_file, $file;
    }
    closedir($directory);
    return @sub_file;
}
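#..... `split -l N input dir/segment` names its pieces segmentaa, segmentab,
#..... and so on, which is why the main loop strips the CSV header from the
#..... first piece, 'segmentaa', before the worker threads start.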
sub give_json_object {
    my $input = $_[0];    #..... one raw JSON line, as a string.
    return decode_json($input);
}
sub write_json {
    my $json        = $_[0];
    my $output_file = $_[1];
    open my $o_file, '>>', $output_file or die $!;
    print $o_file "$json\n";
    close $o_file;
}
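#..... make_it_array() below normalises a CSV cell into an array reference.
#..... Worked examples (inputs are hypothetical):
#.....     "['cm', 'mm']"  ->  ['cm', 'mm']    # stringified list is re-parsed
#.....     "single"        ->  ['single']      # bare scalar is wrapped
#.....     "" or undef     ->  []              # empty becomes an empty list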
sub make_it_array {
    my $data_to_check = $_[0];
    if ($data_to_check) {
        if ($data_to_check =~ /^ *$/) {
            $data_to_check = [];
        }
        elsif ($data_to_check =~ /\[/) {
            my @string_to_array = $data_to_check =~ /'(.*?)'(?:\s*,|\s*])/g;
            $data_to_check = \@string_to_array;
            return $data_to_check;
        }
        else {
            $data_to_check = [$data_to_check];
        }
    }
    else {
        $data_to_check = [];
    }
    return $data_to_check;
}
sub merged_all_output_segments {
    my $final_output = $final_output_file;
    opendir(my $directory, $output_json_segment_directory) or die $!;
    my $merged_output_file = `touch $final_output`;
    while (my $output_segment_file = readdir($directory)) {
        next if ($output_segment_file =~ /^\./);    #..... skip '.' and '..'.
        #..... append each segment only; catting $final_output into itself would duplicate rows.
        my $combining = `cat $output_json_segment_directory/$output_segment_file >> $final_output`;
    }
    closedir($directory);
    my $number_of_predicted_row = `wc -l $final_output`;
    $number_of_predicted_row =~ /(\d+)/is;
    $number_of_predicted_row = int($1);
    return $number_of_predicted_row;
}
sub delete_all_temporary_directory {
    my $delete_all_temporary_cache = `rm -rf $temporary_cache_file`;
}
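#..... merging_data() below is the VLOOKUP step of the script: for every CSV
#..... row it builds a JSON object from the headers, fetches the matching raw
#..... record from the Berkeley DB by item_id, copies title,
#..... short_description and long_description across, and appends the merged
#..... JSON line to this thread's output segment.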
sub merging_data {
    my @arguments   = @_;
    my $file        = $arguments[0];    #........ input file.
    my $output_file = $arguments[1];    #........ output file.
    my @headers     = @{ $_[2] };
    my %hash;
    my $csv = Text::CSV->new({ binary => 1 });    #..... binary mode: tolerate non-ASCII fields.
    open(my $data, "<:encoding(utf8)", $file) or die "Could not open '$file' $!\n";
    while (my $fields = $csv->getline($data)) {
        my $predicted = $fields->[4];
        $predicted =~ tr/A-Z/a-z/;
        if ($predicted eq "none") {
            next;    #..... drop rows whose prediction is "None".
        }
        else {
            my @columns = @$fields;
            @hash{@headers} = @columns;    #..... hash slice: header => column value.
            my $raw_input;
            my $json        = encode_json \%hash;
            my $single_json = decode_json($json);    #..... round-trip gives a fresh deep copy per row.
            my $item_id     = $single_json->{'item_id'};
            my $sts = $db_instance_to_match->db_get($item_id, $raw_input);    #..... $raw_input comes back as a string and must be decoded into a JSON object.
            if ($sts) {
                print "Error: . $sts -------> $raw_input \n";
                next;
            }
            my $raw_single_json = give_json_object($raw_input);    #..... decode the string into a JSON object.
            $single_json->{'predictions'}       = make_it_array($single_json->{'predictions'});
            $single_json->{'unitsOfMeasure'}    = make_it_array($single_json->{'unitsOfMeasure'});
            $single_json->{'title'}             = $raw_single_json->{'title'};
            $single_json->{'short_description'} = $raw_single_json->{'short_description'};
            $single_json->{'long_description'}  = $raw_single_json->{'long_description'};
            my $merged_json = $json_converter->ascii(1)->encode($single_json);
            write_json($merged_json, $output_file);    #..... write the merged JSON line to the output segment.
        }
    }
    close $data;
}
sub number_of_line_in_input_files {
    my $json_file_for_total_line = $_[0];
    my $csv_file_for_total_line  = $_[1];
    if (-f $json_file_for_total_line && -f $csv_file_for_total_line) {
        my $number_of_line_in_json = `wc -l $json_file_for_total_line`;
        $number_of_line_in_json =~ /(\d+)/is;
        $number_of_line_in_json = int($1);
        my $number_of_line_in_csv = `wc -l $csv_file_for_total_line`;
        $number_of_line_in_csv =~ /(\d+)/is;
        $number_of_line_in_csv = int($1);
        if ($number_of_line_in_csv < 2 || $number_of_line_in_json == 0) {
            print("\n\n");
            die_and_print_error("\t~X Error : One or both of the input files is probably truncated!");
        }
        else { return $number_of_line_in_json; }
    }
    else {
        print("\n\n");
        die_and_print_error("\t~X Error : One or both of the input files doesn't exist!");
    }
}
sub die_and_print_error {
    my $error_string = $_[0];
    #..... `die print ...` would die with print's return value; print the coloured message first, then die.
    print colored($error_string, 'white on_red'), "\n\n";
    die "\n";    #..... the bare "\n" suppresses the "at <script> line N" suffix.
}