Created
January 21, 2017 05:51
-
-
Save adokoy001/24d4185a860df6f14e4904f567adae91 to your computer and use it in GitHub Desktop.
MapReduce::Framework::Simple
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
use MapReduce::Framework::Simple; | |
use Data::Dumper; | |
use DBI; | |
my $mfs = MapReduce::Framework::Simple->new(); | |
my $data_bk = | |
[ | |
[ | |
{ | |
dsn => 'dbi:Pg:dbname=postgres;host=localhost;port=5432', | |
dbuser => 'postgres', | |
dbpass => 'postgres', | |
offset => 0, | |
limit => 10_000 | |
} | |
, | |
'http://localhost:5000/eval' | |
], | |
[ | |
{ | |
dsn => 'dbi:Pg:dbname=postgres;host=localhost;port=5432', | |
dbuser => 'postgres', | |
dbpass => 'postgres', | |
offset => 10_000, | |
limit => 20_000 | |
} | |
, | |
'http://localhost:5000/eval' | |
], | |
[ | |
{ | |
dsn => 'dbi:Pg:dbname=postgres;host=localhost;port=5432', | |
dbuser => 'postgres', | |
dbpass => 'postgres', | |
offset => 20_000, | |
limit => 30_000 | |
} | |
, | |
'http://localhost:5000/eval' | |
] | |
]; | |
my $mapper = sub { | |
my $input = shift; | |
require DBI; | |
my $output; | |
my $dbh = DBI->connect( | |
$input->{dsn}, | |
$input->{dbuser}, | |
$input->{dbpass} | |
); | |
my $sth = $dbh->prepare("select data_text from test order by serial_id offset ? limit ?"); | |
$sth->execute($input->{offset},$input->{limit}); | |
while(my @row = $sth->fetchrow_array){ | |
my $tmp_doc = $row[0]; | |
my @words = split('',$tmp_doc); | |
for(0 .. $#words){ | |
if(defined($output->{$words[$_]})){ | |
$output->{$words[$_]} += 1; | |
}else{ | |
$output->{$words[$_]} = 1; | |
} | |
} | |
} | |
$sth->finish; | |
$dbh->disconnect; | |
return($output); | |
}; | |
my $reducer = sub { | |
my $input = shift; | |
my $output; | |
for(0 .. $#$input){ | |
my $tmp_input = $input->[$_]; | |
foreach my $key (keys %$tmp_input){ | |
if(defined($output->{$key})){ | |
$output->{$key} += $tmp_input->{$key}; | |
}else{ | |
$output->{$key} = $tmp_input->{$key}; | |
} | |
} | |
} | |
return($output); | |
}; | |
my $result = $mfs->map_reduce( | |
$data_bk, | |
$mapper, | |
$reducer, | |
4, | |
{remote => 1} | |
); | |
foreach my $key (sort keys %$result){ | |
print "$key : $result->{$key}\n"; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
use MapReduce::Framework::Simple; | |
use Data::Dumper; | |
use List::Util; | |
my $mfs = MapReduce::Framework::Simple->new(); | |
my $data = [ | |
[[1 .. 100],'http://localhost:5000/eval'], | |
[[1 .. 100],'http://localhost:5000/eval'], | |
[[1 .. 100],'http://localhost:5000/eval'] | |
]; | |
my $mapper = sub { | |
my $input = shift; | |
my @output = sort {$a <=> $b} @$input; | |
return(\@output); | |
}; | |
my $reducer = sub { | |
my $input = shift; | |
my @output; | |
for(0 .. $#$input){ | |
push(@output,@{$input->[$_]}); | |
} | |
@output = sort { $a <=> $b } @output; | |
return(\@output); | |
}; | |
my $result = $mfs->map_reduce( | |
$data, | |
$mapper, | |
$reducer, | |
3, | |
{remote => 1} | |
); | |
print Dumper $result; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
use MapReduce::Framework::Simple; | |
use Data::Dumper; | |
my $mfs = MapReduce::Framework::Simple->new(); | |
my $data; | |
push(@$data,['I am a student.','http://localhost:5000/eval']); | |
push(@$data,['I like Sushi.','http://localhost:5000/eval']); | |
push(@$data,['Who am I?','http://localhost:5000/eval']); | |
my $mapper = sub { | |
my $input = shift; | |
$input =~ s/\.|\,|\?|\!//eg; | |
my @words = split(/\s+/,$input); | |
my $output; | |
for(0 .. $#words){ | |
if(defined($output->{$words[$_]})){ | |
$output->{$words[$_]} += 1; | |
}else{ | |
$output->{$words[$_]} = 1; | |
} | |
} | |
return($output); | |
}; | |
my $reducer = sub { | |
my $input = shift; | |
my $output; | |
for(0 .. $#$input){ | |
my $tmp_input = $input->[$_]; | |
foreach my $key (keys %$tmp_input){ | |
if(defined($output->{$key})){ | |
$output->{$key} += $tmp_input->{$key}; | |
}else{ | |
$output->{$key} = $tmp_input->{$key}; | |
} | |
} | |
} | |
return($output); | |
}; | |
my $result = $mfs->map_reduce( | |
$data, | |
$mapper, | |
$reducer, | |
3, | |
); | |
print Dumper $result; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment