Last active
February 28, 2024 11:44
-
-
Save fskale/8414795416933e2abfc44a9e8ac75adf to your computer and use it in GitHub Desktop.
Filebeat package crawler using Mojolicious non-blocking async I/O, and a demonstration of how queuing works
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
package Filebeat::Package::Crawler; | |
use Mojo::Base -base, -signatures, -async_await; | |
use Mojo::Collection qw(c); | |
use Mojo::ByteStream qw(b); | |
use Mojo::Util qw(decode encode secure_compare dumper); | |
use Mojo::JSON qw(decode_json encode_json); | |
use Digest::SHA qw(sha512); | |
our $VERSION = "1.00"; | |
BEGIN {
  # Windows is untested (feel free to do so and report back).
  die("Windows OS not supported !\n") if $^O =~ /Win/;

  # Uncomment to enable IOLOOP debugging.
  #$ENV{MOJO_CLIENT_DEBUG} = 1;

  # Minimum required version of every runtime dependency.
  # Mojolicious and the other modules should work on most Unix-like systems.
  my %min_version_of = (q{Net::DNS::Native} => q{0.20}, Mojolicious => q{9.30});

  # EV is tested on recent macOS, Linux, FreeBSD and OpenBSD, and required there.
  $min_version_of{EV} = q{4.34} if $^O =~ qr/(?:linux|darwin|freebsd|openbsd)/i;

  # Load each module and enforce its minimum version with a block eval instead
  # of a string eval. Sorting makes the check order (and therefore the error
  # that gets reported) deterministic; EV sorts first.
  for my $module (sort keys %min_version_of) {
    my $minver = $min_version_of{$module};
    (my $file = "$module.pm") =~ s{::}{/}g;
    eval { require $file; $module->VERSION($minver); 1 }
      or die(sprintf("Please install module %s version %s or higher !\nError:%s\n", $module, $minver, $@));
  }
}
# HTTP client used for every request the crawler makes.
has ua => sub { Mojo::UserAgent->new };

# Download page whose embedded __NEXT_DATA__ JSON lists the packages.
has srv_url => q{https://www.elastic.co/de/downloads/beats/filebeat};

# Opening tag of the script element carrying the JSON payload ...
has search => sub {qr{<script id="__NEXT_DATA__" type="application/json">}};
# ... and the closing tag that ends it.
has s_end => sub {qr{</script>$}};

# Directory the packages are written to -- change it to one that exists locally.
has path => q{/Users/dev/source/filebeat/download};

# Crawl state: collected <script> markup, {url, hash} package records,
# checksum => file name, and file name => {url, path} of pending downloads.
has c      => sub { Mojo::Collection->new };
has urls   => sub { Mojo::Collection->new };
has store  => sub { {} };
has urls_p => sub { {} };

# Debug output on STDERR, and the number of requests issued per batch.
has debug => 1;
has conc  => 2;
# Append the trimmed markup of every <script> element of a successful page
# response to $self->c. Connection failures and non-2xx responses add nothing.
#
#   $tx - the finished Mojo transaction for the download page.
async sub _find_script_p ($self, $tx) {
  my $err = $tx->error;

  # An error without an HTTP code means no response exists to inspect.
  return if $err && !$err->{code};

  # Check the status once instead of once per <script> element, and do not
  # bother parsing the DOM of error pages.
  my $res = $tx->result;
  return unless $res->is_success;

  push @{$self->c}, map { b($_)->trim->to_string } $res->dom->find('script')->each;
  return;
}
# Extract the __NEXT_DATA__ JSON from the <script> markup collected in $self->c
# and append a {url => ..., hash => ...} record to $self->urls for every entry
# of props.pageProps.productVersions[0][0].package.
# decode_json still dies on malformed JSON; the caller's promise chain reports it.
async sub _p_result ($self) {
  return unless $self->c->size;
  my ($search, $s_end) = ($self->search, $self->s_end);

  for my $script (@{$self->c}) {
    # Capture everything between the opening tag and </script>;
    # /s lets the payload span several lines.
    my ($json) = $script =~ /^$search(.*)$s_end/s or next;

    # decode_json expects UTF-8 encoded bytes, so encode the captured text first.
    my $data = decode_json(encode('UTF-8', $json));
    next unless ref $data eq 'HASH';

    # Skip pages that lack the expected package list instead of dying on an
    # undefined array reference.
    my $packages = $data->{props}{pageProps}{productVersions}[0][0]{package};
    next unless ref $packages eq 'ARRAY';

    push @{$self->urls}, map { +{url => $_->{url}, hash => $_->{hash_url}} } @$packages;
  }
  return;
}
# Fetch the SHA-512 checksum file of every package in $self->urls, with at most
# $self->conc requests in flight per batch.
# - Records checksum => file name in $self->store.
# - Queues in $self->urls_p (file name => {url, path}) every package that is
#   missing below $self->path or whose local copy does not match its checksum.
# The returned promise resolves only after the whole queue has been processed,
# so a caller chaining on it really waits for the checksums.
async sub _p_download_hashes ($self) {
  return unless $self->urls->size;
  my $sha  = Digest::SHA->new('sha512');
  my $conc = $self->conc > 0 ? $self->conc : 1;    # a zero batch size would never drain the queue

  # Packages without a checksum URL cannot be verified, so they are not queued.
  my @queue = grep { $_->{hash} } @{$self->urls};

  # splice() hands out at most $conc entries and never pads the last batch with
  # undefined URLs.
  while (my @batch = splice(@queue, 0, $conc)) {
    # A failed request only loses its own entry, not the whole batch.
    my @results = await Mojo::Promise->all(
      map {
        $self->ua->get_p($_->{hash})->catch(sub ($err) {
          warn "Something went wrong: $err";
          return;
        })
      } @batch
    );

    # Results come back in request order, so $batch[$i] is the package whose
    # checksum $results[$i] holds.
    for my $i (0 .. $#batch) {
      my $tx = $results[$i][0] or next;
      my $res = $tx->res;
      if (!$res->is_success) {
        printf(STDERR "Checksum request failed: %s (%s)\n", $batch[$i]->{hash}, $res->code // 'no status')
          if $self->debug;
        next;
      }

      # Checksum files read "<hex digest>  <file name>" (shasum may prefix the name with "*").
      my ($sha512, $fname) = b($res->body)->trim->to_string =~ /^(\S+)\s+\*?(\S+)$/
        or next;
      $sha512 = lc $sha512;    # hexdigest() produces lower-case hex
      $self->store->{$sha512} = $fname;

      my $path = sprintf("%s/%s", $self->path, $fname);
      if (-f $path) {
        $sha->addfile($path);
        if (secure_compare($sha512, $sha->hexdigest)) {
          printf(STDERR "File already downloaded: %s\n", $path) if $self->debug;
          next;
        }
      }

      # Missing or mismatching locally: schedule the package this checksum belongs to.
      $self->urls_p->{$fname}->{url}  = $batch[$i]->{url};
      $self->urls_p->{$fname}->{path} = $path;
    }
  }
  return;
}
# Download every package queued in $self->urls_p, with at most $self->conc
# requests in flight per batch. Each download whose SHA-512 digest is a known
# checksum in $self->store is moved to that file name below $self->path.
# The returned promise resolves only after every download has been handled.
async sub _p_download_files ($self) {
  return unless %{$self->urls_p};
  my $sha   = Digest::SHA->new('sha512');
  my $conc  = $self->conc > 0 ? $self->conc : 1;    # a zero batch size would never drain the queue
  my @queue = map { $self->urls_p->{$_}->{url} } sort keys %{$self->urls_p};

  # splice() hands out at most $conc URLs and never pads the last batch with undef.
  while (my @batch = splice(@queue, 0, $conc)) {
    # A failed download only loses itself, not the rest of its batch.
    my @results = await Mojo::Promise->all(
      map {
        $self->ua->get_p($_)->catch(sub ($err) {
          warn "Something went wrong: $err";
          return;
        })
      } @batch
    );

    for my $result (@results) {
      my $tx = $result->[0] or next;
      my $res = $tx->res;
      next if !$res->is_success || $res->content->is_multipart;

      # Hash file-backed bodies straight from disk instead of slurping them;
      # small bodies that stayed in memory are hashed directly.
      my $asset = $res->content->asset;
      if ($asset->is_file) {
        $sha->addfile($asset->path);
      }
      else {
        $sha->add($asset->slurp);
      }
      my $digest = $sha->hexdigest;

      if (my $fname = $self->store->{$digest}) {
        printf(STDERR "Found new hash for file: %s\n", $fname) if $self->debug;
        $asset->move_to(sprintf("%s/%s", $self->path, $fname));
      }
      else {
        printf(STDERR "File not found for digest: %s\n", $digest) if $self->debug;
      }
    }
  }
  return;
}
# Fetch $url, collect its <script> elements, extract the package list, fetch
# the checksums, then download whatever is missing or outdated.
# Every step is chained rather than fired and forgotten, so an exception from
# any of them (e.g. malformed JSON) reaches the single catch handler.
# wait() drives the event loop until the whole chain has settled.
async sub _process ($self, $url) {
  $self->ua->get_p($url)->then(sub ($tx) {
    return $self->_find_script_p($tx)
      ->then(sub { $self->_p_result() })
      ->then(sub { $self->_p_download_hashes() });
  })->then(sub {
    return $self->_p_download_files();
  })->catch(sub ($err) {
    warn "Something went wrong: $err";
  })->wait;
}
# Public entry point: crawl the configured download page (srv_url).
# The returned promise settles once _process has finished.
async sub process ($self) {
  my $url = $self->srv_url;
  return await $self->_process($url);
}
package main;
use Mojo::Base -strict;

# Run the crawler once with its default settings.
my $crawler = Filebeat::Package::Crawler->new;
$crawler->process();

1;
Change $self->path to a valid download directory ;-)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Queuing mechanism: $self->conc (the number of requests sent per batch) defaults to 2.