Skip to content

Instantly share code, notes, and snippets.

@adokoy001
Created January 21, 2017 05:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adokoy001/24d4185a860df6f14e4904f567adae91 to your computer and use it in GitHub Desktop.
Save adokoy001/24d4185a860df6f14e4904f567adae91 to your computer and use it in GitHub Desktop.
MapReduce::Framework::Simple
use strict;
use warnings;
use MapReduce::Framework::Simple;
use Data::Dumper;
use DBI;
# Framework handle that runs the character-count job further down.
my $mfs = MapReduce::Framework::Simple->new;
# Work units for the character-count job: each entry is
# [ mapper input, eval endpoint ]. The mapper input names the database and
# one page of the `test` table (offset / limit rows, ordered by serial_id).
#
# In PostgreSQL, LIMIT is a row *count*, not an end position, so every page
# uses the same LIMIT and OFFSET advances by exactly that amount. The pages
# are then contiguous and non-overlapping (0-9_999, 10_000-19_999,
# 20_000-29_999), so no row is read or counted twice.
my $data_bk = do {
    my %connection = (
        dsn    => 'dbi:Pg:dbname=postgres;host=localhost;port=5432',
        dbuser => 'postgres',
        dbpass => 'postgres',
    );
    my $page_size  = 10_000;
    my $page_count = 3;
    my $endpoint   = 'http://localhost:5000/eval';

    [
        map {
            my $page = $_;
            [
                {
                    %connection,
                    offset => $page * $page_size,
                    limit  => $page_size,
                },
                $endpoint,
            ];
        } 0 .. $page_count - 1
    ];
};
# Mapper for the character-count job.
#
# Input:  hashref with dsn / dbuser / dbpass (connection details) and
#         offset / limit (the page of `test` rows to read, ordered by
#         serial_id).
# Output: hashref mapping each character of `data_text` in that page to its
#         number of occurrences; {} when the page has no (non-NULL) text.
# Dies with DBI's error message if connecting, preparing, executing or
# fetching fails (RaiseError), instead of failing later with
# "Can't call method ... on an undefined value" or stopping early.
#
# NOTE(review): DBI is loaded inside the sub, presumably because with
# remote => 1 the sub is executed at the eval endpoint; keep it free of
# references to outer variables.
my $mapper = sub {
    my $input = shift;
    require DBI;

    my $dbh = DBI->connect(
        $input->{dsn},
        $input->{dbuser},
        $input->{dbpass},
        { RaiseError => 1, PrintError => 0, AutoCommit => 1 },
    );

    my $sth = $dbh->prepare(
        'select data_text from test order by serial_id offset ? limit ?'
    );
    $sth->execute($input->{offset}, $input->{limit});

    my %count_of;
    while (my ($text) = $sth->fetchrow_array) {
        # A NULL data_text arrives as undef; skip it instead of warning.
        next unless defined $text;
        $count_of{$_}++ for split //, $text;
    }

    $sth->finish;
    $dbh->disconnect;
    return \%count_of;
};
# Reducer for the character-count job: sums per-key counts across all
# mapper outputs.
#
# Input:  arrayref of mapper results, each a hashref of key => count.
#         Entries that are not hashrefs (e.g. undef from a mapper that
#         found nothing) are skipped.
# Output: hashref of key => total count. Returns {} when there is nothing
#         to merge, so the caller can always dereference the result.
my $reducer = sub {
    my $input = shift;
    my %total;
    for my $partial (@{$input}) {
        next unless ref $partial eq 'HASH';
        # += on a missing key starts from 0 without an uninitialized warning.
        $total{$_} += $partial->{$_} for keys %{$partial};
    }
    return \%total;
};
# Run the character-count job over the three page work units.
# NOTE(review): 4 is presumably the number of parallel workers, and
# remote => 1 presumably sends each map call to the URL paired with its
# work unit. Confirm both against the MapReduce::Framework::Simple docs.
my $result = $mfs->map_reduce($data_bk, $mapper, $reducer, 4, { remote => 1 });

# One "character : count" line per distinct character, in string order.
for my $char (sort keys %{$result}) {
    printf "%s : %s\n", $char, $result->{$char};
}
use strict;
use warnings;
use MapReduce::Framework::Simple;
use Data::Dumper;
use List::Util;
# Framework handle that runs the distributed sort job below.
my $mfs = MapReduce::Framework::Simple->new;
# Three work units for the sort job, each [ fresh copy of 1..100, endpoint ].
my $data = do {
    my $url = 'http://localhost:5000/eval';
    [ map { [ [ 1 .. 100 ], $url ] } 1 .. 3 ];
};
# Mapper for the sort job: returns a new arrayref with the chunk's numbers
# in ascending numeric order. The input array itself is not modified.
my $mapper = sub {
    my ($chunk) = @_;
    return [ sort { $a <=> $b } @{$chunk} ];
};
# Reducer for the sort job: flattens every mapper's chunk into one list and
# returns it as a new arrayref sorted numerically.
my $reducer = sub {
    my ($chunks) = @_;
    return [ sort { $a <=> $b } map { @{$_} } @{$chunks} ];
};
# Run the sort job. 3 is presumably the worker count (TODO: confirm), and
# remote => 1 presumably runs each map at its unit's endpoint. The merged,
# sorted list is then dumped.
my $result = $mfs->map_reduce($data, $mapper, $reducer, 3, { remote => 1 });
print Dumper($result);
use strict;
use warnings;
use MapReduce::Framework::Simple;
use Data::Dumper;
# Framework handle that runs the word-count job below.
my $mfs = MapReduce::Framework::Simple->new;
# One work unit per sentence: [ sentence, endpoint ].
my $data = [
    map { [ $_, 'http://localhost:5000/eval' ] }
        'I am a student.',
        'I like Sushi.',
        'Who am I?',
];
# Mapper for the word-count job: counts the words in one sentence.
#
# Input:  a string. The characters . , ? ! are deleted first, so "I?" and
#         "I" count as the same word. Matching is case-sensitive.
# Output: hashref word => occurrences; {} for an empty or blank string.
my $mapper = sub {
    my $input = shift;
    my $text = defined $input ? $input : '';

    # tr///d simply deletes the listed characters; no regex or /e needed.
    (my $clean = $text) =~ tr/.,?!//d;

    my %count_of;
    # split ' ' (awk mode) ignores leading whitespace, whereas split /\s+/
    # returns an empty first field that would be counted as the word "".
    $count_of{$_}++ for split ' ', $clean;
    return \%count_of;
};
# Reducer for the word-count job: sums per-word counts across all mapper
# outputs.
#
# Input:  arrayref of mapper results, each a hashref of word => count.
#         Entries that are not hashrefs (e.g. undef from a mapper that
#         found nothing) are skipped.
# Output: hashref of word => total count. Returns {} when there is nothing
#         to merge, so the caller can always dereference the result.
my $reducer = sub {
    my $input = shift;
    my %total;
    for my $partial (@{$input}) {
        next unless ref $partial eq 'HASH';
        # += on a missing key starts from 0 without an uninitialized warning.
        $total{$_} += $partial->{$_} for keys %{$partial};
    }
    return \%total;
};
# Run the word-count job with no options. Nothing in this call requests
# remote execution; 3 is presumably the worker count (TODO: confirm).
# The merged word counts are then dumped.
my $result = $mfs->map_reduce($data, $mapper, $reducer, 3);
print Dumper($result);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment