Skip to content

Instantly share code, notes, and snippets.

@marioroy
Last active June 12, 2016 15:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marioroy/037cbea8bb930a50463c71fa9cf6d998 to your computer and use it in GitHub Desktop.
Save marioroy/037cbea8bb930a50463c71fa9cf6d998 to your computer and use it in GitHub Desktop.
use strict; use warnings;
# test script; exits after 1 second
use threads;
use Socket qw/ PF_UNIX PF_UNSPEC SOCK_STREAM /;
use Time::HiRes qw/ sleep /;
# File-level state used by the subs below: $self holds both ends of the
# socket pair and $zero_bytes is the four-NUL pattern sock_ready() compares
# its ioctl buffer against.
my $self       = {};
my $zero_bytes = "\x00\x00\x00\x00";

# The sockets are created before any thread is spawned; worker_task reads
# $self->{com_w_sock}.
sock_pair($self, 'com_r_sock', 'com_w_sock');

# One thread services the socket, ten more only sleep.
my @workers = (
    threads->create(\&worker_task),
    map { threads->create(\&worker_noop) } 1 .. 10,
);

# Send each request line and wait for the single-line reply to it.
my $COM_R_SOCK = $self->{com_r_sock};
for my $request ("data\n", "exit\n") {
    print {$COM_R_SOCK} $request;
    my $reply = readline($COM_R_SOCK);
}

for my $thr (@workers) {
    $thr->join;
}

exit;
# Thread body: answer every line received on the worker end of the socket
# pair with "1\n" and stop after the line "exit\n" has been answered.
# Takes no arguments; the socket comes from the file-level $self.
# Also stops when the socket reports end-of-file or a read error.
sub worker_task {
    my $COM_W_SOCK = $self->{com_w_sock};

    while (1) {
        # the Windows platform requires having to sock_ready
        # otherwise, the script hangs :(
        sock_ready($COM_W_SOCK) if ($^O eq 'MSWin32');

        my $res = <$COM_W_SOCK>;

        # readline yields undef on end-of-file or error. Without this guard
        # the loop would never terminate: undef never equals "exit\n".
        last unless defined $res;

        print {$COM_W_SOCK} "1\n";
        last if ($res eq "exit\n");
    }

    return;
}
# Placeholder thread body: idles for one second in place of real work.
sub worker_noop {
    my $idle_seconds = 1;    # simulate work
    return sleep($idle_seconds);
}
# Wait until the MSWin32 FIONREAD ioctl stores a non-zero value for $socket.
#   $socket - handle to poll
# Returns nothing. Compares against the file-level $zero_bytes.
# Polling schedule: two rounds of 900 back-to-back polls, each round followed
# by a 0.015 s sleep; after that every poll is followed by a 0.030 s sleep.
sub sock_ready {
    my ($socket) = @_;
    my $pending  = "\x00\x00\x00\x00";

    # Numeric address of $pending's string buffer, handed to ioctl so the
    # result lands in $pending.
    my $pending_addr = unpack('I', pack('P', $pending));

    my $spins  = 1;    # 0 once back-to-back polling has been switched off
    my $rounds = 0;    # completed rounds of back-to-back polling

    POLL:
    while (1) {
        ioctl($socket, 0x4004667f, $pending_addr);    # MSWin32 FIONREAD

        # A string compare is used instead of unpack('I', ...) for speed.
        return if $pending ne $zero_bytes;

        # Slow phase: sleep between polls so an idle wait costs no CPU.
        if (!$spins) {
            sleep 0.030;
            next POLL;
        }

        # Fast phase: poll again immediately until the round is used up.
        next POLL if ++$spins <= 900;

        # Round finished: pause briefly, then start another round or
        # fall back to the slow phase after the second one.
        $spins = 1;
        sleep 0.015;
        $spins = 0 if ++$rounds == 2;
    }
}
# Create a connected pair of stream sockets and store both ends in $self.
#   $self  - hash reference that receives the two handles
#   $rsock - key under which the first socket is stored
#   $wsock - key under which the second socket is stored
# Both handles are switched to autoflush.
# Dies with "socketpair: <error>\n" when the pair cannot be created.
# Returns nothing.
sub sock_pair {
    my ($self, $rsock, $wsock) = @_;

    # The autoflush method on a lexical handle comes from IO::Handle, which
    # Perl only loads on demand from 5.14 onward. Loading it explicitly
    # keeps the method call working on older perls as well.
    require IO::Handle;

    socketpair( $self->{$rsock}, $self->{$wsock},
        PF_UNIX, SOCK_STREAM, PF_UNSPEC ) or die "socketpair: $!\n";

    $self->{$_}->autoflush(1) for $rsock, $wsock;

    return;
}
# Loop from the upcoming MCE::Shared 1.100.
# NOTE(review): this is an excerpt. $_is_MSWin32, $_done, $_DAT_R_SOCK,
# $_DAU_R_SOCK, $_func, $_channels and %_output_function are declared
# outside the lines shown here.
# 1. Waits for immediate work by polling back-to-back (the "unless" branch);
#    back-to-back polling is switched off after two rounds of 900 polls so
#    that an idle loop does not consume a CPU.
# 2. Is performant when there's work to be done (the do/while loop).
if ($_is_MSWin32) {
# The normal loop hangs on Windows when processes/threads start/exit.
# Using ioctl() properly, http://www.perlmonks.org/?node_id=780083
# $_bytes is the 4-byte buffer unpacked after each ioctl call, $_cnt counts
# back-to-back polls (0 means they are switched off) and $_retries counts
# completed rounds of back-to-back polling.
my ($_bytes, $_cnt, $_retries) = ("\x00\x00\x00\x00", 1, 0);
# $_ptr_bytes is the address of $_bytes' string buffer as a number;
# $_nbytes starts out undefined.
# NOTE(review): 'I' unpacks a native unsigned int - confirm that it holds
# the whole pointer on 64-bit builds.
my ($_ptr_bytes, $_nbytes) = (unpack('I', pack('P', $_bytes)));
while (!$_done) {
ioctl($_DAT_R_SOCK, 0x4004667f, $_ptr_bytes); # MSWin32 FIONREAD
# A zero count means nothing is waiting: back off and poll again.
unless ($_nbytes = unpack('I', $_bytes)) {
# delay to not consume a CPU from non-blocking ioctl
if ($_cnt) {
# After 900 polls pause for 0.015 s; after the second such round set
# $_cnt to 0 so that later polls take the 0.030 s sleep below.
if (++$_cnt > 900) {
$_cnt = 1, sleep 0.015;
$_cnt = 0 if ++$_retries == 2;
}
next;
}
sleep 0.030;
next;
}
# Data is waiting: re-arm back-to-back polling for the next idle period.
$_cnt = 1, $_retries = 0;
# Consume 8-byte records for as long as the reported count covers one.
do {
sysread($_DAT_R_SOCK, $_func, 8);
# A do-BLOCK is not a loop, so last() leaves the enclosing
# while (!$_done) loop when fewer than 8 bytes were read.
$_done = 1, last() unless length($_func) == 8;
# The last two characters are cut off $_func and used as the index into
# $_channels; the remaining six select the handler to call.
$_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
$_output_function{$_func}();
} while (($_nbytes -= 8) >= 8);
}
}
# cygwin, UNIX
else {
while (!$_done) {
# Blocking line read; the loop ends unless the line is exactly six
# characters long (length() is applied to the line as read).
$_func = <$_DAT_R_SOCK>;
last() unless ( length($_func) == 6 );
# The next line read from the socket is used as the index into $_channels.
$_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
$_output_function{$_func}();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment