Skip to content

Instantly share code, notes, and snippets.

@joelwurtz
Last active March 2, 2024 11:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joelwurtz/bd4a6b49c265da6c0f1cf159d3255434 to your computer and use it in GitHub Desktop.
Save joelwurtz/bd4a6b49c265da6c0f1cf159d3255434 to your computer and use it in GitHub Desktop.
amqp-with-signal.php
<?php
// Set to true by the signal handlers below; the parent loop checks it after every
// iteration to decide whether to stop.
$shouldStop = false;

// Without asynchronous dispatch (or ticks / pcntl_signal_dispatch()) the handlers
// registered with pcntl_signal() are queued but never invoked.
pcntl_async_signals(true);

pcntl_signal(SIGINT, static function (int $signo) use (&$shouldStop): void {
    var_dump('SIGINT received');
    $shouldStop = true;
});
pcntl_signal(SIGTERM, static function (int $signo) use (&$shouldStop): void {
    var_dump('SIGTERM received');
    $shouldStop = true;
});

// Connected pair of UNIX stream sockets: index 0 is used by the child, index 1 by
// the parent (each side closes the end it does not use after the fork).
$stream = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($stream === false) {
    fwrite(STDERR, 'Could not create socket pair.');
    exit(1);
}

$pid = pcntl_fork();
if ($pid === -1) {
    // Report on stderr and use a non-zero status so callers can detect the failure.
    fwrite(STDERR, 'Could not fork Process.');
    exit(1);
}
if (!$pid) {
    /* Child process: consumes the AMQP queue and forwards every message body to the
     * parent over $stream[0], then acknowledges or rejects it based on the reply. */
    fclose($stream[1]);

    $cnn = new AMQPConnection();
    $cnn->setHost("message-queue");
    $cnn->setLogin("guest");
    $cnn->setPassword("guest");
    $cnn->setVhost("/");
    $cnn->connect();

    $channel1 = new AMQPChannel($cnn);
    $logsQueue = new AMQPQueue($channel1);
    $logsQueue->setName("queue");

    stream_set_blocking($stream[0], true);

    // The child does not use the parent's handlers; restore the default actions so
    // SIGINT/SIGTERM terminate it.
    pcntl_signal(SIGINT, SIG_DFL);
    pcntl_signal(SIGTERM, SIG_DFL);

    // Reads exactly $length bytes from $socket, looping because a single fread() on a
    // socket may return fewer bytes than requested. Returns null on EOF or read error.
    $readExactly = static function ($socket, int $length): ?string {
        $buffer = '';
        while (strlen($buffer) < $length) {
            $chunk = fread($socket, $length - strlen($buffer));
            if ($chunk === false || $chunk === '') {
                return null;
            }
            $buffer .= $chunk;
        }

        return $buffer;
    };

    $logsQueue->consume(function (AMQPEnvelope $envelope, AMQPQueue $queue) use ($stream, $readExactly) {
        // Frame format in both directions: 4-byte big-endian length (pack 'N') + payload.
        $message = $envelope->getBody();
        $length = strlen($message);
        fwrite($stream[0], pack('N', $length), 4);
        fwrite($stream[0], $message, $length);

        // Read the parent's reply; both values stay null if the parent side is gone.
        $header = $readExactly($stream[0], 4);
        $length = $header === null ? null : unpack('N', $header)[1];
        $answer = $length === null ? null : $readExactly($stream[0], $length);
        var_dump($length);
        var_dump($answer);

        if ($answer === 'ACK') {
            // Fix: the parent confirmed the message, so acknowledge it (the original
            // called nack() here, rejecting every message).
            $queue->ack($envelope->getDeliveryTag());
        } else {
            // NOTE(review): nack() is called without a requeue flag, as in the
            // original — confirm whether the message should be requeued instead.
            $queue->nack($envelope->getDeliveryTag());

            // Returning false stops the consume loop.
            return false;
        }
    });

    var_dump('end of consume');
    exit(0);
}
/* Parent process: receives length-prefixed messages from the child over $stream[1]
 * and answers each one with a length-prefixed 'ACK'. */
fclose($stream[0]);

// Reads exactly $length bytes from $socket, looping because a single fread() on a
// socket may return fewer bytes than requested. A zero length yields '' without
// calling fread(). Returns null on EOF or read error.
$readExactly = static function ($socket, int $length): ?string {
    $buffer = '';
    while (strlen($buffer) < $length) {
        $chunk = fread($socket, $length - strlen($buffer));
        if ($chunk === false || $chunk === '') {
            return null;
        }
        $buffer .= $chunk;
    }

    return $buffer;
};

$read = [$stream[1]];
$write = [];
$except = [];
stream_set_blocking($stream[1], true);

// stream_select() returns false on error, including when the call is interrupted by
// a signal, which ends the loop; the warning is suppressed on purpose.
while (@stream_select($read, $write, $except, 10, 100_000) !== false) {
    if ($read) {
        // Frame format: 4-byte big-endian length (pack 'N') followed by the payload.
        $header = $readExactly($stream[1], 4);
        $message = $header === null ? null : $readExactly($stream[1], unpack('N', $header)[1]);
        if ($message === null) {
            // A readable socket with no complete frame means the child closed its
            // end (or the read failed): nothing more will arrive.
            break;
        }
        var_dump($message);

        // Send the whole reply frame in one write and stop if it could not be delivered.
        $reply = pack('N', 3) . 'ACK';
        if (fwrite($stream[1], $reply) !== strlen($reply)) {
            $shouldStop = true;
        }
    }

    var_dump('should stop: ' . $shouldStop);

    // stream_select() modifies $read in place, so re-arm it for the next iteration.
    $read = [$stream[1]];
    if ($shouldStop) {
        break;
    }
}

var_dump('end of loop');
sleep(2);
// kill child process - it may be not necessary if the child process is already dead
posix_kill($pid, SIGKILL);
pcntl_waitpid($pid, $status);
sleep(10);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment