Skip to content

Instantly share code, notes, and snippets.

@Mons

Mons/switch.pl Secret

Last active March 13, 2020 09:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Mons/9dc4e2e3097c231551d4f0d130986149 to your computer and use it in GitHub Desktop.
Save Mons/9dc4e2e3097c231551d4f0d130986149 to your computer and use it in GitHub Desktop.
#!/usr/bin/env perl
use strict;
use 5.010;
# use Getopt::Long;
use AnyEvent::Socket;
use AnyEvent::Handle;
# use DDP;
# use Devel::Hexdump;
use YAML::XS qw(Load);
# use Protocol::Tarantool;
my (@from,@to);
@ARGV == 2
and do { $ARGV[0] =~ /^(\S+):(\S+)$/ and @from = ($1,$2); }
and do { $ARGV[1] =~ /^(\S+):(\S+)$/ and @to = ($1,$2); }
or die "Usage:\n\t$0 <from-host:port> <to-host:port>\n";
;
my $cv = AE::cv;
# warn "Connect @from";
my $master_replication_port;
sub get_admin_connection {
my $cb = pop;
my ($host,$rport) = @_;
tcp_connect($host,$rport, sub {
# warn "@_";
my $fh = shift
or return $cb->(undef, "Failed to connect $host:$rport: $!");
my $h;$h = AnyEvent::Handle->new(
fh => $fh,
timeout => 1,
on_error => sub {
shift;shift;
warn "@_";
$h->destroy;
$cb->(undef, $_[0]);
},
);
my $body = pack "V w/a* V w/a*",0,"box.dostring",1,"return {tostring(box.cfg.admin_port)}";
$h->push_write(pack("VVVa*",22,length($body),0xa202020,$body));
$h->push_read(chunk => 12, sub {
shift;
my $raw = $_[0];
my ($seq,$len,$pkt) = unpack "VVV", $_[0];
if ($seq == 0xa2d2d2d) { # ---\n
# already an admin port
$h->unshift_read( line => qr/^\.\.\.\r?\n$/m, sub {
shift;
$raw .= $_[0].$_[1];
$h->on_error(undef);
$h->timeout(undef);
$cb->($h);
return;
} );
return;
}
# say "$seq,$len,$pkt";
if ($len > 1024*1024) { warn "Long read"; return $h->destroy; }
$h->unshift_read(chunk => $len, sub {
shift;
my ($code,$data) = unpack "Va*", $_[0];
if ($code) {
substr($data,-1,1,'');
# p $data;
return;
}
my ($cnt,$tsz,$car,$admport) = unpack "VVV w/a* ", $data;
if ($cnt == 1 and $car == 1 and $admport) {
# warn "Got port: $admport";
$h->destroy;
tcp_connect $host, $admport, sub {
my $fh = shift
or return $cb->(undef, "Failed to connect $host:$admport: $!");
my $h;$h = AnyEvent::Handle->new(
fh => $fh,
);
$cb->($h);
}, sub {1};
}
else {
$cb->(undef, "Wrong response");
}
});
});
}, sub { 1 });
}
sub cmd {
my ($h, $cmd, $cb) = @_;
# warn ">>> $cmd";
$h->timeout(1);
$h->on_error(sub {
shift;
$h->destroy;
warn "Connection failed: $_[0]";
});
$h->push_write("$cmd\n");
$h->push_read( line => qr/^\.\.\.\r?\n$/m, sub {
shift;
$h->timeout(undef);
$h->on_error(undef);
my $x = eval {Load($_[0])};
if ($x) {
if (ref $x) {
$cb->($x);
}
else {
$cb->(undef, $x);
}
}
else {
$_[0] =~ s{^---\n(.+)\n\.\.\.\n*$}{$1};
$cb->(undef, $_[0]);
}
} );
};
my ($master, $replica, $replica_source);
my $st = AE::cv; $st->begin;
$st->begin;
get_admin_connection @from, sub {
if (my $h = shift) {
warn "Got master connection @from\n";
cmd($h, "show info", sub {
if (my $info = shift) {
if ($info->{info}{status} ne 'primary') {
warn "Got wrong status $info->{info}{status} for master @from";
}
else {
cmd($h, "show conf", sub {
if (my $c = shift) {
$master = $h;
$master_replication_port = $c->{configuration}{replication_port};
}
else {
warn "@_";
}
$st->end;
});
return;
}
} else {
warn "Got no info from @from";
}
$st->end;
});
} else {
warn "Failed to connect master: @_\n";
$st->end;
}
};
$st->begin;
get_admin_connection @to, sub {
if (my $h = shift) {
say "Got replica connection @to\n";
cmd($h, "show info", sub {
if (my $info = shift) {
if ($info->{info}{status} !~ m{^replica/([^/]+)/connected$}) {
warn "Got wrong status $info->{info}{status} for replica @to";
}
else {
$replica = $h;
$replica_source = $1;
}
} else {
warn "Got no info from @to";
}
$st->end;
});
} else {
warn "Failed to connect: @_\n";
$st->end;
}
};
$st->cb(sub {
say "Ready: master:$master ($master_replication_port), replica:$replica (from $replica_source)";
if ($master and $replica) {
if ($replica_source ne "$from[0]:$master_replication_port") {
warn "Replica port mismatch: $from[0]:$master_replication_port / $replica_source";
return $cv->croak(1);
}
cmd($master,"relo conf", sub {
if ($_[1] eq 'ok') {
cmd($master, "show info", sub {
if (my $info = shift) {
warn "Master after reload: $info->{info}{lsn}, $info->{info}{status}\n";
my $lsn = $info->{info}{lsn};
if ($info->{info}{status} ne 'primary') {
# master became ro, wait and reload replica
my $wait;$wait = sub {
cmd($replica, "show info", sub {
if (my $info = shift) {
if ($info->{info}{lsn} == $lsn) {
say "Got lsn, do reload on replica";
cmd($replica, "reload conf", sub {
if ($_[1] eq 'ok') {
say "Replica config reloaded";
cmd($replica, "show info", sub {
if (my $info = shift) {
if ($info->{info}{status} eq 'primary') {
say "Successfully finished";
return $cv->send;
}
else {
return $cv->croak("Status of replica remains: $info->{info}{status}.\n".
"Emergency config fix and manual reload required!");
}
}
else {
return $cv->croak("Failed to fetch info from replica: @_");
}
});
}
else {
# p @_;
say YAML::XS::Dump \@_;
return $cv->croak("Failed: @_");
}
});
}
elsif ($info->{info}{lsn} < $lsn) {
warn "$info->{info}{lsn} < $lsn, waiting...";
my $w;$w = AE::timer 0.03,0, sub {
undef $w;
$wait->();
};
}
else {
return $cv->croak("WTF: lsn overrun");
}
}
else {
return $cv->croak("Process not completed! @_");
}
# $wait->();
});
}; $wait->();
}
else {
$cv->croak("Master still master after reload, check config");
}
}
});
}
});
}
else {
$cv->croak("No connections");
}
});
$st->end;
$cv->recv;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment