public
Created

AnyEvent::Handle on_error時の再接続処理

  • Download Gist
gistfile1.pl
Perl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
#!/usr/bin/env perl
 
use v5.14;
use warnings;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use Log::Minimal;
use Test::TCP qw(empty_port wait_port);
use Scalar::Util qw(refaddr);
 
# Turn on Log::Minimal's color flag for the rest of the script.
local $Log::Minimal::COLOR = 1;

# Port number obtained from Test::TCP; the server listens on it and the
# client connects to it.
my $port = empty_port();

# Split into two processes: the parent becomes the server, the child the
# client. fork returns undef on failure.
my $pid = fork;
die "fork failed: $!" unless defined $pid;
 
if ($pid) {
 
# parent
# Plays an external server that drops its connections every now and then.

# Route every log record to STDERR with an "[S]" tag so that server output
# can be told apart from the client's. Only the level and the formatted
# message of the record are used.
local $Log::Minimal::PRINT = sub {
    my ( undef, $type, $message ) = @_;
    warn "[S] [$type] $message\n";
};

# Condition variable that keeps the server's event loop running.
my $cv = AE::cv;

# $cur_ping counts the lines received since the last forced disconnect;
# once it reaches $max_ping the server drops the connection.
my $cur_ping = 0;
my $max_ping = 2;
 
# Registry of server-side handles, keyed by the handle's refaddr. Holding
# the handle here keeps a reference to it after the accept callback returns.
my %client;

# Register a handle. Returns the stored handle.
sub add_client {
    my ($handle) = @_;
    return $client{ refaddr($handle) } = $handle;
}

# Unregister a handle. Returns the removed entry, or undef when the handle
# was not registered.
sub remove_client {
    my ($handle) = @_;
    return delete $client{ refaddr($handle) };
}
 
# Accept callback for tcp_server.
#
# Wraps each new connection in an AnyEvent::Handle, logs every line the
# client sends and, to simulate a flaky peer, drops the connection once
# $max_ping lines have been received. $cur_ping is shared by all
# connections because it lives outside this callback.
#
#   $fh   - the accepted client socket
#   $host - peer address
#   $port - peer port (shadows the listening $port of the enclosing scope)
my $accepted = sub {
    my ( $fh, $host, $port ) = @_;

    my $hdl = AnyEvent::Handle->new(
        fh      => $fh,
        # NOTE(review): "poll" does not seem to be a documented
        # AnyEvent::Handle constructor argument -- confirm it has any effect.
        poll    => 'r',
        on_read => sub {
            my ($hdl) = @_;

            # Consume the input one line at a time.
            $hdl->push_read(
                line => sub {
                    my ( $hdl, $line ) = @_;
                    chomp $line;

                    $cur_ping++;
                    infof("%s", $line);

                    # Drop the connection at arbitrary points to create a
                    # connection-loss situation for the client.
                    if ($cur_ping >= $max_ping) {
                        warnf("Close addr:%s", refaddr($hdl));
                        remove_client($hdl);

                        # Destroy the handle instead of closing the raw
                        # socket underneath it: this releases the socket
                        # and guarantees that lines still sitting in the
                        # read buffer are not processed after the
                        # "disconnect".
                        $hdl->destroy;
                        $cur_ping = 0;
                    }
                }
            );
        },
        on_error => sub {
            my ($hdl, $fatal, $msg) = @_;

            # The OS error code is in $! on entry; $fatal is only a flag.
            my $errno = $! + 0;
            critf("Errno:%d, Fatal:%d, Errmsg:%s", $errno, ($fatal ? 1 : 0), $msg);

            # Abandon the handle: forget it and release its resources.
            remove_client($hdl);
            $hdl->destroy;
        }
    );

    infof("Connected addr:%s from %s:%s", refaddr($hdl), $host, $port);

    # Keep the handle referenced after this callback returns.
    add_client($hdl);
};
 
# Prepare callback for tcp_server, invoked just before listen().
#
#   $fh   - the listening socket
#   $host - local address the socket is bound to
#   $port - local port the socket is bound to
#
# tcp_server uses the return value as the listen queue length, so return 0
# explicitly (meaning "use the default") instead of leaking whatever the
# logging call happens to return.
my $prepared = sub {
    my ( $fh, $host, $port ) = @_;
    infof("Starting server %s:%s", $host, $port);
    return 0;
};
 
# Start listening. The returned guard is kept in $server for as long as the
# server should stay up.
my $server = tcp_server '127.0.0.1', $port, $accepted, $prepared;

# Leave the event loop on SIGINT (e.g. Ctrl-C); the watcher removes itself
# once it has fired.
my $w; $w = AE::signal INT => sub {
    infof("SIGINT received");
    $cv->send;
    undef $w;
};

# Serve until the signal handler above fires.
$cv->recv;

# Reap the client process before the parent exits.
waitpid($pid, 0);
}
else {
 
# child
# Client that sends its data piece by piece from on_drain
# and reconnects automatically.

# Route every log record to STDERR with a "[C]" tag so that client output
# can be told apart from the server's. Only the level and the formatted
# message of the record are used.
local $Log::Minimal::PRINT = sub {
    my ( undef, $type, $message ) = @_;
    warn "[C] [$type] $message\n";
};

# wait until the server startup
wait_port($port);

# Condition variable that ends the client once all pings have been sent.
my $cv = AE::cv;

# $cur_ping is the number of pings sent so far; the client stops when it
# reaches $max_ping.
my $cur_ping = 0;
my $max_ping = 20;

# $client  - guard returned by the most recent connection attempt
# $connect - closure that starts a connection attempt
# $hdl     - handle of the current connection
my ( $client, $connect, $hdl );
 
# on_drain callback: called by AnyEvent::Handle when the write buffer of
# $hdl is empty. Schedules the next ping one second later; writing that
# ping empties the buffer again afterwards, which keeps the cycle going.
#
#   $hdl - the handle whose write buffer has drained
my $on_drain = sub {
    my ($hdl) = @_;
    debugf("on_drain called");

    # One-shot timer. The callback closes over $t to keep the watcher
    # alive, so $t must be cleared on EVERY path or the timer and the
    # closure keep each other alive forever.
    my $t; $t = AE::timer 1, undef, sub {
        if ( $cur_ping >= $max_ping ) {
            # All pings sent: let the main program finish.
            warnf("cur_ping %d reached max_ping: %d", $cur_ping, $max_ping);
            $cv->send;
        }
        elsif ( $hdl->destroyed ) {
            # The connection this timer belongs to is gone; pinging
            # continues on the handle created by the reconnect.
            critf("CANNOT PING BECAUSE HANDLE DESTROYED");
        }
        else {
            $cur_ping++;
            $hdl->push_write("ping:$cur_ping\n");
            infof("ping:%d", $cur_ping);
        }

        undef $t;
    };
};
 
# Start a connection attempt to the server and, once connected, install a
# handle that pings from on_drain and reconnects on error.
#
# Returns the guard object of tcp_connect; the caller stores it in $client.
$connect = sub {
    return tcp_connect '127.0.0.1', $port, sub {
        my ($fh) = @_;

        # tcp_connect invokes the callback without arguments when the
        # connection attempt failed ($! holds the reason). Building a handle
        # from an undefined socket would die, so try again a little later.
        unless ($fh) {
            critf("Connect failed: %s", "$!");

            my $retry; $retry = AE::timer 1, 0, sub {
                $client = $connect->();
                undef $retry;
            };
            return;
        }

        $hdl = AnyEvent::Handle->new(
            fh       => $fh,
            # NOTE(review): "poll" does not seem to be a documented
            # AnyEvent::Handle constructor argument -- confirm it has any effect.
            poll     => 'w',
            on_error => sub {
                my ($hdl, $fatal, $msg) = @_;

                # The OS error code is in $! on entry; $fatal is only a flag.
                my $errno = $! + 0;
                critf("Errno:%d, Fatal:%d, Errmsg:%s", $errno, ($fatal ? 1 : 0), $msg);

                # Actually tear the broken handle down. ("destroyed" is only
                # a query and does nothing when called in void context.)
                $hdl->destroy;
                critf("SET HANDLE DESTROYED!!");

                # Reconnect; the new handle restarts the ping cycle.
                $client = $connect->();
            },
        );
        $hdl->on_drain($on_drain);
    };
};

# Kick off the first connection attempt and run until all pings are sent.
$client = $connect->();
$cv->recv;
infof("exit");
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.