Skip to content

Instantly share code, notes, and snippets.

@fskale
Last active February 28, 2024 11:44
Show Gist options
  • Save fskale/8414795416933e2abfc44a9e8ac75adf to your computer and use it in GitHub Desktop.
Save fskale/8414795416933e2abfc44a9e8ac75adf to your computer and use it in GitHub Desktop.
Filebeat package crawler using Mojolicious non-blocking async I/O and a demonstration of how queuing works
#!/usr/bin/env perl
package Filebeat::Package::Crawler;
use Mojo::Base -base, -signatures, -async_await;
use Mojo::Collection qw(c);
use Mojo::ByteStream qw(b);
use Mojo::Util qw(decode encode secure_compare dumper);
use Mojo::JSON qw(decode_json encode_json);
use Digest::SHA qw(sha512);
our $VERSION = "1.00";
# Compile-time environment check: refuses to run on Windows and verifies that
# the required modules are installed in a sufficiently recent version.
BEGIN {

  # Windows is untested (feel free to try it and report back).
  if ($^O =~ /Win/) {
    die(sprintf("Windows OS not supported !\n"));
  }

  # Uncomment to enable IOLOOP debugging.
  #$ENV{MOJO_CLIENT_DEBUG} = 1;

  # EV is only required on the platforms it has been tested on.
  if ($^O =~ qr/(?:linux|darwin|freebsd|openbsd)/i) {
    my $load_ev = q{use EV 4.34};
    eval $load_ev;
    if ($@) {
      die(sprintf("Please install module EV version %s or higher !\nError:%s\n", q{4.34}, $@));
    }
  }

  # Mojolicious and the other modules should work on most Unix-like systems.
  my %required = (
    q{Net::DNS::Native} => {minver => q{0.20}},
    Mojolicious         => {minver => q{9.30}},
  );
  for my $name (keys %required) {
    my $wanted = $required{$name}->{minver};

    # A string eval is needed because the module name and version are dynamic.
    eval qq{use $name $wanted};
    if ($@) {
      die(sprintf("Please install module %s version %s or higher !\nError:%s\n", $name, $wanted, $@));
    }
  }
}
# HTTP client used for every request made by the crawler.
has ua => sub { Mojo::UserAgent->new };

# Page that is fetched first and searched for the embedded package metadata.
has srv_url => 'https://www.elastic.co/de/downloads/beats/filebeat';

# Opening and closing markers of the <script> element that carries the JSON.
has search => sub { qr{<script id="__NEXT_DATA__" type="application/json">} };
has s_end  => sub { qr{</script>$} };

# Local directory the packages are stored in.
has path => '/Users/dev/source/filebeat/download';

# c:    serialized <script> elements collected from the page.
# urls: one {url => ..., hash => ...} entry per package found in the JSON.
has [qw(c urls)] => sub { Mojo::Collection->new };

# Print progress messages to STDERR when true.
has debug => 1;

# Number of requests issued per batch.
has conc => 2;

