Skip to content

Instantly share code, notes, and snippets.

@ysasaki
Created October 30, 2012 07:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ysasaki/3978801 to your computer and use it in GitHub Desktop.
Save ysasaki/3978801 to your computer and use it in GitHub Desktop.
AnyEvent::Handle on_error時の再接続処理
#!/usr/bin/env perl
use v5.14;
use warnings;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use Log::Minimal;
use Test::TCP qw(empty_port wait_port);
use Scalar::Util qw(refaddr);
local $Log::Minimal::COLOR = 1;
my $port = empty_port();
my $pid = fork // die "fork failed: $!";
if ($pid) {
# parent
# ちょいちょい切断してくる外部サーバ
local $Log::Minimal::PRINT = sub {
my ( $time, $type, $message, $trace, $raw_message) = @_;
warn "[S] [$type] $message\n";
};
my $cv = AE::cv;
my $max_ping = 2;
my $cur_ping = 0;
my %client;
sub add_client { $client{refaddr($_[0])} = $_[0] }
sub remove_client { delete $client{refaddr($_[0])} }
my $accepted = sub {
my ( $fh, $host, $port ) = @_;
my $hdl = AnyEvent::Handle->new(
fh => $fh,
poll => 'r',
on_read => sub {
my ($hdl) = @_;
$hdl->push_read(
line => sub {
my ( $hdl, $line ) = @_;
chomp $line;
$cur_ping++;
infof("%s", $line);
# 接続断の状況を作るために、適当に切断する
if ($cur_ping >= $max_ping) {
warnf("Close addr:%s", refaddr($hdl));
close $fh;
remove_client($hdl);
$cur_ping = 0;
}
}
);
},
on_error => sub {
my ($hdl, $fatal, $msg) = @_;
critf("Errno:%d, Errmsg:%s", $fatal, $msg);
remove_client($hdl);
}
);
infof("Connected addr:%s from %s:%s", refaddr($hdl), $host, $port);
add_client($hdl);
};
my $prepared = sub {
my ( $fh, $host, $port ) = @_;
infof("Starting server %s:%s", $host, $port);
};
my $server = tcp_server '127.0.0.1', $port, $accepted, $prepared;
my $w; $w = AE::signal INT => sub {
infof("SININT received");
$cv->send;
undef $w;
};
$cv->recv;
waitpid($pid, 0);
}
else {
# child
# on_drainで順次データを送信するクライアント
# 自動で再接続する
local $Log::Minimal::PRINT = sub {
my ( $time, $type, $message, $trace, $raw_message) = @_;
warn "[C] [$type] $message\n";
};
# wait until the server startup
wait_port($port);
my $cv = AE::cv;
my $max_ping = 20;
my $cur_ping = 0;
my ($client, $connect, $hdl);
my $on_drain = sub {
my ($hdl) = @_;
debugf("on_drain called");
my $t; $t = AE::timer 1, undef, sub {
if ( $cur_ping >= $max_ping ) {
warnf("cur_ping %d reatched max_ping: %d", $cur_ping, $max_ping);
$cv->send;
return;
}
if ( $hdl->destroyed ) {
critf("CANNOT PING BECAUSE HANDLE DESTROYED");
return;
}
$cur_ping++;
$hdl->push_write("ping:$cur_ping\n");
infof("ping:%d", $cur_ping);
undef $t;
};
};
$connect = sub {
tcp_connect '127.0.0.1', $port, sub {
my ($fh) = @_;
$hdl = AnyEvent::Handle->new(
fh => $fh,
poll => 'w',
on_error => sub {
my ($hdl, $fatal, $msg) = @_;
critf("Errno:%d, Errmsg:%s", $fatal, $msg);
$hdl->destroyed;
critf("SET HANDLE DESTROYED!!");
$client = $connect->();
},
);
$hdl->on_drain($on_drain);
};
};
$client = $connect->();
$cv->recv;
infof("exit");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment