Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Demo code used in my "Multitasking in PHP" presentation at the December 2012 Atlanta PHP user group meetup
<?php
require dirname(__FILE__).'/Process_Manager.php';
// Create a batch of test messages to send
$email = array(
'to' => 'test@test.com',
'subject' => 'This is a test',
'body' => 'Hello, world of multi-processing!'
);
$queue = array_fill(0, 50, $email);
// Create a function simulate sending an email message
$sender = function($message_id, $message)
{
// Pretend to send it, we'll assume a normal latency of 500-1000ms
$ms = rand(500, 1000);
usleep($ms * 1000);
printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $ms);
};
// Start the timer
$start_time = microtime(TRUE);
// Send the emails
foreach ($queue as $message_id => $message)
{
$sender($message_id, $message);
}
// Stop the timer
$runtime = microtime(TRUE) - $start_time;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($queue), $runtime);
exit;
<?php
require dirname(__FILE__).'/Process_Manager.php';
$pm = new Process_Manager();
declare(ticks = 1);
// Setup our signal handlers
$callback = array($pm, 'signal_handler');
pcntl_signal(SIGTERM, $callback);
pcntl_signal(SIGINT, $callback);
pcntl_signal(SIGCHLD, $callback);
// Create a batch of test messages to send
$email = array(
'to' => 'test@test.com',
'subject' => 'This is a test',
'body' => 'Hello, world of multi-processing!'
);
$queue = array_fill(0, 50, $email);
// Create a function simulate sending an email message
$sender = function($message_id, $message)
{
// Pretend to send it, we'll assume a normal latency of 500-1000ms
$ms = rand(500, 1000);
usleep($ms * 1000);
printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $ms);
};
// Start the timer
$start_time = microtime(TRUE);
// Fork processes to send the emails
foreach ($queue as $message_id => $message)
{
$args = array(
'message_id' => $message_id,
'message' => $message,
);
// Execution will not proceed past this line
// except for in the parent process.
$pm->fork_child($sender, $args);
// Limit concurrency to 5 processes
if (count($pm) >= 5)
{
while (count($pm) >= 5)
{
usleep(500000); // sleep 500 ms
}
}
}
// Wait for all processes to end
echo "The queue is empty, waiting for all processes to finish\n";
while (count($pm) > 0)
{
usleep(500000); // sleep 500 ms
}
// Stop the timer
$runtime = microtime(TRUE) - $start_time;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($queue), $runtime);
exit;
<?php
class Process_Manager implements Countable
{
protected $processes = array();
protected $is_child = FALSE;
public function count()
{
return count($this->processes);
}
public function signal_handler($signal)
{
// Don't do anything if we're not in the parent process
if ($this->is_child)
{
return;
}
switch ($signal)
{
case SIGINT:
case SIGTERM:
echo "\nUser terminated the application\n";
// Kill all child processes before terminating the parent
$this->kill_all();
exit(0);
case SIGCHLD:
// Reap a child process
//echo "SIGCHLD received\n";
$this->reap_child();
}
}
public function kill_all()
{
foreach ($this->processes as $pid => $is_running)
{
posix_kill($pid, SIGKILL);
}
}
public function fork_child($callback, $data)
{
$pid = pcntl_fork();
switch($pid)
{
case 0:
// Child process
$this->is_child = TRUE;
call_user_func_array($callback, $data);
posix_kill(posix_getppid(), SIGCHLD);
exit;
case -1:
// Parent process, fork failed
throw new Exception("Out of memory!");
default:
// Parent process, fork succeeded
$this->processes[$pid] = TRUE;
return $pid;
}
}
public function reap_child()
{
// Check if any child process has terminated,
// and if so remove it from memory
$pid = pcntl_wait($status, WNOHANG);
if ($pid < 0)
{
throw new Exception("Out of memory");
}
elseif ($pid > 0)
{
unset($this->processes[$pid]);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment