S3 incremental backup using Mojolicious non-blocking I/O (concurrent). Check the script for the required modules. Supports Amazon and Hitachi Content Platform (HCP) S3 storage (no warranty !).
#!/usr/bin/env perl
package S3::Backup;
use Mojo::Base -base;
use Carp qw(carp croak);
use POSIX qw(strftime ceil floor);
use Fcntl qw(O_WRONLY O_APPEND O_CREAT);
use Fcntl ':mode';

#set utf8 output encoding explicitly
binmode( STDOUT, ":encoding(utf8)" );

BEGIN {
    my $mods_mojo = {
        q{Mojo::Collection} => ['c'],
        q{Mojo::File}       => ['path'],
        q{Mojo::Log}        => [],
        q{Mojo::UserAgent}  => [],
        q{Mojo::Util}       => [ 'decode', 'dumper', 'encode', 'secure_compare', 'url_escape', 'url_unescape' ],
        q{Mojo::JSON}       => ['decode_json']
    };
    my $mods_req = {
        q{XML::Simple}                => { import => [],               minver => '2.20' },
        q{HTTP::Request}              => { import => [],               minver => '6.00' },
        q{MIME::Types}                => { import => [],               minver => '2.09' },
        q{Number::Bytes::Human}       => { import => ['format_bytes'], minver => '0.09' },
        q{IO::Socket::SSL}            => { import => [],               minver => '2.0.49' },
        q{Net::DNS::Native}           => { import => [],               minver => '0.15' },
        q{EV}                         => { import => [],               minver => '4.22' },
        q{Net::Amazon::Signature::V4} => { import => [],               minver => '0.18' },
        q{CryptX}                     => { import => [],               minver => '0.060' },
        q{Crypt::Digest::MD5}         => { import => [],               minver => '0.060' },
        q{DateTime::Format::ISO8601}  => { import => [],               minver => '0.08' }
    };

    #check for necessary version dependencies
    #perl version
    die( sprintf("Perl version >= 5.20.2 required !\n") ) unless $^V ge 'v5.20.2';

    #Windows ? ($^O is 'MSWin32' on Windows, not 'windows')
    warn( sprintf("Windows officially not supported anymore !\n") ) if $^O eq 'MSWin32';
    #Mojolicious Sub Modules
    while ( my ( $key, $value ) = each %{$mods_mojo} ) {
        next if ref $value ne 'ARRAY';
        if ( @{$value} ) {
            eval qq{use $key qw(@{$value})};
            die( sprintf( "Cannot load Mojolicious Submodule: %s\nError: %s", $key, $@ ) ) if $@;
        }
        else {
            eval qq{use $key};
            die( sprintf( "Cannot load Mojolicious Submodule: %s\nError: %s", $key, $@ ) ) if $@;
        }
    }

    #third party modules
    while ( my ( $key, $value ) = each %{$mods_req} ) {
        next if ref $value ne 'HASH';
        if ( @{ $value->{import} } ) {
            next if ref $value->{import} ne 'ARRAY' or !$value->{minver};
            eval qq{use $key $value->{minver} qw(@{$value->{import}})};
            die( sprintf( "Cannot load Module: %s\nError: %s", $key, $@ ) ) if $@;
        }
        else {
            eval qq{use $key $value->{minver}};
            die( sprintf( "Cannot load Module: %s\nError: %s", $key, $@ ) ) if $@;
        }
    }
}
#Globals for Carp
$Carp::MaxArgLen = 0;

#args passed via commandline
my @args
    = (qw(region endpoint bucket id secret directory list max concurrency timeout debug debug2 log dircfg restore));
has [@args] => sub { };

#UserAgent object and ua options
has uagent   => sub {q{S3Backup Version/1.0}};
has _timeout => sub { shift->timeout // 15 };
has ua       => sub {
    my $self    = shift;
    my $timeout = $self->_timeout;
    my $uagent  = $self->uagent;
    my $ua = Mojo::UserAgent->new->request_timeout($timeout)->connect_timeout($timeout)->inactivity_timeout($timeout);

    #strict cert checking is the default as of version 7.80:
    #Mojo::UserAgent rejects all invalid TLS certificates, so disable that here
    $ua->insecure(1) if $Mojolicious::VERSION >= 7.80;
    $ua->transactor( Mojo::UserAgent::Transactor->new )->transactor->name($uagent);
    $ua->on(
        start => sub {
            my ( $ua, $tx ) = @_;
            $self->_p_out( sprintf( "Starting transaction: %s\n", $tx->req->headers->host ) ) if $self->debug;
        }
    );
    $ua;
};

#general attributes
has url          => sub {q{}};
has max_keys     => sub {100};
has is_trunc     => sub {0};
has marker       => sub {q{}};
has service      => sub {q{s3}};
has def_endpoint => sub {q{s3.amazonaws.com}};
has def_region   => sub {q{us-east-1}};
has server_type  => sub {q{default}};
has uploader     => sub {q{S3-Backup}};
has total        => sub {
    { length => 0, objects => 0 }
};
has max_size  => sub {5368709120};
has part_size => sub {8388608};

#Object collections
has [qw(objects objects_s objects_m objects_r)] => sub { c() };

#Object data
has obj_data => sub { {} };
has shutdown => sub {0};

#for the shutdown signal handler, store the upload id and the object (as an array reference)
has c_upload => sub { [ q{}, q{} ] };
#directory config specific
has _dircfg      => sub {q{/etc/s3-backup/directories.conf}};
has _dircfg_base => sub {q{/var/shares}};

#logging enabled ?
has _log => sub {0};

#XML parser. Built fresh per call rather than cached as an attribute,
#since call sites switch keyAttr between requests and a cached parser
#would keep the first KeyAttr value forever.
has keyAttr => sub {q{}};

sub xs {
    my $self = shift;
    return XML::Simple->new(
        ForceArray    => 0,
        KeepRoot      => 1,
        SuppressEmpty => 1,
        KeyAttr       => $self->keyAttr
    );
}

#concurrent non-blocking connections !
has conc => sub {5};

#error handling
sub _whowasi {
    my $self = shift;
    ( caller(2) )[3];
}
sub _error {
    my ( $self, $msg ) = @_;
    $self->_log
        ? ( $self->_log_to( sprintf( "%s at %s", $msg ? $msg : 'unknown error', $self->_whowasi ) )
            and croak( sprintf( "%s at %s\n", $msg ? $msg : 'unknown error', $self->_whowasi ) ) )
        : croak( sprintf( "%s at %s\n", $msg ? $msg : 'unknown error', $self->_whowasi ) );
}

sub _warning {
    my ( $self, $msg ) = @_;
    $self->_log
        ? ( $self->_log_to( sprintf( "%s at %s", $msg ? $msg : 'unknown warning', $self->_whowasi ) )
            and carp( sprintf( "%s at %s\n", $msg ? $msg : 'unknown warning', $self->_whowasi ) ) )
        : carp( sprintf( "%s at %s\n", $msg ? $msg : 'unknown warning', $self->_whowasi ) );
}

sub _log_to {
    my ( $self, $message ) = @_;
    return if !$self->_log;
    $message =~ s/[\r\n]+//g;
    my $path = path( $self->log );
    my $dir  = $path->dirname;
    $dir->make_path( { mode => 0700 } );
    chmod( 0600, $path->to_string );
    my $log = Mojo::Log->new( path => $path->to_string, level => 'debug' );
    $log->debug( sprintf( "%s", $message ) );
    return 1;
}

sub _p_out {
    my ( $self, $message ) = @_;
    return if !$message;
    $self->_log
        ? $self->_log_to( sprintf( "%s", decode( 'UTF-8', $message ) ) )
        : printf( "%s", decode( 'UTF-8', $message ) );
    return 1;
}
#extra debug output
sub _debug {
    my ( $self, $msg ) = @_;
    $self->_log
        ? ( $self->_log_to( sprintf( "\nDebug: %s %s\n", $msg ? $msg : '', $self->_whowasi ) )
            and carp( sprintf( "\nDebug: %s %s\n", $msg ? $msg : '', $self->_whowasi ) ) )
        : carp( sprintf( "\nDebug: %s %s\n", $msg ? $msg : '', $self->_whowasi ) );
}

#shortcuts
sub _sig {
    my $self = shift;
    return Net::Amazon::Signature::V4->new( $self->id, $self->secret, $self->region, $self->service );
}

sub _uri {
    my ( $self, $file ) = @_;
    return if !ref $file or ref $file ne 'Mojo::File';
    my $filename = url_escape( $file->basename );
    if ( $file->dirname ne q{.} ) {
        return sprintf( "/%s/%s", url_escape( $file->dirname, '^A-Za-z0-9\-._~/' ), $filename );
    }
    else { return sprintf( "/%s", $filename ); }
}

sub _host {
    my $self = shift;
    return sprintf( "%s.%s", $self->bucket, $self->endpoint );
}

sub _date {
    my $self = shift;
    return strftime( '%Y%m%dT%H%M%SZ', gmtime );
}

sub _fspecs {
    my ( $self, $file ) = @_;
    return if !ref $file or ref $file ne 'Mojo::File';
    my ( $dev, $ino, $mode, $nlink, $uid, $gid, $rdev, $size, $atime, $mtime, $ctime, $blksize, $blocks )
        = stat( $file->to_string );
    my (
        $d_dev,  $d_ino,   $d_mode,  $d_nlink,   $d_uid, $d_gid, $d_rdev,
        $d_size, $d_atime, $d_mtime, $d_ctime, $d_blksize, $d_blocks
    ) = stat( $file->dirname );
    return (
        $mtime, sprintf( "%04o", S_IMODE($mode) ),
        $uid,   $gid, $d_mtime, sprintf( "%04o", S_IMODE($d_mode) ),
        $d_uid, $d_gid
    );
}

sub _type {
    my ( $self, $file ) = @_;
    return if !ref $file or ref $file ne 'Mojo::File';

    #fall back to a generic MIME type for unknown extensions
    return MIME::Types->new->mimeTypeOf( $file->to_string ) // q{application/octet-stream};
}

sub _md5_digest_b64 {
    my ( $self, $file ) = @_;
    eval {
        return ( ref $file and ref $file eq 'Mojo::File' )
            ? Crypt::Digest::MD5->new->addfile( $file->to_string )->b64digest
            : Crypt::Digest::MD5->new->add($file)->b64digest;
    };
}
sub _parts {
    my ( $self, $file, $size ) = @_;
    return if !ref $file or ref $file ne 'Mojo::File';
    my @parts = ();
    if ($size) {
        push( @parts, $_ ) for ( 1 .. ceil( $size / $self->part_size ) );
    }
    else {
        my $fsize = int( -s $file->to_string );
        push( @parts, $_ ) for ( 1 .. ceil( $fsize / $self->part_size ) );
    }
    return @parts;
}

sub _format_file_meta {
    my ( $self, $req, $header, $length ) = @_;
    $self->_warning( sprintf("Got invalid data for formatting output !") ) and return
        if ref $header ne 'Mojo::Headers'
        or !$header->{headers}
        or !$length;

    #return if not an object ! Again, HCP bug workaround !
    return if $req =~ qr{/$};
    ( my $req_path ) = $self->_url_unescape($req) =~ /^\/(.*)$/;

    #fill headers hash with x-amz-meta headers
    my $headers = {
        map  { $_ => $header->{headers}->{$_}->[0] }
        grep { $_ =~ q{x-amz} } keys %{ $header->{headers} }
    };
    my $obj = $self->obj_data->{ decode( 'UTF-8', $req_path ) };
    $obj->{headers}           = $headers;
    $obj->{length}            = format_bytes($length);
    $obj->{headers}->{length} = $length;
    if ( my $mod_time = $obj->{headers}->{q{x-amz-meta-mtime}} ) {
        $obj->{ModTime} = DateTime->from_epoch(
            epoch     => $mod_time,
            time_zone => 'local'
        )->strftime("%b %d %Y %H:%M:%S");
    }
    else {
        my $dt = DateTime::Format::ISO8601->parse_datetime( $obj->{Mtime} );
        $dt->set_time_zone('local');
        $obj->{ModTime} = sprintf( "%s %.2d %d %s", $dt->month_abbr, $dt->day, $dt->year, $dt->hms );
    }
    $self->_p_out( sprintf( "Object Meta Data: %s\n%s\n", $req_path, dumper $obj) ) if $self->debug;
    return 1;
}
sub _restore_file_meta {
    my ( $self, $rel_path ) = @_;
    my $abs_path = sprintf( "%s/%s", $self->directory, $rel_path );
    my $dir      = path($abs_path)->dirname;
    my $obj      = $self->obj_data->{ decode( 'UTF-8', $rel_path ) };
    if ( my $mtime = $obj->{headers}->{q{x-amz-meta-mtime}} ) {
        utime( $mtime, $mtime, $abs_path ) || $self->_warning( sprintf( "couldn't touch %s: %s", $abs_path, $! ) );
    }
    if (
        my ( $uid, $gid, $d_uid, $d_gid ) = (
            $obj->{headers}->{q{x-amz-meta-uid}},     $obj->{headers}->{q{x-amz-meta-gid}},
            $obj->{headers}->{q{x-amz-meta-dir-uid}}, $obj->{headers}->{q{x-amz-meta-dir-gid}}
        )
        )
    {
        chown( $uid, $gid, $abs_path ) || $self->_warning( sprintf( "couldn't chown file %s: %s", $abs_path, $! ) );
        chown( $d_uid, $d_gid, $dir )  || $self->_warning( sprintf( "couldn't chown dir %s: %s",  $dir,      $! ) );
    }
    if ( my ( $mode, $d_mode ) = ( $obj->{headers}->{q{x-amz-meta-mode}}, $obj->{headers}->{q{x-amz-meta-dir-mode}} ) )
    {
        chmod( oct($mode), $abs_path ) || $self->_warning( sprintf( "couldn't chmod file %s: %s", $abs_path, $! ) );
        chmod( oct($d_mode), $dir )    || $self->_warning( sprintf( "couldn't chmod dir %s: %s",  $dir,      $! ) );
    }
    return 1;
}

#workaround for Amazon's escaping problem ;-)
#https://forums.aws.amazon.com/thread.jspa?messageID=722673
sub _url_unescape {
    my ( $self, $escaped ) = @_;
    if ( $self->server_type eq q{amazon} ) {
        $escaped =~ s/\+/%20/g;
        return url_unescape($escaped);
    }
    else { return url_unescape($escaped); }
}
sub _process_args {
    my $self = shift;
    foreach my $arg (
        ( grep( !/^(?:list|max|concurrency|directory|timeout|endpoint|region|debug|log|dircfg|restore)/, @args ) ) )
    {
        $self->_error( sprintf( "Missing commandline argument: %s", $arg ) )
            if !$self->$arg;
    }

    #logging
    $self->_log(1) if $self->log and path( $self->log )->is_abs;

    #dircfg
    if ( $self->dircfg ) {
        $self->_error( sprintf( "Config file not found: %s",    $self->_dircfg ) )      if !-f $self->_dircfg;
        $self->_error( sprintf( "Base Directory not found: %s", $self->_dircfg_base ) ) if !-d $self->_dircfg_base;
        $self->directory( $self->_dircfg_base );
    }

    #store absolute path !
    $self->directory( path( $self->directory )->to_abs );
    my $endpoint;
    $self->endpoint
        ? ( $endpoint = $self->endpoint and $self->url(qq{https://$endpoint}) )
        : ( $endpoint = $self->def_endpoint
            and $self->url(qq{https://$endpoint})
            and $self->endpoint( $self->def_endpoint ) );

    #misc option settings
    $self->max_keys( $self->max ) if ( $self->max and $self->max > 1 );
    $self->conc( $self->concurrency )
        if ( $self->concurrency
        and $self->concurrency > 5
        and $self->concurrency < 100 );
    $self->_warning(
        sprintf(
            "Max keys lower than concurrency level makes no sense ! %d:%d",
            $self->max_keys, $self->concurrency // 0
        )
    ) if $self->max_keys < $self->conc;
    $self->_timeout( $self->timeout )
        if ( $self->timeout and $self->timeout >= 1 );

    #region validation
    my $region;
    $self->region
        ? ( $region = $self->region )
        : ( $region = $self->def_region and $self->region( $self->def_region ) );

    #region check. valid: ^[a-z0-9\-]+$
    $self->_error( sprintf( "Region invalid: %s Allowed characters: ^[a-z0-9-]", $region ) )
        if $region !~ /^[a-z0-9\-]+$/;
    #test connection and redirect if needed !
    my $promise = $self->_test_connection_promise;
    $promise->then(
        sub {
            my $tx     = shift;
            my $result = $tx->result;
            $self->server_type(q{amazon}) if $result->headers->server and $result->headers->server =~ q{AmazonS3};
            $self->_check_redirect($tx) if $result->is_redirect;
            $self->_check_redirect($tx) if $result->code == 400;
            if ( $result->code == 403 ) {
                $self->_error( sprintf( "Authentication failed: %d %s", $tx->error->{code}, $tx->error->{message} ) );
                exit(1);
            }
        }
    )->catch(
        sub {
            my $err = shift;
            $self->_warning( sprintf( "Connection Error: %s", $err ) );
            exit(1);
        }
    )->wait;
    if ( $self->list ) {
        $self->_get_object_list_promise->_get_object_detail_promise;
        $self->_p_out(
            sprintf( "\nSize: %s\t\t\tObjects: %d\n", format_bytes( $self->total->{length} ), $self->total->{objects} )
        );
    }
    elsif ( $self->restore ) {
        $self->_error( sprintf("Either supply a directory or a directory config (dircfg)") ) if !$self->directory;
        $self->_error( sprintf( "Destination directory: %s not existing", $self->directory ) )
            if !-d $self->directory;
        chdir( $self->directory );
        $self->_get_object_list_promise->_get_object_detail_promise->_check_file_digests_promise
            ->_restore_object_promise;
        $self->_p_out(
            sprintf( "\nSize: %s\t\t\tObjects: %d\n", format_bytes( $self->total->{length} ), $self->total->{objects} )
        );
    }
    else {
        $self->_error( sprintf("Either supply a directory or a directory config (dircfg)") ) if !$self->directory;
        $self->_error( sprintf( "Source directory: %s not existing", $self->directory ) )
            if !-d $self->directory;
        chdir( $self->directory );
        $self->_backup_dir_promise;
        $self->_p_out(
            sprintf( "\nSize: %s\t\t\tObjects: %d\n", format_bytes( $self->total->{length} ), $self->total->{objects} )
        );
    }
}
#we need to make promises! ;-)
sub _do_request_promise {
    my ( $self, $req ) = @_;
    my $tx = $self->ua->build_tx( $req->method => sprintf( "%s%s", $self->url, $req->uri->as_string ) );
    foreach my $hdr_name ( $req->headers->header_field_names ) {
        next
            if ( secure_compare( $hdr_name, q{Content-Length} )
            or secure_compare( $hdr_name, q{User-Agent} ) );
        foreach my $hdr ( $req->headers->header($hdr_name) ) {

            #fix: some headers can have trailing \0 if not provided bugfree.
            #Remove it anyway !!!
            chomp($hdr);
            $tx->req->headers->append( $hdr_name => $hdr );
        }
    }

    #disable gzip encoding !
    $tx->req->headers->remove('Accept-Encoding');
    $tx->req->headers->expect('100-continue');
    $tx->req->body( $req->content ) if $req->content;

    #return Mojo::Promise object
    return $self->ua->start_p($tx);
}

sub _check_redirect {
    my ( $self, $tx ) = @_;
    my $result = $tx->result;
    if ( secure_compare( $result->code, 301 )
        or secure_compare( $result->code, 307 ) )
    {
        #Server: AmazonS3
        #x-amz-bucket-region: eu-west-1
        if ( $self->server_type eq q{amazon}
            and ( my $region = $result->headers->every_header('x-amz-bucket-region')->[0] ) )
        {
            #to be sure that we're connected to Amazon S3, process the required headers !
            $self->region($region);
            $self->endpoint(qq{s3-$region.amazonaws.com});
            $self->url(qq{https://s3-$region.amazonaws.com});
            return 1;
        }
    }
    elsif ( secure_compare( $result->code, 400 )
        and ( my $ref = $self->keyAttr(q{Error})->xs->XMLin( $result->body ) ) )
    {
        if ( $ref->{Error}->{Message}
            =~ /^(?:The authorization header is malformed; the region '\S+' is wrong; expecting '\S+')$/ig )
        {
            my $region = $ref->{Error}->{Region}
                // $self->_error( sprintf("Cannot determine region information. S3 Host ?") );
            $self->region($region);
            $self->endpoint(qq{s3-$region.amazonaws.com});
            $self->url(qq{https://s3-$region.amazonaws.com});
            return 1;
        }
        else { return 0 }
    }
    else { return 0 }
}
sub _get_object_list_promise {
    my $self     = shift;
    my $bucket   = $self->bucket;
    my $max_keys = $self->max_keys;
    my @objects  = ();
    my $marker   = $self->marker // '';
    my $req;
    if ( $self->server_type eq 'amazon' ) {
        if ($marker) {
            $req = HTTP::Request->new( GET => qq{/?list-type=2&prefix=&max-keys=$max_keys&continuation-token=$marker} );
        }
        else { $req = HTTP::Request->new( GET => qq{/?list-type=2&prefix=&max-keys=$max_keys} ); }
    }
    else {
        if ($marker) {
            $req = HTTP::Request->new( GET => qq{/$bucket?prefix=&max-keys=$max_keys&marker=$marker} );
        }
        else { $req = HTTP::Request->new( GET => qq{/$bucket?prefix=&max-keys=$max_keys} ); }
    }
    $self->server_type eq 'amazon'
        ? $req->header( 'Host' => $self->_host )
        : $req->header( 'Host' => $self->endpoint );
    $req->header( 'Date' => $self->_date );
    $self->_sig->sign($req);
    my $promise = $self->_do_request_promise($req);
    $promise->then(
        sub {
            my $tx     = shift;
            my $result = $tx->result;
            if ( my $res = $result->is_success ) {
                my $ref = eval { $self->keyAttr(q{ListBucketResult})->xs->XMLin( $result->body ); };
                $self->_error( sprintf( "ListBucket: Cannot decode answer: %s", $@ ) )
                    if $@;
                if ( secure_compare( ref $ref->{ListBucketResult}->{Contents}, q{ARRAY} ) ) {
                    foreach my $key ( keys @{ $ref->{ListBucketResult}->{Contents} } ) {
                        my $obj     = $ref->{ListBucketResult}->{Contents}->[$key];
                        my $obj_key = $obj->{Key};
                        push( @objects, encode( 'UTF-8', $obj_key ) );
                        $self->obj_data->{$obj_key}->{Mtime} = $obj->{LastModified}
                            if $obj_key !~ qr{/$};
                    }
                }
                elsif ( $ref->{ListBucketResult}->{Contents}
                    and secure_compare( ref $ref->{ListBucketResult}->{Contents}, q{HASH} ) )
                {
                    my $obj     = $ref->{ListBucketResult}->{Contents};
                    my $obj_key = $obj->{Key};
                    push( @objects, encode( 'UTF-8', $obj_key ) );
                    $self->obj_data->{$obj_key}->{Mtime} = $obj->{LastModified}
                        if $obj_key !~ qr{/$};
                }
                if ( $ref->{ListBucketResult}->{IsTruncated} and $ref->{ListBucketResult}->{IsTruncated} =~ qr/true/i )
                {
                    $self->is_trunc(1);
                    if ( $self->server_type eq 'amazon' ) {
                        $self->marker( url_escape( $ref->{ListBucketResult}->{NextContinuationToken} ) );
                    }
                    else { $self->marker( $objects[$#objects] ); }
                }
            }
            else {
                my $err = $result->is_error;
                my $ref = eval { $self->keyAttr(q{Error})->xs->XMLin( $result->body ) };
                $self->_warning(
                    sprintf(
                        "Bucket: %s ListBucket response: %s", $self->bucket, $ref ? dumper $ref : ''
                    )
                ) if $err;
            }
        }
    )->catch(
        sub {
            my $err = shift;
            $self->_warning( sprintf( "Connection Error: %s", $err ) );
            exit(1);
        }
    )->wait;
    $self->is_trunc(0) and $self->_get_object_list_promise if $self->is_trunc;

    #weed out directories ! Bug in HCP, where directories are falsely listed as object keys
    my @objects_rem = grep { $_ !~ qr{/$} } @objects;
    $self->total->{objects} += ( $#objects_rem + 1 );
    push( @{ $self->objects }, @objects_rem );
    $self;
}
sub _get_object_detail_promise {
    my $self = shift;
    while ( @{ $self->objects } ) {
        my @promises = map $self->_get_object_head_promise( $self->objects ), ( 1 .. $self->conc );
        Mojo::Promise->all(@promises)->then(
            sub {
                my @promises_sub = @_;
                foreach my $promise_sub (@promises_sub) {
                    my $tx     = $promise_sub->[0];
                    my $result = $tx->result;
                    if ( $result->is_success ) {
                        my $content_length = int( $result->headers->content_length );
                        $self->total->{length} += $content_length;

                        #format self->obj_data
                        my $req_path = $tx->req->url->to_abs->path->to_string;
                        $self->_format_file_meta( $req_path, $result->headers, $content_length );
                        ( my $rel_path ) = $self->_url_unescape($req_path) =~ /^\/(.*)$/;
                        my $cur_obj = $self->obj_data->{ decode( 'UTF-8', $rel_path ) };
                        $self->_p_out( sprintf( "%s %s\t%s\n", $cur_obj->{ModTime}, $cur_obj->{length}, $rel_path ) );
                    }
                    elsif ( $result->is_error ) {
                        $self->_warning( sprintf( "Cannot get object details: %s", $result->message ) );
                    }
                    else { $self->_warning( sprintf("Cannot get object details: Unknown Error") ); }
                }
            }
        )->catch(
            sub {
                my $err = shift;
                $self->_warning( sprintf( "Connection Error: %s", $err ) );
                exit(1);
            }
        )->finally(
            sub {
                $self->_debug( sprintf( "get_object_head_promise: %d promises done\n", $self->conc ) ) if $self->debug;
            }
        )->wait;
    }
    $self;
}
sub _test_connection_promise {
    my $self = shift;
    my $req  = HTTP::Request->new( HEAD => q{/} );
    $req->header( 'Host' => $self->_host );
    $req->header( 'Date' => $self->_date );
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}

sub _get_object_head_promise {
    my ( $self, $objects ) = @_;
    my $object = shift @{$objects};
    return if !defined $object;
    my $escaped = url_escape( $object, '^A-Za-z0-9\-._~/' );
    my $req     = HTTP::Request->new( HEAD => qq{/$escaped} );
    $req->header( 'Host' => $self->_host );
    $req->header( 'Date' => $self->_date );
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}

sub _initiate_multipart_upload {
    my ( $self, $file ) = @_;
    return if !defined $file;

    #file specs
    my ( $mtime, $mode, $uid, $gid, $d_mtime, $d_mode, $d_uid, $d_gid ) = $self->_fspecs($file);
    my $uri = $self->_uri($file);
    my $req = HTTP::Request->new( POST => qq{$uri?uploads} );
    $req->header( 'Host'                 => $self->_host );
    $req->header( 'Content-Type'         => $self->_type($file) );
    $req->header( 'Date'                 => $self->_date );
    $req->header( 'X-Amz-Meta-Uploader'  => $self->uploader );
    $req->header( 'X-Amz-Meta-Dir-Mtime' => $d_mtime );
    $req->header( 'X-Amz-Meta-Dir-Uid'   => $d_uid );
    $req->header( 'X-Amz-Meta-Dir-Gid'   => $d_gid );
    $req->header( 'X-Amz-Meta-Dir-Mode'  => $d_mode );
    $req->header( 'X-Amz-Meta-Mtime'     => $mtime );
    $req->header( 'X-Amz-Meta-Uid'       => $uid );
    $req->header( 'X-Amz-Meta-Gid'       => $gid );
    $req->header( 'X-Amz-Meta-Mode'      => $mode );
    $req->header( 'X-Amz-Meta-Digest'    => $self->_md5_digest_b64($file) );
    $req->header( 'X-Amz-Acl'            => q{bucket-owner-full-control} );
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}
sub _upload_multipart_promise {
    my ( $self, $parts, $uploadid, $file ) = @_;
    my $part_num = shift @{$parts};
    return
        if !defined $part_num
        or !ref $file
        or ref $file ne 'Mojo::File'
        or !defined $uploadid;
    my $buffer;
    my $offset = ( $self->part_size * ( $part_num - 1 ) );
    my $handle = $file->open('r+');
    my $uri    = $self->_uri($file);
    seek( $handle, $offset, 0 );
    read( $handle, $buffer, $self->part_size );
    close($handle);
    my $req = HTTP::Request->new( PUT => qq{$uri?partNumber=$part_num&uploadId=$uploadid} );
    $req->content($buffer);
    $req->header( 'Host'           => $self->_host );
    $req->header( 'Content-Length' => length($buffer) );
    $req->header( 'Content-MD5'    => $self->_md5_digest_b64($buffer) );
    $req->header( 'Date'           => $self->_date );
    $self->_sig->sign($req);

    #X-Part is added after signing on purpose: it is only used client-side to track the part number
    $req->header( 'X-Part' => $part_num );
    return $self->_do_request_promise($req);
}

sub _complete_multipart_upload_promise {
    my ( $self, $uploadid, $xml, $file ) = @_;
    return
        if !defined $uploadid
        or !defined $xml
        or !ref $file
        or ref $file ne 'Mojo::File';
    my $uri = $self->_uri($file);
    my $req = HTTP::Request->new( POST => qq{$uri?uploadId=$uploadid} );
    $req->content($xml);
    $req->header( 'Host'           => $self->_host );
    $req->header( 'Content-Length' => length($xml) );
    $req->header( 'Date'           => $self->_date );
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}

sub _abort_multipart_upload_promise {
    my ( $self, $uploadid, $file ) = @_;
    return if !defined $uploadid or !defined $file;
    my $uri = $self->_uri($file);
    my $req = HTTP::Request->new( DELETE => qq{$uri?uploadId=$uploadid} );
    $req->header( 'Host' => $self->_host );
    $req->header( 'Date' => $self->_date );
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}

sub _abort_multipart_upload {
    my ( $self, $uploadid, $object ) = @_;
    return if !$uploadid or !$object;
    my $abort_promise = $self->_abort_multipart_upload_promise( $uploadid, $object );
    $abort_promise->then(
        sub {
            my $tx     = shift;
            my $result = $tx->result;
            if ( my $res = $result->is_success ) {
                $self->_warning( sprintf( "Cancelled upload id: %s for file: %s", $uploadid, $object->to_string ) );
            }
            else {
                $self->_warning( sprintf( "Failed upload id: %s for file: %s", $uploadid, $object->to_string ) );
            }
            exit(1);
        }
    )->catch(
        sub {
            my $err = shift;
            $self->_warning( sprintf( "Connection Error: %s", $err ) );
            exit(1);
        }
    )->finally( sub { undef @{ $self->c_upload }; exit(1); } )->wait;
}
sub _upload_single_object_promise {
    my ( $self, $files ) = @_;
    my $file = shift @{$files};
    return if !defined $file;
    my $body   = $file->slurp;
    my $digest = $self->_md5_digest_b64($body);

    #file specs
    my ( $mtime, $mode, $uid, $gid, $d_mtime, $d_mode, $d_uid, $d_gid ) = $self->_fspecs($file);
    my $req = HTTP::Request->new( PUT => $self->_uri($file) );
    $req->content($body);
    $req->header( 'Content-Length'       => int( -s $file->to_string ) );
    $req->header( 'Content-MD5'          => $digest );
    $req->header( 'Content-Type'         => $self->_type($file) );
    $req->header( 'Host'                 => $self->_host );
    $req->header( 'Date'                 => $self->_date );
    $req->header( 'X-Amz-Meta-Uploader'  => $self->uploader );
    $req->header( 'X-Amz-Meta-Dir-Mtime' => $d_mtime );
    $req->header( 'X-Amz-Meta-Dir-Uid'   => $d_uid );
    $req->header( 'X-Amz-Meta-Dir-Gid'   => $d_gid );
    $req->header( 'X-Amz-Meta-Dir-Mode'  => $d_mode );
    $req->header( 'X-Amz-Meta-Mtime'     => $mtime );
    $req->header( 'X-Amz-Meta-Uid'       => $uid );
    $req->header( 'X-Amz-Meta-Gid'       => $gid );
    $req->header( 'X-Amz-Meta-Mode'      => $mode );
    $req->header( 'X-Amz-Meta-Digest'    => $digest );
    $req->header( 'X-Amz-Acl'            => q{bucket-owner-full-control} );

    #sign request
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}

sub _get_single_object_promise {
    my ( $self, $files ) = @_;
    my $file = shift @{$files};
    return if !defined $file;
    my $req = HTTP::Request->new( GET => url_escape( encode( 'UTF-8', qq{/$file} ), '^A-Za-z0-9\-._~/' ) );
    $req->header( 'Host' => $self->_host );
    $req->header( 'Date' => $self->_date );

    #sign request
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}
sub _get_range_object_promise {
    my ( $self, $parts, $size, $file ) = @_;
    my $part_num = shift @{$parts};
    return if !defined $part_num;
    my $parts_all = ceil( $size / $self->part_size );
    my $part_size = $self->part_size;
    my $offset    = ( $part_size * ( $part_num - 1 ) );

    #HTTP ranges are zero-based and inclusive: part n covers
    #offset .. offset + part_size - 1, and the last part ends at size - 1.
    #The previous one-based arithmetic broke for sizes that are an exact
    #multiple of part_size (it requested an invalid range past EOF).
    my $last  = ( $part_num == $parts_all ) ? $size - 1 : $offset + $part_size - 1;
    my $range = sprintf( "%d-%d", $offset, $last );
    my $req   = HTTP::Request->new( GET => url_escape( encode( 'UTF-8', qq{/$file} ), '^A-Za-z0-9\-._~/' ) );
    $req->header( 'Host'    => $self->_host );
    $req->header( 'Date'    => $self->_date );
    $req->header( q{X-Part} => $part_num );
    $req->header( 'Range'   => qq{bytes=$range} );

    #sign request
    $self->_sig->sign($req);
    return $self->_do_request_promise($req);
}
sub _check_file_digests_promise {
    my $self = shift;
    if ( $self->restore and $self->obj_data ) {
        while ( my ( $key, $value ) = each %{ $self->obj_data } ) {
            my $r_digest = $value->{headers}->{q{x-amz-meta-digest}} // '';
            my $l_digest = $self->_md5_digest_b64( path($key) ) // '';
            push( @{ $self->objects }, $key ) if !secure_compare( $r_digest, $l_digest );
        }
    }
    else {
        my @objects = ();
        push( @objects, @{ $self->objects } );
        return $self if !@objects;
        while (@objects) {
            my @promises = map $self->_get_object_head_promise( \@objects ), ( 1 .. $self->conc );
            Mojo::Promise->all(@promises)->then(
                sub {
                    my @promises_sub = @_;
                    foreach my $promise_sub (@promises_sub) {
                        my $tx     = $promise_sub->[0];
                        my $result = $tx->result;
                        ( my $file ) = $self->_url_unescape( $tx->req->url->to_abs->path->to_string ) =~ /^\/(.*)$/;
                        if ( -f $file ) {
                            my $digest_local  = $self->_md5_digest_b64( path($file) );
                            my $digest_remote = $result->headers->every_header(q{x-amz-meta-Digest})->[0] // q{};
                            push( @{ $self->objects_r }, path($file) )
                                if !secure_compare( $digest_local, $digest_remote );
                            $self->_debug( sprintf( "local <=> remote digest match. File: %s\n", $file ) )
                                if $self->debug
                                and secure_compare( $digest_local, $digest_remote );
                        }
                        else { push( @{ $self->objects_r }, path($file) ); }
                    }
                }
            )->catch(
                sub {
                    my $err = shift;
                    $self->_warning( sprintf( "Connection Error: %s", $err ) );
                    exit(1);
                }
            )->wait;
        }
    }
    $self;
}
sub _backup_dir_promise {
    my $self = shift;
    chdir( $self->directory );

    #read dircfg or use the -d option supplied !
    if ( $self->dircfg ) {
        my $dirs = decode_json( path( $self->_dircfg )->slurp )->{directories};
        $self->_error( sprintf( "Cannot read config file: %s\n", $self->_dircfg ) )
            if !ref $dirs
            or ref $dirs ne 'ARRAY';
        my $collection = c( @{$dirs} );
        $collection->each(
            sub {
                my ( $key, $value ) = @_;
                my $re_string = $self->_dircfg_base;
                my ( undef, $rel_dir ) = $key =~ /^($re_string\/)(.*)$/;
                if ( -d $rel_dir ) {
                    push( @{ $self->objects }, path( encode( 'UTF-8', $rel_dir ) )->list_tree->each );
                }
            }
        );
    }
    else { push( @{ $self->objects }, path(q{.})->list_tree->each ); }

    #check for remaining objects by comparing local and remote digests !
    $self->_check_file_digests_promise;

    #empty the objects collection.
    #objects_r will hold the objects remaining after comparison !
    undef @{ $self->objects };
    $self->objects_s( $self->objects_r->map( sub { return $_ if int( -s $_ ) <= $self->part_size; } )->compact );
    $self->objects_m(
        $self->objects_r->map(
            sub {
                return $_
                    if int( -s $_ ) > $self->part_size
                    and int( -s $_ ) <= $self->max_size;
            }
        )->compact
    );

    #empty the objects collection.
    #objects_s(m) will hold the objects remaining after mapping !
    undef @{ $self->objects_r };
    $self->_p_out(
        sprintf(
            "\nTo backup: %d | single part files: %d | multi part files: %d\n\n------------------------------------------------------------------------------------------------------------\n\n",
            $self->objects_s->size + $self->objects_m->size,
            $self->objects_s->size, $self->objects_m->size
        )
    );
    if ( $self->objects_s->size ) {
        while ( @{ $self->objects_s } ) {

            #empty single objects and return if signal QUIT, INT or TERM has been received !!!
            do { undef @{ $self->objects_s }; return } if $self->shutdown;
            my @promises = map $self->_upload_single_object_promise( $self->objects_s ), ( 1 .. $self->conc );
            Mojo::Promise->all(@promises)->then(
                sub {
                    my @promises_sub = @_;
                    foreach my $promise_sub (@promises_sub) {
                        my $tx = $promise_sub->[0];
                        ( my $req_path ) = $self->_url_unescape( $tx->req->url->to_abs->path->to_string ) =~ /^\/(.*)$/;
                        my $result = $tx->result;
                        if ( $result->is_success ) {
                            my $size = $tx->req->headers->content_length;
                            $self->total->{length}  += $size;
                            $self->total->{objects} += 1;
                            $self->_p_out(
                                sprintf(
                                    "file: %s size: %s Rescode: %s\n",
                                    $req_path, format_bytes($size), $result->code
                                )
                            );
                        }
                        else {
                            if ( my $error = $self->keyAttr(q{Error})->xs->XMLin( $result->body )->{Error} ) {
                                my $err_msg = sprintf( "Code: %s Message: %s", $error->{Code}, $error->{Message} );
                                $self->_warning(
                                    sprintf( "Failed to upload object: %s Reason: %s", $req_path, dumper $err_msg ) );
                            }
                        }
                    }
                }
            )->catch(
                sub {
                    my $err = shift;
                    $self->_warning( sprintf( "Connection Error: %s", $err ) );
                    exit(1);
                }
            )->finally(
                sub {
                    $self->_debug( sprintf( "_backup_single_part_promise: %d promises done", $self->conc ) )
                        if $self->debug;
                }
            )->wait;
        }
    }
    if ( $self->objects_m->size ) {
        while ( @{ $self->objects_m } ) {
            my $object = shift @{ $self->objects_m };
            last if not defined $object;
            my $init_failed = 0;
            my $promise     = $self->_initiate_multipart_upload($object);
            my $size        = int( -s $object->to_string );
            my $uploadid;
            $promise->then(
                sub {
                    my $tx     = shift;
                    my $result = $tx->result;
                    if ( $result->is_success ) {
                        ( my $req_path ) = $self->_url_unescape( $tx->req->url->to_abs->path->to_string ) =~ /^\/(.*)$/;
                        my $size = int( -s $object->to_string );
                        $self->total->{length}  += $size;
                        $self->total->{objects} += 1;
                        if ( my $ref = $self->keyAttr(q{InitiateMultipartUploadResult})->xs->XMLin( $result->body ) ) {
                            $uploadid = $ref->{InitiateMultipartUploadResult}->{UploadId} // q{};
                            $self->_p_out(
                                sprintf(
                                    "file: %s size: %s Rescode: %s ID: %s (init)\n",
                                    $req_path, format_bytes($size), $result->code, $uploadid
                                )
                            );
                        }
                        else { $init_failed = 1 }
                    }
                    else {
                        $init_failed = 1;
                        if ( my $error = $self->keyAttr(q{Error})->xs->XMLin( $result->body )->{Error} ) {
                            $self->_warning(
                                sprintf(
                                    "Multipart init failed: File: %s Code: %s Message: %s BucketName: %s",
                                    $object->to_string, $error->{Code},
                                    $error->{Message},  $error->{BucketName}
                                )
                            );
                        }
                    }
                }
            )->catch(
                sub {
                    my $err = shift;
                    $self->_warning( sprintf( "Connection Error: %s", $err ) );
                    exit(1);
                }
            )->finally(
                sub {
                    $self->_debug( sprintf( "_initiate_multi_part_promise: %s promises done\n", $object->to_string ) )
                        if $self->debug;
                }
            )->wait;
            if ($init_failed) { $self->_abort_multipart_upload( $uploadid, $object ); }
            else {
                my @parts    = $self->_parts($object);
                my $complete = {};
                my ( $upload_failed, $complete_failed );
                while (@parts) {

                    #store upload id and object in case the signal handler triggers a shutdown !
                    $self->c_upload( [ $uploadid, $object ] );

                    #drop remaining parts and objects if signal QUIT, INT or TERM has been received !!!
                    if ( $self->shutdown ) {
                        undef @parts;
                        undef @{ $self->objects_m };
                        $self->_abort_multipart_upload( $uploadid, $object );
                        return;
                    }
                    my @promises = map $self->_upload_multipart_promise( \@parts, $uploadid, $object ),
                        ( 1 .. $self->conc );
                    Mojo::Promise->all(@promises)->then(
                        sub {
                            my @promises_sub = @_;
                            foreach my $promise_sub (@promises_sub) {
                                my $tx          = $promise_sub->[0];
                                my $result      = $tx->result;
                                my $headers_req = $tx->req->headers->to_hash;
                                my $headers_res = $result->headers->to_hash;
                                if ( $result->is_success ) {
                                    ( my $req_path )
                                        = $self->_url_unescape( $tx->req->url->to_abs->path->to_string ) =~ /^\/(.*)$/;
                                    my $size     = $headers_req->{q{Content-Length}};
                                    my $part_now = $headers_req->{q{X-Part}};
                                    my $etag     = $headers_res->{ETag};
                                    $complete->{CompleteMultipartUpload}->{Part}->[$part_now]->{PartNumber}->[0]
                                        = $part_now;
                                    $complete->{CompleteMultipartUpload}->{Part}->[$part_now]->{ETag}->[1] = $etag;
                                    $self->_p_out(
                                        sprintf(
                                            "file: %s size: %s Rescode: %s Part: %d Etag: %s\n",
                                            $req_path, format_bytes($size), $result->code, $part_now, $etag
                                        )
                                    );
                                }
                                else {
                                    $self->_warning(
                                        sprintf(
                                            "Failed upload part %d for file: %s",
                                            $headers_req->{q{X-Part}},
                                            $object->to_string
                                        )
                                    );
                                    $upload_failed = 1;
                                    undef(@parts);
                                }
                            }
                        }
                    )->catch(
                        sub {
                            my $err = shift;
                            $self->_warning( sprintf( "Connection Error: %s", $err ) );
                            exit(1);
                        }
                    )->wait;
                }
                if ($upload_failed) { $self->_abort_multipart_upload( $uploadid, $object ); }
                else {
                    my $complete_promise = $self->_complete_multipart_upload_promise( $uploadid,
                        $self->keyAttr(q{CompleteMultipartUpload})->xs->XMLout($complete), $object );
                    $complete_promise->then(
                        sub {
                            my $tx     = shift;
                            my $result = $tx->result;
                            if ( my $res = $result->is_success ) {
                                if ( my $ref
                                    = $self->keyAttr(q{CompleteMultipartUploadResult})->xs->XMLin( $result->body ) )
                                {
                                    my $xml = $ref->{CompleteMultipartUploadResult};
                                    $self->_p_out(
                                        sprintf(
                                            "file: %s Rescode: %s ID: %s (complete)\n",
                                            $object->to_string, $result->code, $uploadid
                                        )
                                    );
                                    $self->_debug( sprintf( "(complete multipart) XML:\n%s", dumper $ref) )
                                        if $self->debug;
                                }
                            }
                            else {
                                $complete_failed = 1;
                                if ( my $error = $self->keyAttr(q{Error})->xs->XMLin( $result->body )->{Error} ) {
                                    my $err_msg = sprintf( "Code: %s Message: %s", $error->{Code}, $error->{Message} );
                                    $self->_warning(
                                        sprintf(
                                            "Failed upload object: %s Reason: %s",
                                            $object->to_string, dumper $err_msg
                                        )
                                    );
                                }
                            }
                        }
                    )->catch(
                        sub {
                            my $err = shift;
                            $self->_warning( sprintf( "Connection Error: %s", $err ) );
                            exit(1);
                        }
                    )->wait;
                }
                $self->_abort_multipart_upload( $uploadid, $object ) if $complete_failed;
            }
        }
    }
}
sub _restore_object_promise {
    my $self = shift;
    $self->objects_s(
        $self->objects->map( sub { return $_ if $self->obj_data->{$_}->{headers}->{length} <= $self->part_size; } )
            ->compact );
    $self->objects_m(
        $self->objects->map(
            sub {
                return $_
                    if $self->obj_data->{$_}->{headers}->{length} > $self->part_size
                    and $self->obj_data->{$_}->{headers}->{length} <= $self->max_size;
            }
        )->compact
    );

    #empty the objects collection.
    #objects_s(m) will hold the objects remaining after mapping !
    undef @{ $self->objects };
    $self->total->{length}  = 0;
    $self->total->{objects} = 0;
    $self->_p_out("\n\n----------Downloading Objects----------\n\n")
        if $self->objects_s->size or $self->objects_m->size;
    if ( $self->objects_s->size ) {
        while ( @{ $self->objects_s } ) {

            #empty single objects and return if signal QUIT, INT or TERM has been received !!!
            do { undef @{ $self->objects_s }; return } if $self->shutdown;
            my @promises = map $self->_get_single_object_promise( $self->objects_s ), ( 1 .. $self->conc );
            Mojo::Promise->all(@promises)->then(
                sub {
                    my @promises_sub = @_;
                    foreach my $promise_sub (@promises_sub) {
                        my $tx = $promise_sub->[0];
                        ( my $req_path ) = $self->_url_unescape( $tx->req->url->to_abs->path->to_string ) =~ /^\/(.*)$/;
                        my $result = $tx->result;
                        if ( $result->is_success ) {
                            my $size = $result->headers->content_length;
                            $self->total->{length}  += $size;
                            $self->total->{objects} += 1;
                            my $abs_path = sprintf( "%s/%s", $self->directory, $req_path );
                            $self->_warning( sprintf( "File %s already exists and will be overwritten !", $abs_path ) )
                                if -f $abs_path;
                            path($abs_path)
                                ->dirname->make_path->tap( sub { path($abs_path)->spurt( $result->body ); } );

                            #change the mtime of the file by examining the headers stored in $self->obj_data
                            $self->_restore_file_meta($req_path);
                            $self->_p_out(
                                sprintf(
                                    "file: %s size: %s Rescode: %s\n",
                                    $req_path, format_bytes($size), $result->code
                                )
                            );
                        }
                        else {
                            if ( my $error = $self->keyAttr(q{Error})->xs->XMLin( $result->body )->{Error} ) {
                                my $err_msg = sprintf( "Code: %s Message: %s", $error->{Code}, $error->{Message} );
                                $self->_warning(
                                    sprintf( "Failed to get object: %s Reason: %s", $req_path, dumper $err_msg ) );
                            }
                        }
                    }
                }
            )->catch(
                sub {
                    my $err = shift;
                    $self->_warning( sprintf( "Connection Error: %s", $err ) );
                    exit(1);
                }
            )->finally(
                sub {
                    $self->_debug( sprintf( "_get_single_object_promise: %d promises done", $self->conc ) )
                        if $self->debug;
                }
            )->wait;
        }
    }
    if ( $self->objects_m->size ) {
        while ( @{ $self->objects_m } ) {
            my $object = shift @{ $self->objects_m };
            last if not defined $object;
            my $size     = $self->obj_data->{$object}->{headers}->{length};
            my @parts    = $self->_parts( path($object), $size );
            my $abs_path = sprintf( "%s/%s", $self->directory, $object );
            if ( -f $abs_path ) {
                $self->_warning( sprintf( "File %s already exists and will be overwritten !", $abs_path ) );

                #check the return value of unlink instead of $!, which may hold a stale error
                unless ( unlink($abs_path) ) {
                    $self->_warning( sprintf( "unlink of file %s failed. Omitting file. Error: %s", $abs_path, $! ) );
                    next;
                }
            }

            #empty multi objects and return if signal QUIT, INT or TERM has been received !!!
            do { undef @{ $self->objects_m }; return } if $self->shutdown;
            path($abs_path)->dirname->make_path;
            while (@parts) {

                #drop remaining parts if signal QUIT, INT or TERM has been received !!!
                if ( $self->shutdown ) {
                    unlink($abs_path) if ( -f $abs_path );
                    undef @parts;
                    return;
                }
                my @promises = map $self->_get_range_object_promise( \@parts, $size, $object ), ( 1 .. $self->conc );
                Mojo::Promise->all(@promises)->then(
                    sub {
                        my @promises_sub = @_;
                        foreach my $promise_sub (@promises_sub) {
                            my $tx          = $promise_sub->[0];
                            my $result      = $tx->result;
                            my $headers_req = $tx->req->headers->to_hash;
                            my $headers_res = $result->headers->to_hash;
                            if ( $result->is_success ) {
                                ( my $req_path )
                                    = $self->_url_unescape( $tx->req->url->to_abs->path->to_string ) =~ /^\/(.*)$/;
                                my $size     = $headers_res->{q{Content-Length}};
                                my $part_now = $headers_req->{q{X-Part}};
                                $self->total->{length} += $size;
                                $self->_p_out(
                                    sprintf(
                                        "file: %s size: %s Rescode: %s Part: %d\n",
                                        $req_path, format_bytes($size), $result->code, $part_now
                                    )
                                );

                                #parts arrive in order within a batch, so append sequentially;
                                #check syswrite's return value instead of $!
                                my $handle = path($abs_path)->open( O_CREAT | O_WRONLY | O_APPEND );
                                syswrite( $handle, $result->body )
                                    // $self->_warning(
                                    sprintf( "Cannot write part file %s. Error: %s", $abs_path, $! ) );
                                close($handle);
                            }
                            else {
                                $self->_warning(
                                    sprintf(
                                        "Failed get part %d for file: %s", $headers_req->{q{X-Part}}, qq{$object}
                                    )
                                );
                            }
                        }
                    }
                )->catch(
                    sub {
                        my $err = shift;
                        $self->_warning( sprintf( "Connection Error: %s", $err ) );
                        exit(1);
                    }
                )->finally(
                    sub {
                        $self->_debug( sprintf( "_get_multi_object_promise: %d promises done", $self->conc ) )
                            if $self->debug;
                    }
                )->wait;
            }
            $self->total->{objects} += 1;
            $self->_restore_file_meta( encode( 'UTF-8', $object ) );
        }
    }
}
sub start {
    my $self = shift;

    #cleanup when receiving signals INT, TERM or QUIT !
    $SIG{$_} = sub {
        $self->_warning("\n\nStopping gracefully !\nPlease wait for cleanup !\n\n");
        $self->shutdown(1);
        }
        for qw(INT QUIT TERM);

    #debugging (see MOJO_IOLOOP_DEBUG and MOJO_USERAGENT_DEBUG for details).
    if ( $self->debug2 ) {
        $self->debug(1);

        #client agent debug as of version 7.83
        $ENV{MOJO_CLIENT_DEBUG} = 1;

        #client agent debug for versions < 7.83
        $ENV{MOJO_USERAGENT_DEBUG} = 1;
        $ENV{MOJO_IOLOOP_DEBUG}    = 1;
    }
    require Mojolicious;
    die( sprintf("Mojolicious version >= 7.52 required !!!\n") ) unless $Mojolicious::VERSION >= 7.52;
    $0 = q{S3Backup};
    $self->_process_args;
}

package main;
use Mojo::Base -strict;
use Getopt::Long qw(:config posix_default no_ignore_case);
use Pod::Usage;

my $opts = {};
GetOptions(
    $opts,
    (
        qw(list|l help|h man|m bucket|b=s region|r=s endpoint|e=s id|i=s secret|s=s),
        qw(directory|d=s max|x=i concurrency|c=i timeout|t=i debug debug2 log=s dircfg restore)
    )
    )
    or pod2usage( -exitval => 'NOEXIT' )
    and exit(1);
!%{$opts}     && pod2usage( -exitval => 'NOEXIT' )            and exit(1);
$opts->{man}  && pod2usage( -exitstatus => 0, -verbose => 2 ) and exit(1);
$opts->{help} && pod2usage( -exitval => 'NOEXIT' )            and exit(1);
S3::Backup->new($opts)->start;

1;

=head1 NAME

s3-backup.pl

=head1 SYNOPSIS

B<s3-backup.pl> [I<options>...]

 -m,--man         extensive manual
 -h,--help        display this help and exit
 -l,--list        list S3 objects recursively
 -b,--bucket      S3 bucket to use
 -r,--region      S3 region to use e.g. us-east-1
 -e,--endpoint    endpoint hostname
 -i,--id          access key id
 -s,--secret      secret access key
 -d,--directory   directory to upload recursively
 -x,--max         max keys/request (defaults to 100)
 -c,--concurrency max concurrent requests (max. 100)
 -t,--timeout     Mojo::UserAgent timeout parameters
                  amazon usually needs more than 2s !
 --debug          debugging output. Metadata.
 --debug2         debugging output including IOLoop and UserAgent
 --log=logfile    log output to file (absolute path !)
 --dircfg         enable directory config lookup
                  (/etc/s3-backup/directories.conf)
 --restore        restore objects
                  destination directory taken from the -d argument !
                  existing files will be overwritten.
                  modification time will be restored from
                  the mtime meta header !

=head1 DESCRIPTION

Small module to back up data to an S3 compatible service (non-blocking).
It uses x-amz-meta headers to add the file modification time and digest to the object.
Later backups will only sync changed files, using the MD5 digest as comparator !

=over

=item B<-h>, B<--help>

show help

=item B<-l>, B<--list>

List all S3 objects in the bucket

=item B<-b>, B<--bucket>

S3 bucket name

=item B<-r>, B<--region>

region for the S3 connection. B<form: [a-z0-9-]>

=item B<-e>, B<--endpoint>

fully qualified domain name of the host to connect to (without bucket or protocol).
The protocol will always be HTTPS !

=item B<-i>, B<--id>

S3 access key id. (check the AWS IAM security credentials)

=item B<-s>, B<--secret>

S3 secret access key. (check the AWS IAM security credentials)

=item B<-t>, B<--timeout>

set connection timeout parameters for the ua object.
Don't set it too low, since Amazon is quite busy sometimes, thus
output and backup can be incomplete !!!
Default: 15s !

=back

=head1 EXAMPLES

To back up the directory /home/upload/test to S3

B<s3-backup.pl -b bucket -i id -s secret -d /home/upload/test>

To list all objects in the S3 bucket

B<s3-backup.pl -l -b bucket -i id -s secret>
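
Two further invocations, shown for illustration only (bucket, id and secret are placeholders).

To run unattended from cron using the directory config and a log file

B<s3-backup.pl --dircfg --log /var/log/s3-backup.log -b bucket -i id -s secret>

To restore the bucket contents into /home/upload/test

B<s3-backup.pl --restore -b bucket -i id -s secret -d /home/upload/test>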
=head1 COPYRIGHT

Copyright (c) 2018 by Franz Skale. All rights reserved.

=head1 LICENSE

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option)
any later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
more details.

You should have received a copy of the GNU General Public License along with
this program. If not, see L<http://www.gnu.org/licenses/>.

=head1 AUTHOR

S<Franz Skale E<lt>franz.skale@citycom-austria.comE<gt>>,

=head1 HISTORY

2018-05-07 initial release

=cut
Directory Config is JSON encoded (for cron jobs).
E.g.:
{ "directories" : [ "/var/shares/share01/test01東西", "/var/shares/share01/testtest" ] }
Example backup of one directory w/o config file:

./s3-backup.pl -i xxxxxxxx -s xxxxxxxxxxx -b test-bucket -d /Users/test/temp/testdir/

To backup: 1 | single part files: 0 | multi part files: 1
------------------------------------------------------------------------------------------------------------

file: testfile size: 100M Rescode: 200 ID: B.XsPXtbX.BQY7Z6mGp4isebBFmglKs8qlZdXRjQmz10n0Haz.iySQFdWtym2e4urzSaUYf_SBklV4XyaSWI1fQL8zyHcP6OuMcjbAu90ICQKrRFqhQLUSD0SIph7vry (init)
file: testfile size: 8.0M Rescode: 200 Part: 1 Etag: "ade6c6b603a1d6acd4b07c906a6ab95e"
file: testfile size: 8.0M Rescode: 200 Part: 2 Etag: "703e6741ebd62796cc771512a8bd6385"
file: testfile size: 8.0M Rescode: 200 Part: 3 Etag: "33d9380d23ab85362be8d660f190cdd8"
file: testfile size: 8.0M Rescode: 200 Part: 4 Etag: "ec4186135bfee93e6b079ef37b438867"
file: testfile size: 8.0M Rescode: 200 Part: 5 Etag: "b82057a90983d07ef59fef3c3b479729"
file: testfile size: 8.0M Rescode: 200 Part: 6 Etag: "64c6a853ebbc80bd528d6b8d4d6a1841"
file: testfile size: 8.0M Rescode: 200 Part: 7 Etag: "3759cff400843cc25dae45d370245189"
file: testfile size: 8.0M Rescode: 200 Part: 8 Etag: "fe500f4023d66bc4e425ee575e6e0f46"
file: testfile size: 8.0M Rescode: 200 Part: 9 Etag: "9d0d4906091db8c345bd643cde3d6035"
file: testfile size: 8.0M Rescode: 200 Part: 10 Etag: "0d5eeb400270f7974754b9c47186045a"
file: testfile size: 8.0M Rescode: 200 Part: 11 Etag: "23826d7ca8f468433bf932efceb8ccdd"
file: testfile size: 8.0M Rescode: 200 Part: 12 Etag: "19f152fa23257bc39ca0e7ff56b47265"
file: testfile size: 4.0M Rescode: 200 Part: 13 Etag: "efc6bbad1eb7c630e6ceaef2882e219a"
file: testfile Rescode: 200 ID: B.XsPXtbX.BQY7Z6mGp4isebBFmglKs8qlZdXRjQmz10n0Haz.iySQFdWtym2e4urzSaUYf_SBklV4XyaSWI1fQL8zyHcP6OuMcjbAu90ICQKrRFqhQLUSD0SIph7vry (complete)

Size: 100M			Objects: 1
List bucket:

./s3-backup.pl -l -i xxxxxx -s xxxxxx -b test-bucket

Jun 06 2019 13:26:56 100M	testfile

Size: 100M			Objects: 1
Restore to the current directory:

./s3-backup.pl -i xxxxxxxxxxx -s xxxxxxxxxxx -b test-bucket --restore -d $PWD

Jun 06 2019 14:09:30 2.3M	DSC_0102.JPG
Jun 06 2019 14:09:30 2.9M	IMAG0709.jpg
Jun 06 2019 14:09:30 3.1M	IMAG0713.jpg
Jun 06 2019 14:09:30 2.1M	IMG_0167.JPG
Jun 06 2019 14:09:30 3.1M	IMG_20161109_111803.jpg
Jun 06 2019 14:09:30 2.4M	IMG_20170118_125518.jpg
Jun 06 2019 14:09:30 3.1M	IMG_20170503_114716.jpg
Jun 06 2019 14:09:30 2.9M	IMG_20170505_111151.jpg
Jun 06 2019 14:09:30 3.5M	IMG_20170509_100954.jpg
Jun 06 2019 14:09:30 2.7M	IMG_20170510_135422.jpg
Jun 06 2019 14:09:30 3.1M	IMG_20170511_113929.jpg
Jun 06 2019 14:09:30 3.0M	IMG_20170512_115304.jpg
Jun 06 2019 14:09:30 3.2M	IMG_20170522_130551.jpg
Jun 06 2019 14:09:30 555K	IMG_20170522_130615_823.jpg
Jun 06 2019 14:09:30 2.8M	IMG_20170522_130859.jpg
Jun 06 2019 14:09:30 721K	IMG_20170530_110734_884.jpg
Jun 06 2019 14:09:30 626K	IMG_20170622_105903_741.jpg
Jun 06 2019 14:09:30 3.8M	IMG_20170822_120458.jpg
Jun 06 2019 14:09:30 548K	IMG_20170830_120441_570.jpg
Jun 06 2019 14:09:30 6.8M	test1.jpg
Jun 06 2019 14:09:30 6.1K	test2.jpg
Jun 06 2019 14:09:30 56K	image002.jpg
Jun 06 2019 14:09:30 12K	test3.jpg
Jun 06 2019 14:09:30 17K	test4.jpg
Jun 06 2019 14:09:30 378K	test5.jpg

----------Downloading Objects----------

file: IMG_20170510_135422.jpg size: 2.7M Rescode: 200
file: test4.jpg size: 17K Rescode: 200
file: test2.jpg size: 6.1K Rescode: 200
file: IMG_20170505_111151.jpg size: 2.9M Rescode: 200
file: IMG_20170822_120458.jpg size: 3.8M Rescode: 200
file: IMG_20170511_113929.jpg size: 3.1M Rescode: 200
file: test3.jpg size: 12K Rescode: 200
file: IMG_20161109_111803.jpg size: 3.1M Rescode: 200
file: IMAG0713.jpg size: 3.1M Rescode: 200
file: IMG_20170522_130551.jpg size: 3.2M Rescode: 200
file: DSC_0102.JPG size: 2.3M Rescode: 200
file: IMAG0709.jpg size: 2.9M Rescode: 200
file: IMG_20170118_125518.jpg size: 2.4M Rescode: 200
file: IMG_20170830_120441_570.jpg size: 548K Rescode: 200
file: IMG_20170522_130859.jpg size: 2.8M Rescode: 200
file: IMG_20170530_110734_884.jpg size: 721K Rescode: 200
file: IMG_0167.JPG size: 2.1M Rescode: 200
file: IMG_20170509_100954.jpg size: 3.5M Rescode: 200
file: test1.jpg size: 6.8M Rescode: 200
file: IMG_20170622_105903_741.jpg size: 626K Rescode: 200
file: IMG_20170522_130615_823.jpg size: 555K Rescode: 200
file: IMG_20170503_114716.jpg size: 3.1M Rescode: 200
file: image002.jpg size: 56K Rescode: 200
file: IMG_20170512_115304.jpg size: 3.0M Rescode: 200
file: test5.jpg size: 378K Rescode: 200

Size: 53M			Objects: 25
To test, you have to install the dependencies first, either from your distribution's packages or from CPAN (see the one-liner below).
Be sure to have build-essentials and SSL development libs installed !
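
A possible cpanm one-liner covering the modules checked in the script's BEGIN block (this assumes App::cpanminus is installed; HTTP::Request ships with the HTTP::Message distribution and Crypt::Digest::MD5 with CryptX):

cpanm Mojolicious XML::Simple HTTP::Message MIME::Types Number::Bytes::Human IO::Socket::SSL Net::DNS::Native EV Net::Amazon::Signature::V4 CryptX DateTime::Format::ISO8601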