Created
October 30, 2012 07:34
-
-
Save ysasaki/3978801 to your computer and use it in GitHub Desktop.
AnyEvent::Handle on_error時の再接続処理
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
use v5.14; | |
use warnings; | |
use AnyEvent; | |
use AnyEvent::Socket; | |
use AnyEvent::Handle; | |
use Log::Minimal; | |
use Test::TCP qw(empty_port wait_port); | |
use Scalar::Util qw(refaddr); | |
# Switch on Log::Minimal's COLOR setting for the lifetime of the script.
local $Log::Minimal::COLOR = 1;

# Ask Test::TCP for a free port before forking, so that the parent (server)
# and the child (client) both see the same port number.
my $port = empty_port();

# fork returns undef on failure, the child's pid in the parent, 0 in the child.
my $pid = fork;
defined $pid
    or die "fork failed: $!";
if ($pid) {
    # Parent process.
    # Plays the role of an external server that drops its connections every
    # now and then, so the client has something to reconnect to.

    # Prefix every log line with [S] so server output can be told apart from
    # the client's output.
    local $Log::Minimal::PRINT = sub {
        my ( $time, $type, $message, $trace, $raw_message ) = @_;
        warn "[S] [$type] $message\n";
    };

    my $cv       = AE::cv;
    my $max_ping = 2;    # number of lines accepted before the server hangs up
    my $cur_ping = 0;    # lines seen since the last hang-up (one counter for all connections)

    # Live handles, keyed by refaddr. Storing a handle here keeps it
    # referenced after the accept callback returns.
    my %client;
    sub add_client    { $client{ refaddr( $_[0] ) } = $_[0] }
    sub remove_client { delete $client{ refaddr( $_[0] ) } }

    # Accept callback: wraps the new socket in an AnyEvent::Handle that logs
    # each received line and disconnects after $max_ping lines.
    my $accepted = sub {
        my ( $fh, $host, $peer_port ) = @_;
        my $hdl = AnyEvent::Handle->new(
            fh => $fh,

            # NOTE(review): `poll` is kept from the original; it does not look
            # like a documented AnyEvent::Handle constructor option - confirm
            # whether it has any effect.
            poll    => 'r',
            on_read => sub {
                my ($h) = @_;
                $h->push_read(
                    line => sub {
                        my ( $h, $line ) = @_;
                        chomp $line;
                        $cur_ping++;
                        infof( "%s", $line );

                        # Disconnect on purpose to produce a
                        # "connection lost" situation for the client.
                        if ( $cur_ping >= $max_ping ) {
                            warnf( "Close addr:%s", refaddr($h) );
                            remove_client($h);

                            # Destroy the handle instead of closing the raw
                            # filehandle: this releases the handle's watchers
                            # and buffers together with the socket.
                            $h->destroy;
                            $cur_ping = 0;
                        }
                    }
                );
            },
            on_error => sub {
                # The second argument is AnyEvent::Handle's "fatal" flag;
                # the log label below is kept from the original.
                my ( $h, $fatal, $msg ) = @_;
                critf( "Errno:%d, Errmsg:%s", $fatal, $msg );
                remove_client($h);

                # Release the socket and watchers explicitly rather than
                # relying on the last reference going away.
                $h->destroy;
            }
        );
        infof( "Connected addr:%s from %s:%s", refaddr($hdl), $host, $peer_port );
        add_client($hdl);
    };

    # Prepare callback: called once the listening socket is bound.
    my $prepared = sub {
        my ( $fh, $host, $bound_port ) = @_;
        infof( "Starting server %s:%s", $host, $bound_port );

        # tcp_server uses this return value as the listen queue length;
        # 0 selects the default instead of whatever infof happens to return.
        return 0;
    };

    # Keep the guard in scope; the server stops listening when it is released.
    my $server = tcp_server '127.0.0.1', $port, $accepted, $prepared;

    # The server runs until SIGINT arrives; nothing else ends the loop.
    my $w;
    $w = AE::signal INT => sub {
        infof("SININT received");
        $cv->send;
        undef $w;
    };

    $cv->recv;

    # Reap the client process before exiting.
    waitpid( $pid, 0 );
}
else {
    # Child process.
    # A client that sends its data piece by piece from the on_drain callback
    # and reconnects automatically when the connection is lost.

    # Prefix every log line with [C] so client output can be told apart from
    # the server's output.
    local $Log::Minimal::PRINT = sub {
        my ( $time, $type, $message, $trace, $raw_message ) = @_;
        warn "[C] [$type] $message\n";
    };

    # Wait until the server has started.
    wait_port($port);

    my $cv       = AE::cv;
    my $max_ping = 20;    # total number of pings to send before exiting
    my $cur_ping = 0;     # pings sent so far, across all connections

    # $client holds the tcp_connect guard, $hdl the current handle; both are
    # kept in outer lexicals so they stay referenced between callbacks.
    my ( $client, $connect, $hdl );
    my $retry_timer;      # pending reconnect timer after a failed connect

    # Called whenever the handle's write buffer is empty: schedules the next
    # ping one second later.
    my $on_drain = sub {
        my ($h) = @_;
        debugf("on_drain called");

        my $t;
        $t = AE::timer 1, 0, sub {
            if ( $cur_ping >= $max_ping ) {
                warnf( "cur_ping %d reatched max_ping: %d", $cur_ping, $max_ping );
                $cv->send;
            }
            elsif ( $h->destroyed ) {
                # The connection this timer belonged to is gone; the
                # replacement handle schedules its own pings.
                critf("CANNOT PING BECAUSE HANDLE DESTROYED");
            }
            else {
                $cur_ping++;
                $h->push_write("ping:$cur_ping\n");
                infof( "ping:%d", $cur_ping );
            }

            # Release the one-shot watcher on every path; otherwise the
            # callback and $t keep each other alive.
            undef $t;
        };
    };

    # Opens a connection and installs the handle. Returns the tcp_connect
    # guard so the caller can keep the attempt alive.
    $connect = sub {
        return tcp_connect '127.0.0.1', $port, sub {
            my ($fh) = @_;

            # tcp_connect invokes the callback without arguments on failure
            # and leaves the reason in $!.
            unless ($fh) {
                critf( "Connect failed: %s", "$!" );
                $retry_timer = AE::timer 1, 0, sub {
                    undef $retry_timer;
                    $client = $connect->();
                };
                return;
            }

            $hdl = AnyEvent::Handle->new(
                fh => $fh,

                # NOTE(review): `poll` is kept from the original; it does not
                # look like a documented AnyEvent::Handle constructor option -
                # confirm whether it has any effect.
                poll     => 'w',
                on_error => sub {
                    # The second argument is AnyEvent::Handle's "fatal" flag;
                    # the log label below is kept from the original.
                    my ( $h, $fatal, $msg ) = @_;
                    critf( "Errno:%d, Errmsg:%s", $fatal, $msg );

                    # destroy (not the `destroyed` predicate) actually tears
                    # the handle down, so pending ping timers see it as gone.
                    $h->destroy;
                    critf("SET HANDLE DESTROYED!!");

                    # Reconnect.
                    $client = $connect->();
                },
            );
            $hdl->on_drain($on_drain);
        };
    };

    $client = $connect->();
    $cv->recv;
    infof("exit");
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment