-
-
Save Mons/9dc4e2e3097c231551d4f0d130986149 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
use strict; | |
use 5.010; | |
# use Getopt::Long; | |
use AnyEvent::Socket; | |
use AnyEvent::Handle; | |
# use DDP; | |
# use Devel::Hexdump; | |
use YAML::XS qw(Load); | |
# use Protocol::Tarantool; | |
# Both endpoints are given on the command line as <host>:<port>.
# @from describes the current master, @to the replica; each holds (host, port).
my (@from, @to);
if (@ARGV == 2) {
    @from = ($1, $2) if $ARGV[0] =~ /^(\S+):(\S+)$/;
    # The second argument is only examined once the first one parsed.
    @to = ($1, $2) if @from && $ARGV[1] =~ /^(\S+):(\S+)$/;
}
die "Usage:\n\t$0 <from-host:port> <to-host:port>\n" unless @from && @to;

# Final result of the whole run: send() on success, croak() on failure.
my $cv = AE::cv;
# warn "Connect @from";

# Taken from the master's "show conf" reply once it is connected.
my $master_replication_port;
# Obtain a handle connected to the admin (text console) port of a server.
#
# Usage: get_admin_connection($host, $port, $cb)
#
#   $host, $port - address to probe; it may be either the binary port or
#                  the admin port itself
#   $cb          - called exactly once, as $cb->($handle) on success or
#                  $cb->(undef, $error_message) on failure
#
# A binary "box.dostring" call asking for box.cfg.admin_port is written to
# the given port. If the reply starts with "---\n" the port already is an
# admin port and the same connection is handed to the callback. Otherwise
# the reply is decoded as a binary response, the advertised admin port is
# extracted and a fresh connection to it is opened.
sub get_admin_connection {
    my $cb = pop;
    my ($host, $rport) = @_;

    tcp_connect($host, $rport, sub {
        # warn "@_";
        my $fh = shift
            or return $cb->(undef, "Failed to connect $host:$rport: $!");

        my $h;

        # Single failure path: release the probe handle (which also disarms
        # its timeout and on_error) and report the problem to the caller.
        my $fail = sub {
            my ($message) = @_;
            $h->destroy if $h;
            $cb->(undef, $message);
        };

        $h = AnyEvent::Handle->new(
            fh       => $fh,
            timeout  => 1,
            on_error => sub {
                # on_error is invoked with ($handle, $fatal, $message).
                my (undef, undef, $message) = @_;
                warn "$message";
                $fail->($message);
            },
        );

        my $body = pack "V w/a* V w/a*", 0, "box.dostring", 1, "return {tostring(box.cfg.admin_port)}";
        $h->push_write(pack("VVVa*", 22, length($body), 0xa202020, $body));

        # The first 12 bytes are three little-endian 32-bit words; the
        # second one is used as the length of the remaining payload.
        $h->push_read(chunk => 12, sub {
            my (undef, $header) = @_;
            my ($first_word, $len) = unpack "VV", $header;

            if ($first_word == 0xa2d2d2d) { # ---\n
                # already an admin port: skip the rest of the textual
                # reply (up to the "..." terminator) and reuse this handle
                $h->unshift_read(line => qr/^\.\.\.\r?\n$/m, sub {
                    $h->on_error(undef);
                    $h->timeout(undef);
                    $cb->($h);
                    return;
                });
                return;
            }

            # say "$first_word,$len";
            if ($len > 1024*1024) {
                warn "Long read";
                return $fail->("Response from $host:$rport is too long: $len bytes");
            }

            $h->unshift_read(chunk => $len, sub {
                my (undef, $payload) = @_;
                my ($code, $data) = unpack "Va*", $payload;

                if ($code) {
                    # Drop the final byte of the error text
                    # (presumably a terminator - TODO confirm).
                    substr($data, -1, 1, '');
                    return $fail->("Failed to get admin port from $host:$rport: $data");
                }

                # Layout read here: count, one skipped 32-bit word,
                # cardinality, then a length-prefixed field with the port.
                my ($cnt, undef, $car, $admport) = unpack "VVV w/a* ", $data;
                if ($cnt == 1 and $car == 1 and $admport) {
                    # warn "Got port: $admport";
                    $h->destroy;
                    tcp_connect $host, $admport, sub {
                        my $admin_fh = shift
                            or return $cb->(undef, "Failed to connect $host:$admport: $!");
                        $cb->(AnyEvent::Handle->new(fh => $admin_fh));
                    }, sub { 1 };    # prepare callback: value is the connect timeout
                }
                else {
                    $fail->("Wrong response");
                }
            });
        });
    }, sub { 1 });    # prepare callback: value is the connect timeout
}
# Send one command over an admin connection and pass the reply to a callback.
#
#   $h   - AnyEvent::Handle connected to an admin port
#   $cmd - command text; a newline is appended before it is written
#   $cb  - called as $cb->($data) when the reply parses (via YAML Load) to a
#          reference, or as $cb->(undef, $text) when the reply is a plain
#          scalar (for example "ok"), cannot be parsed, or the connection
#          fails or times out
#
# A 1-second inactivity timeout and an error handler are armed while the
# command is in flight and removed again once the reply has been read.
sub cmd {
    my ($h, $cmd, $cb) = @_;
    # warn ">>> $cmd";
    $h->timeout(1);
    $h->on_error(sub {
        # on_error is invoked with ($handle, $fatal, $message).
        my (undef, undef, $message) = @_;
        $h->destroy;
        warn "Connection failed: $message";
        # Let the caller continue its flow instead of waiting forever.
        $cb->(undef, "Connection failed: $message");
    });
    $h->push_write("$cmd\n");

    # The reply is read up to a line consisting of "..."; the callback gets
    # the handle, the text before that line, and the terminator itself.
    $h->push_read(line => qr/^\.\.\.\r?\n$/m, sub {
        my (undef, $reply) = @_;
        $h->timeout(undef);
        $h->on_error(undef);

        my $parsed = eval { Load($reply) };
        return $cb->($parsed) if ref $parsed;
        return $cb->(undef, $parsed) if $parsed;

        # Nothing usable came out of the parser: hand back the raw text,
        # stripped of its document markers when they are present.
        $reply =~ s{^---\n(.+)\n\.\.\.\n*$}{$1};
        $cb->(undef, $reply);
    });
}
# Admin handles of both servers and the replication source reported by the
# replica; they stay undefined when the corresponding check fails.
my ($master, $replica, $replica_source);

# Reload the replica's configuration and verify that it became primary.
# Resolves $cv: send() on success, croak() with a description otherwise.
sub promote_replica {
    say "Got lsn, do reload on replica";
    cmd($replica, "reload conf", sub {
        my (undef, $reply) = @_;
        if (!defined $reply or $reply ne 'ok') {
            # p @_;
            say YAML::XS::Dump \@_;
            return $cv->croak("Failed: @_");
        }
        say "Replica config reloaded";
        cmd($replica, "show info", sub {
            my $info = shift
                or return $cv->croak("Failed to fetch info from replica: @_");
            if ($info->{info}{status} eq 'primary') {
                say "Successfully finished";
                return $cv->send;
            }
            return $cv->croak("Status of replica remains: $info->{info}{status}.\n".
                "Emergency config fix and manual reload required!");
        });
    });
}

# Poll the replica every 0.03s until its lsn equals $lsn (the master's lsn
# after it stopped being primary), then promote it. A replica lsn above
# $lsn is treated as an error.
sub wait_replica_catchup {
    my ($lsn) = @_;
    cmd($replica, "show info", sub {
        my $info = shift
            or return $cv->croak("Process not completed! @_");
        my $replica_lsn = $info->{info}{lsn};
        if ($replica_lsn == $lsn) {
            promote_replica();
        }
        elsif ($replica_lsn < $lsn) {
            warn "$replica_lsn < $lsn, waiting...";
            my $w; $w = AE::timer 0.03, 0, sub {
                undef $w;
                wait_replica_catchup($lsn);
            };
        }
        else {
            return $cv->croak("WTF: lsn overrun");
        }
    });
}

# $st is a barrier over both connection checks. The first begin() is a
# guard that is released only after the completion callback is installed.
my $st = AE::cv; $st->begin;

# Master: must report status "primary"; remember its replication port.
$st->begin;
get_admin_connection @from, sub {
    if (my $h = shift) {
        warn "Got master connection @from\n";
        cmd($h, "show info", sub {
            if (my $info = shift) {
                if ($info->{info}{status} ne 'primary') {
                    warn "Got wrong status $info->{info}{status} for master @from";
                }
                else {
                    cmd($h, "show conf", sub {
                        if (my $c = shift) {
                            $master = $h;
                            $master_replication_port = $c->{configuration}{replication_port};
                        }
                        else {
                            warn "@_";
                        }
                        $st->end;
                    });
                    # The barrier is released by the "show conf" callback.
                    return;
                }
            }
            else {
                warn "Got no info from @from";
            }
            $st->end;
        });
    }
    else {
        warn "Failed to connect master: @_\n";
        $st->end;
    }
};

# Replica: must report "replica/<source>/connected"; remember <source>.
$st->begin;
get_admin_connection @to, sub {
    if (my $h = shift) {
        say "Got replica connection @to\n";
        cmd($h, "show info", sub {
            if (my $info = shift) {
                if ($info->{info}{status} !~ m{^replica/([^/]+)/connected$}) {
                    warn "Got wrong status $info->{info}{status} for replica @to";
                }
                else {
                    $replica = $h;
                    $replica_source = $1;
                }
            }
            else {
                warn "Got no info from @to";
            }
            $st->end;
        });
    }
    else {
        warn "Failed to connect: @_\n";
        $st->end;
    }
};

# Runs once both checks have finished: reload the master's configuration,
# make sure it is no longer primary, then let the replica catch up and
# promote it. Every failure path resolves $cv so that recv() returns.
$st->cb(sub {
    say "Ready: master:$master ($master_replication_port), replica:$replica (from $replica_source)";
    return $cv->croak("No connections") unless $master and $replica;

    # The replica has to be fed by exactly the master given on the command line.
    if ($replica_source ne "$from[0]:$master_replication_port") {
        return $cv->croak("Replica port mismatch: $from[0]:$master_replication_port / $replica_source");
    }

    cmd($master, "relo conf", sub {
        my (undef, $reply) = @_;
        if (!defined $reply or $reply ne 'ok') {
            return $cv->croak("Failed to reload master config: @_");
        }
        cmd($master, "show info", sub {
            my $info = shift
                or return $cv->croak("Failed to fetch info from master after reload: @_");
            warn "Master after reload: $info->{info}{lsn}, $info->{info}{status}\n";
            if ($info->{info}{status} eq 'primary') {
                return $cv->croak("Master still master after reload, check config");
            }
            # master became ro, wait and reload replica
            wait_replica_catchup($info->{info}{lsn});
        });
    });
});
$st->end;

# Blocks until the switchover finishes; dies with the croak() message on failure.
$cv->recv;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment