A Perl script that builds, on the fly, a Berkeley DB with the JSON IDs as the primary key. While forming the output JSON from the input CSV, it looks up each CSV row's extra attributes in the JSON Berkeley DB (just like a VLOOKUP). The difference is that it creates and uses everything with parallel processing (reading as well as writing), which makes it lightning fast.
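Typical invocation, as a sketch (the script name below is a placeholder; the three arguments follow the checks at the top of the script: the model-output CSV, the raw input JSON with one record per line, and the path for the merged output):

    perl csv_to_json_merge.pl Model_Output.csv Raw_Data_Input_File.json Merged_Output.json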
#!/usr/bin/perl
# Author : gr8_Adakron.
use Term::ANSIColor;
use Time::HiRes qw(time);
use strict;
use warnings;
use Carp;
use POSIX ":sys_wait_h";
use Data::Dumper;
use JSON;
use lib '/junk/lib';
use Text::CSV;
use forks::BerkeleyDB;
use BerkeleyDB;
use threads::shared;
use BerkeleyDB::Hash;
use Time::HiRes qw(gettimeofday tv_interval);
use constant DBPATH => 'raw_json_hash.db'; #.... hash database file name.
our $max_processors = 1; #.... default number of worker processes; raised below for large inputs.
my $json_converter = JSON->new->allow_nonref;
my $start = time;
$|++;
# Warn handler: die with the current input line number (installed as $SIG{__WARN__} below).
sub bt {
    croak "DIED ($.)\n";
}
chomp(my $cpu_count = `grep -c -P '^processor\\s+:' /proc/cpuinfo`);
#-------------> Arguments .
my $input_file = $ARGV[0] or die "Error : 1st argument is Model_Output.CSV file\n";
my $raw_input_file = $ARGV[1] or die "Error : 2nd argument is Raw_Data_Input_File.JSON file\n";
our $final_output_file = $ARGV[2] or die "Error : 3rd argument is Merged_Output.JSON file\n";
#-------------> global variables.
my ($no_of_json_row) = number_of_line_in_input_files($raw_input_file,$input_file);
if ($no_of_json_row > 5000) {
    $max_processors = int($cpu_count); #.... use every available core for large inputs.
}
my $time_stamp = time;
our $temporary_cache_file = "CSVtoJSON_cache$time_stamp";
our $db_directory = "$temporary_cache_file/databases";
our $csv_split_files_directory = "$temporary_cache_file/csv_segment";
our $segment_of_csv_name = "segment";
our $output_json_segment_directory = "$temporary_cache_file/output_segment";
our $final_output_directory = "$temporary_cache_file/merged";
our $raw_json_split_files_directory = "$temporary_cache_file/inputs_json_segments";
our $segment_of_raw_json_name = "raw_json_child";
our $completed_mapped_data = 0;
my $headers = `head -1 $input_file`;
my @csv_headers = $headers =~/([a-z_]+)/ig;
#-------------> Creating Segment Directories.
my $create_temporary_cache = `mkdir $temporary_cache_file`;
my $create_csv_split_directory = `mkdir $csv_split_files_directory`;
my $create_json_split_directory = `mkdir $output_json_segment_directory`;
my $create_raw_json_split_directory = `mkdir $raw_json_split_files_directory`;
my $create_db_directory = `mkdir $db_directory`;
#..... Main : starts .....#
#-------------> Mapping Process .
print("\n ~~>> Number of Core Avalaible : $cpu_count.");
print("\n ~~>> Processing... ");
print("\n >>__________________________________________ Mapping Json... ____________________________________________<<\n\n");
#-------------> Mapping Section.
my @raw_json_sub_file = splitting_file_process($raw_input_file,$raw_json_split_files_directory,$segment_of_raw_json_name);
my $number_raw_json_file = scalar(@raw_json_sub_file);
$SIG{__WARN__} = \&bt; #..... install signal handlers
for my $iteration (0..$#raw_json_sub_file) {
    simultaneous_mapping_hash($raw_json_sub_file[$iteration], $iteration + 1);
}
for (1..$number_raw_json_file) { my $pid = wait; } #.... reap every mapper child.
for my $iteration (0..$#raw_json_sub_file) {
    my $row_mapped = `wc -l $raw_json_split_files_directory/$raw_json_sub_file[$iteration].json`;
    $row_mapped =~ /(\d+)/is;
    $row_mapped = int($1);
    $completed_mapped_data = $completed_mapped_data + $row_mapped;
}
my $duration_mapped_seconds = sprintf("%.2f", time - $start);
my $duration_mapped_minutes = sprintf("%.2f", $duration_mapped_seconds/60);
print colored( sprintf("\n ~~>> Total Row Mapped : $completed_mapped_data in $duration_mapped_minutes min . (i.e: $duration_mapped_seconds sec)"), 'red');
print("\n\n >>__________________________________________ Matching... _____________________________________________<<\n");
#-------------> Reading CSV and threading.
my $env = get_environment(); #........ db environment init.
our $db_instance_to_match = formed_db_instance($env);
my @sub_file = splitting_file_process($input_file,$csv_split_files_directory,$segment_of_csv_name);
our $num_of_threads = scalar(@sub_file);
print("\n ~~> Segment File created in the Segment folder :");
print("\n ==>> [ @sub_file ]");
print("\n -- -------- -------- ---------- --------- ---------");
print("\n ~~> Number of Threads, Formed : $num_of_threads");
print("\n -- -------- -------- ---------- --------- ---------");
print("\n ~~>> Removing None, process start... threads running... ");
print("\n -- -------- -------- ---------- --------- ---------\n\n");
my @threads = initThreads();
for my $iteration (0..$#threads) {
    if ($sub_file[$iteration] eq 'segmentaa') { #.... the first segment carries the CSV header; strip it.
        my $removing_header_from_csv = `sed -i '1d' $csv_split_files_directory/$sub_file[$iteration]`;
    }
    $threads[$iteration] = threads->create(\&doOperation, $sub_file[$iteration], \@csv_headers);
}
foreach (@threads) {
    $_->join();
}
#-------------> Post Processing Function.
print("\n\n >>_______________________________ Merging Output Segment to $final_output_file ... _________________________________<<\n\n");
my $number_of_prediction = merged_all_output_segments(); #.... merge all output segments into the output file.
delete_all_temporary_directory(); #.... delete all temporary working folders (cleanup).
#-------------> Program Time Calculation.
my $duration_seconds = sprintf("%.2f", time - $start);
my $duration_minutes = sprintf("%.2f", $duration_seconds/60);
print("\n -- -------- -------- ---------- --------- ---------");
print colored( sprintf("\n ~~>> Total Prediction Transformed : $number_of_prediction. "), 'red');
print("\n -- -------- -------- ---------- --------- ---------");
print("\n ~~>> Completed.(Removing None Completed). ");
print colored( sprintf("\n ~~>> Execution time: $duration_minutes . minutes . (in $duration_seconds . sec)\n"), 'red');
print(" ~~>> Successfully Ended, Good Bye!\n");
#..... Main : End .....#
#-----------> Subroutines zone.
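# Create (or join) the shared Berkeley DB environment: a concurrent-data-store
# (CDB) environment with a 4 MB memory-pool cache, homed in the cache directory.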
sub get_environment {
    BerkeleyDB::Env->new(
        -Home      => "$db_directory",
        #-ErrFile  => $dberr,
        -Cachesize => 4 * 1024 * 1024,
        -Flags     => DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB,
    ) or die "Couldn't initialize BerkeleyDB environment: $BerkeleyDB::Error";
}
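# Open the on-disk hash database (DBPATH) inside the shared environment,
# creating it if it does not exist yet.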
sub formed_db_instance {
    my $env = $_[0];
    my $db = BerkeleyDB::Hash->new(
        -Filename => DBPATH,
        -Env      => $env,
        -Flags    => DB_CREATE,
    ) or croak "Failed to open DBPATH: '$!' ($BerkeleyDB::Error)\n";
    return $db;
}
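# Return a list of placeholder thread ids (1..$num_of_threads); the slots are
# overwritten with real thread objects in the main loop above.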
sub initThreads {
    my @initThreads;
    for my $i (1..$num_of_threads) {
        push(@initThreads, $i);
    }
    return @initThreads;
}
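# Stream one raw-JSON segment line by line and db_put() each record into the
# Berkeley DB, keyed by its 'item_id'; returns the number of lines mapped.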
sub mapping_data {
    my $segment_to_be_mapped = $_[0];
    my $db_instance_created  = $_[1];
    my $mapping_index = 0;
    open my $fh, '<', $segment_to_be_mapped or die "$segment_to_be_mapped: $!";
    while (<$fh>) {
        $mapping_index = $.;
        my $raw_single_json = give_json_object($_);
        $db_instance_created->db_put($raw_single_json->{'item_id'}, $_);
    }
    close $fh;
    return $mapping_index;
}
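# Thread body: merge one CSV segment against the shared DB and write the
# resulting JSON lines to this segment's own output file.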
sub doOperation {
    my @arguments = @_;
    my @input_headers = @{$_[1]};
    my $input_file  = "$csv_split_files_directory/$arguments[0]";
    my $output_file = "$output_json_segment_directory/output_$arguments[0].json";
    my $id = threads->tid(); #......... thread id.
    merging_data($input_file, $output_file, \@input_headers);
    print " ==>> Thread $id, Executing $input_file, DONE! ..................................................ok \n";
    threads->exit(); #........ exit the thread.
}
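# Fork one mapper process per raw-JSON segment: the parent returns at once,
# the child renames its segment to *.json, loads it into the Berkeley DB via
# mapping_data(), reports its count, and exits.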
sub simultaneous_mapping_hash {
    my $id = $_[1];
    my $pid = fork;
    croak "Could not fork: '$!'\n" unless (defined($pid));
    return unless ($pid == 0);
    my $env = get_environment(); #........ db environment init.
    my $db_instance = formed_db_instance($env); #........ db instance init.
    my $one_thread_mapped_total = 0;
    my $input_file = "$raw_json_split_files_directory/$_[0].json";
    my $convert_to_json_extension = `mv $raw_json_split_files_directory/$_[0] $input_file`;
    my $segments_mapped = mapping_data($input_file, $db_instance);
    $one_thread_mapped_total = $one_thread_mapped_total + $segments_mapped;
    print " ==>> Thread $id, Executing $input_file, DONE! = $one_thread_mapped_total .................ok \n";
    exit 0;
}
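# Split an input file into roughly equal line-count segments (one per worker)
# with the system `split` utility and return the list of segment file names.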
sub splitting_file_process {
    my $input_file            = $_[0]; #..... input file to be split.
    my $split_files_directory = $_[1]; #..... directory where the segment files are stored.
    my $segment_name          = $_[2]; #..... base name for the segments.
    my $number_of_entries_in_your_dataset = `wc -l $input_file`;
    $number_of_entries_in_your_dataset =~ /(\d+)/is;
    $number_of_entries_in_your_dataset = int($1);
    my $number_split = int($number_of_entries_in_your_dataset / $max_processors);
    my $split_Files = `split -l $number_split $input_file $split_files_directory/$segment_name`;
    my @sub_file = ();
    opendir(DIRECTORY, $split_files_directory) or die $!;
    while (my $file = readdir(DIRECTORY)) {
        if ($file =~ /^\./) { next; }
        push @sub_file, $file;
    }
    closedir(DIRECTORY);
    return @sub_file;
}
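# Decode one JSON record (a single line of text) into a Perl hashref.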
sub give_json_object {
    my @arguments = @_;
    my $input = $arguments[0];
    return decode_json($input);
}
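# Append one encoded JSON line to the given output file.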
sub write_json {
    my @taken_list = @_;
    my $json        = $taken_list[0];
    my $output_file = $taken_list[1];
    open my $o_file, '>>', $output_file or die $!;
    print $o_file "$json\n";
    close $o_file;
}
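# Normalize a CSV field into an array reference: an empty/blank value becomes
# [], a stringified list like "['a', 'b']" is parsed into its elements, and a
# plain scalar is wrapped as a one-element array.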
sub make_it_array {
    my @arguments = @_;
    my $data_to_check = $arguments[0];
    if ($data_to_check) {
        if ($data_to_check =~ /^ *$/) {
            $data_to_check = [];
        }
        elsif ($data_to_check =~ /\[/) {
            my @string_to_array = $data_to_check =~ /'(.*?)'(?:\s*,|\s*])/g;
            $data_to_check = \@string_to_array;
            return $data_to_check;
        }
        else {
            $data_to_check = [$data_to_check];
        }
    }
    else {
        $data_to_check = [];
    }
    return $data_to_check;
}
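# Concatenate every per-thread output segment into the final output file and
# return the resulting line (prediction) count.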
sub merged_all_output_segments {
    my $final_output = $final_output_file;
    opendir(DIRECTORY, $output_json_segment_directory) or die $!;
    my $merged_output_file = `touch $final_output`;
    while (my $output_segment_file = readdir(DIRECTORY)) {
        next if ($output_segment_file =~ /^\./); #.... skip '.' and '..'.
        my $combining = `cat $output_json_segment_directory/$output_segment_file >> $final_output`;
    }
    closedir(DIRECTORY);
    my $number_of_predicted_row = `wc -l $final_output`;
    $number_of_predicted_row =~ /(\d+)/is;
    $number_of_predicted_row = int($1);
    return $number_of_predicted_row;
}
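# Remove the whole temporary working tree (segment files and databases).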
sub delete_all_temporary_directory {
    my $delete_all_temporary_cache = `rm -rf $temporary_cache_file`;
}
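# The VLOOKUP step: for every CSV row whose prediction is not "none", build a
# hash from the CSV headers, fetch the matching raw JSON record from the
# Berkeley DB by item_id, copy over its descriptive fields, and write the
# merged record out.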
sub merging_data {
    my @arguments = @_;
    my $file        = $arguments[0]; #........ input file.
    my $output_file = $arguments[1]; #........ output file.
    my @headers = @{$_[2]};
    my %hash;
    my $csv = Text::CSV->new({});
    my $sum = 0;
    open(my $data, "<:encoding(utf8)", $file) or die "Could not open '$file' $!\n";
    while (my $fields = $csv->getline($data)) {
        my $predicted = $fields->[4];
        $predicted =~ tr/A-Z/a-z/;
        if ($predicted eq "none") {
            next;
        }
        else {
            my @columns = @$fields;
            @hash{@headers} = @columns;
            my $raw_input;
            my $json = encode_json \%hash;
            my $single_json = decode_json($json);
            my $item_id = $single_json->{'item_id'};
            my $sts = $db_instance_to_match->db_get($item_id, $raw_input); #.... $raw_input comes back as a string and must be decoded into a JSON object.
            if ($sts) {
                print "Error: . $sts -------> $raw_input \n";
                next;
            }
            my $raw_single_json = give_json_object($raw_input); #.... decode the string into a JSON object.
            $single_json->{'predictions'}    = make_it_array($single_json->{'predictions'});
            $single_json->{'unitsOfMeasure'} = make_it_array($single_json->{'unitsOfMeasure'});
            $single_json->{'title'}             = $raw_single_json->{'title'};
            $single_json->{'short_description'} = $raw_single_json->{'short_description'};
            $single_json->{'long_description'}  = $raw_single_json->{'long_description'};
            my $merged_json = $json_converter->ascii(1)->encode($single_json);
            write_json($merged_json, $output_file); #.... write the merged JSON to the output file.
        }
    }
    close $data;
}
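# Sanity-check both input files: they must exist, the CSV must have more than
# a header line, and the JSON must be non-empty. Returns the JSON line count.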
sub number_of_line_in_input_files {
    my $json_file_for_total_line = $_[0];
    my $csv_file_for_total_line  = $_[1];
    if (-f $json_file_for_total_line && -f $csv_file_for_total_line) {
        my $number_of_line_in_json = `wc -l $json_file_for_total_line`;
        $number_of_line_in_json =~ /(\d+)/is;
        $number_of_line_in_json = int($1);
        my $number_of_line_in_csv = `wc -l $csv_file_for_total_line`;
        $number_of_line_in_csv =~ /(\d+)/is;
        $number_of_line_in_csv = int($1);
        if ($number_of_line_in_csv < 2 || $number_of_line_in_json == 0) {
            print("\n\n");
            die_and_print_error("\t~X Error : Either or both of the input files are probably truncated!");
        }
        else { return $number_of_line_in_json; }
    }
    else {
        print("\n\n");
        die_and_print_error("\t~X Error : Either or both of the input files don't exist!");
    }
}
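# Print the error message white-on-red and die.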
sub die_and_print_error {
    my $error_string = $_[0];
    die print colored( sprintf($error_string), 'white on_red' ), "\n\n";
}