# store:  maps a SHA-512 hex digest to the package file name.
# urls_p: maps a file name to the {url, path} of a package still to download.
has [qw(store urls_p)] => sub { {} };
# Collects every <script> element of a successful response into $self->c.
#
# $tx is the finished transaction of the page request. Nothing is collected
# when the request failed at connection level or the response status is not
# a success. Each element is stored as its trimmed string form.
async sub _find_script_p($self, $tx) {
  my $err = $tx->error;

  # An error without a status code is a connection-level failure; there is no
  # usable response in that case, so result() must not be called.
  return if $err && !$err->{code};

  # The status check does not depend on the individual element, so it is done
  # once up front instead of once per <script> node.
  my $res = $tx->result;
  return unless $res->is_success;

  for my $script ($res->dom->find('script')->each) {
    push(@{$self->c}, b($script)->trim->to_string);
  }
  return;
}
# Extracts the package list from the collected <script> elements.
#
# Every string in $self->c that is wrapped by the search / s_end markers is
# decoded as JSON, and each package entry found under
# props.pageProps.productVersions[0][0].package is appended to $self->urls as
# {url => <package url>, hash => <hash file url>}.
#
# Invalid JSON still raises an exception (decode_json dies); a JSON document
# that merely lacks the expected structure is skipped.
async sub _p_result($self) {
  return unless $self->c->size;
  my ($search, $s_end) = ($self->search, $self->s_end);

  for my $script (@{$self->c}) {

    # /s lets the payload span several lines; the anchored match alone decides
    # whether this element is the one carrying the metadata.
    next unless $script =~ /^$search(.*)$s_end/s;
    my $json = $1;

    my $data = decode_json(encode('UTF-8', $json));

    # Walk the expected path defensively: a changed page layout must not die
    # with a dereference error.
    my $packages = eval { $data->{props}{pageProps}{productVersions}[0][0]{package} };
    next unless ref $packages eq 'ARRAY';

    for my $package (@{$packages}) {

      # Both URLs are needed later; an incomplete entry would only lead to a
      # request for an undefined URL.
      next unless ref $package eq 'HASH';
      next unless defined $package->{url} && defined $package->{hash_url};
      push(@{$self->urls}, {url => $package->{url}, hash => $package->{hash_url}});
    }
  }
  return;
}
# Downloads the hash file of every package and decides which packages still
# have to be fetched.
#
# The hash URLs from $self->urls are requested in batches of $self->conc.
# Each hash file is expected to contain "<sha512 hex digest> <file name>".
# For every parsed file:
#   * $self->store->{<digest>} is set to the file name;
#   * $self->urls_p->{<file name>} = {url, path} is recorded when the file is
#     missing locally or its SHA-512 digest differs from the published one.
#
# The returned promise settles only after ALL batches have been processed, so
# a caller chaining on it sees the complete urls_p map. A failing request is
# reported with warn and does not affect the other requests.
async sub _p_download_hashes($self) {
  return unless $self->urls->size;

  # Guard against a non-positive batch size, which would never drain the queue.
  my $batch_size = $self->conc > 0 ? $self->conc : 1;
  my @pending    = grep { defined } map { $_->{hash} } @{$self->urls};

  # Handles one finished hash-file transaction.
  my $register = sub ($tx) {
    my $res = $tx->res;
    return unless $res->is_success;

    my $asset = $res->content->asset;
    return unless ref $asset eq 'Mojo::Asset::Memory';

    my $line = b($asset->slurp)->trim->to_string;
    my ($sha512, $fname) = $line =~ /^(\S+)\s+(\S+)$/;
    return unless defined $sha512 && defined $fname;

    # The file name comes from the network and ends up in a local path, so
    # anything that could leave the download directory is rejected.
    if ($fname =~ m{/} || $fname =~ /^\.\.?$/) {
      warn "Ignoring unsafe file name from hash file: $fname\n";
      return;
    }

    # Digest::SHA reports lower-case hex; normalise so lookups and
    # comparisons agree.
    $sha512 = lc $sha512;
    $self->store->{$sha512} = $fname;

    my $path = sprintf("%s/%s", $self->path, $fname);

    # Literal substring match, first hit only: the file name contains regex
    # metacharacters and must never yield several URLs glued together.
    my ($url) = grep { defined($_) && index($_, $fname) >= 0 } map { $_->{url} } @{$self->urls};
    unless (defined $url) {
      warn "No download URL found for file: $fname\n";
      return;
    }

    my $needs_download = 1;
    if (-f $path) {

      # A fresh digest object per file keeps a failed read from leaking state
      # into later comparisons.
      my $sha = Digest::SHA->new('sha512');
      $sha->addfile($path, 'b');
      $needs_download = 0 if secure_compare($sha512, $sha->hexdigest);
    }

    if ($needs_download) {
      $self->urls_p->{$fname}->{url}  = $url;
      $self->urls_p->{$fname}->{path} = $path;
    }
    elsif ($self->debug) {
      printf(STDERR "File already downloaded: %s\n", $path);
    }
    return;
  };

  while (@pending) {

    # splice never yields more URLs than are left, so the last (partial) batch
    # does not issue requests for undefined URLs.
    my @batch = splice(@pending, 0, $batch_size);

    my @jobs;
    for my $hash_url (@batch) {
      push(
        @jobs,
        $self->ua->get_p($hash_url)->then($register)->catch(
          sub ($err) {
            warn "Something went wrong: $err";
          }
        )
      );
    }

    # Wait for the whole batch before starting the next one.
    await Mojo::Promise->all(@jobs);
  }
  return;
}
# Downloads every package recorded in $self->urls_p.
#
# The package URLs are requested in batches of $self->conc. A response whose
# body was spooled to a file (Mojo::Asset::File) is hashed with SHA-512; when
# the digest is known in $self->store the file is moved to
# "<path>/<file name>", otherwise it is left alone and a debug message is
# printed.
#
# The returned promise settles only after ALL batches have been processed.
# A failing request is reported with warn and does not affect the others.
async sub _p_download_files($self) {
  return unless %{$self->urls_p};

  # Guard against a non-positive batch size, which would never drain the queue.
  my $batch_size = $self->conc > 0 ? $self->conc : 1;
  my @pending    = grep { defined } map { $_->{url} } values %{$self->urls_p};

  # Handles one finished package transaction.
  my $store_file = sub ($tx) {
    my $res = $tx->res;
    return unless $res->is_success;

    my $asset = $res->content->asset;
    return unless ref $asset eq 'Mojo::Asset::File';

    # A fresh digest object per download keeps the computations independent.
    my $sha = Digest::SHA->new('sha512');
    $sha->add($asset->slurp);
    my $digest = $sha->hexdigest;

    if (my $fname = $self->store->{$digest}) {
      my $path = sprintf("%s/%s", $self->path, $fname);
      printf(STDERR "Found new hash for file: %s\n", $fname) if $self->debug;
      $asset->move_to($path);
    }
    else {
      printf(STDERR "File not found for digest: %s\n", $digest) if $self->debug;
    }
    return;
  };

  while (@pending) {

    # splice never yields more URLs than are left, so the last (partial) batch
    # does not issue requests for undefined URLs.
    my @batch = splice(@pending, 0, $batch_size);

    my @jobs;
    for my $file_url (@batch) {
      push(
        @jobs,
        $self->ua->get_p($file_url)->then($store_file)->catch(
          sub ($err) {
            warn "Something went wrong: $err";
          }
        )
      );
    }

    # Wait for the whole batch before starting the next one.
    await Mojo::Promise->all(@jobs);
  }
  return;
}
# Runs the complete crawl for $url and blocks until it has finished.
#
# Steps, each one starting only after the previous one has settled:
#   1. fetch the page,
#   2. collect its <script> elements,
#   3. extract the package list,
#   4. download the hash files,
#   5. download the packages.
# Every step is returned into the promise chain so that an exception raised in
# any of them reaches the catch handler instead of being lost. wait() runs
# the event loop until the chain has settled.
async sub _process($self, $url) {
  $self->ua->get_p($url)->then(
    sub ($tx) {
      return $self->_find_script_p($tx);
    }
  )->then(
    sub {
      return $self->_p_result();
    }
  )->then(
    sub {
      return $self->_p_download_hashes();
    }
  )->then(
    sub {
      return $self->_p_download_files();
    }
  )->catch(
    sub ($err) {
      warn "Something went wrong: $err";
    }
  )->wait;
}
# Public entry point: crawls the page configured in srv_url.
async sub process($self) {
  my $start_url = $self->srv_url;
  await $self->_process($start_url);
}
package main;
use Mojo::Base -strict;

# Script entry point: build a crawler with its default settings and run it.
my $crawler = Filebeat::Package::Crawler->new;
$crawler->process;
1;
@fskale
Copy link
Author

fskale commented Feb 20, 2024

Queuing mech -> $self->conc defaults to "2"

@fskale
Copy link
Author

fskale commented Feb 20, 2024

Change $self->path to a valid one ;-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment