Skip to content

Instantly share code, notes, and snippets.

@immutef
Created December 10, 2011 12:56
Show Gist options
  • Save immutef/1455105 to your computer and use it in GitHub Desktop.
Save immutef/1455105 to your computer and use it in GitHub Desktop.
ZeroMQ Fan In/Out + Kill
<?php
/**
* I'd like to use IPC sockets (ipc://foo.ipc) but my virtual machine won't let me ... :)
* IPC sockets will be faster than TCP, so this script could run even faster than it does!
*/
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * Console command demonstrating a ZeroMQ fan-out / fan-in pipeline with a
 * kill signal.
 *
 * The parent process forks a number of workers and then acts as the server:
 * it pushes task ids to the workers (PUSH -> PULL), collects the ids the
 * workers send back (PUSH -> PULL in the other direction) and finally
 * publishes a "kill" message (PUB -> SUB) so the workers leave their loop.
 */
class TestCommand extends Command
{
    /** Task state: created, not yet sent to a worker. */
    const OPEN = 'open';

    /** Task state: sent to a worker, result not yet received. */
    const WAIT = 'wait';

    /** Task state: failed. Never assigned in this class, only counted by finish(). */
    const FAIL = 'fail';

    /** Task state: the task id came back from a worker. */
    const DONE = 'done';

    /** Host the workers connect to; the server binds on all interfaces ("*"). */
    const CONNECT_HOST = '127.0.0.1';

    /** Port of the server -> worker task channel. */
    const TASK_PORT = 1394;

    /** Port of the worker -> server result channel. */
    const RESULT_PORT = 1395;

    /** Port of the server -> worker kill channel. */
    const CONTROL_PORT = 1396;

    /**
     * {@inheritDoc}
     */
    protected function configure()
    {
        $this
            ->setName('test')
            ->addOption('tasks', 't', InputOption::VALUE_OPTIONAL, 'Tasks', 10)
            ->addOption('workers', 'w', InputOption::VALUE_OPTIONAL, 'Workers', 1)
            ->setDescription('')
            ->setHelp('')
        ;
    }

    /**
     * Forks the requested number of workers and runs the server in the parent.
     *
     * {@inheritDoc}
     *
     * @return int Exit code; 0 for the parent after the server finished and
     *             for every worker after it received the kill message.
     *
     * @throws \InvalidArgumentException If "tasks" is negative or "workers" is below 1.
     * @throws \RuntimeException         If not a single worker could be forked.
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Options arrive as strings from the command line; normalise them once.
        $tasks = (int) $input->getOption('tasks');
        $workers = (int) $input->getOption('workers');

        if ($tasks < 0) {
            throw new \InvalidArgumentException('The "tasks" option must not be negative.');
        }

        // Without a worker nobody ever answers, so the server would block forever.
        if ($workers < 1) {
            throw new \InvalidArgumentException('The "workers" option must be at least 1.');
        }

        $spawned = 0;
        for ($i = 0; $i < $workers; $i++) {
            $pid = pcntl_fork();

            if ($pid === -1) {
                // Fork failed: keep going with the workers that already exist.
                $output->writeln(sprintf(
                    '<error>Could not fork worker [%d]; continuing with %d worker(s).</error>',
                    $i,
                    $spawned
                ));
                break;
            }

            if ($pid === 0) {
                // Child process: act as worker and never fall through to the server.
                $this->worker($i);

                return 0;
            }

            $spawned++;
        }

        if ($spawned === 0) {
            throw new \RuntimeException('Could not fork any worker process.');
        }

        $this->server($tasks);

        return 0;
    }

    /**
     * Worker loop: echoes every received task back to the server until a
     * "kill" message arrives on the control channel.
     *
     * @param int $i Index of this worker, used only in the log output.
     */
    private function worker($i)
    {
        echo getmypid(), ' (worker [', $i, ']) > started', PHP_EOL;

        $context = new \ZMQContext();

        // The "*" wildcard is only meaningful for bind(); connect() needs a peer address.
        $in = $context->getSocket(\ZMQ::SOCKET_PULL);
        $in->connect($this->endpoint(self::CONNECT_HOST, self::TASK_PORT));

        $out = $context->getSocket(\ZMQ::SOCKET_PUSH);
        $out->connect($this->endpoint(self::CONNECT_HOST, self::RESULT_PORT));

        $sub = $context->getSocket(\ZMQ::SOCKET_SUB);
        $sub->connect($this->endpoint(self::CONNECT_HOST, self::CONTROL_PORT));
        $sub->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, 'kill');

        // Block until one of the sockets is readable instead of spinning on
        // non-blocking recv() calls.
        $poll = new \ZMQPoll();
        $poll->add($in, \ZMQ::POLL_IN);
        $poll->add($sub, \ZMQ::POLL_IN);

        $readable = array();
        $writable = array();

        while (true) {
            $poll->poll($readable, $writable, -1);

            if (in_array($in, $readable, true)) {
                $task = $in->recv(\ZMQ::MODE_NOBLOCK);
                // Strict check: a payload such as "0" is a valid task, only false means "nothing there".
                if ($task !== false) {
                    echo 'work[', $i, ']:task:', $task, PHP_EOL;
                    $out->send($task);
                }
            }

            if (in_array($sub, $readable, true)) {
                $kill = $sub->recv(\ZMQ::MODE_NOBLOCK);
                if ($kill !== false) {
                    echo 'work[', $i, ']:kill', PHP_EOL;
                    break;
                }
            }
        }
    }

    /**
     * Server: dispatches the tasks, waits until every task id came back and
     * then publishes the kill message.
     *
     * @param int $tasks Number of tasks to dispatch; values below 1 dispatch nothing.
     */
    public function server($tasks)
    {
        echo getmypid(), ' (server) > started', PHP_EOL;

        $context = new \ZMQContext();

        $out = $context->getSocket(\ZMQ::SOCKET_PUSH);
        $out->bind($this->endpoint('*', self::TASK_PORT));

        $in = $context->getSocket(\ZMQ::SOCKET_PULL);
        $in->bind($this->endpoint('*', self::RESULT_PORT));

        $pub = $context->getSocket(\ZMQ::SOCKET_PUB);
        $pub->bind($this->endpoint('*', self::CONTROL_PORT));

        // Crude synchronisation: give the forked workers time to start and
        // connect. No handshake is performed.
        sleep(1);

        // Task ids are 1..n; array_fill() must not be called with a count below 1
        // on every PHP version, so handle the empty case explicitly.
        $count = max(0, (int) $tasks);
        $tasks = $count > 0 ? array_fill(1, $count, self::OPEN) : array();

        // Checking first (instead of do/while) avoids blocking in recv() forever
        // when there is nothing to dispatch.
        while (!$this->finish($tasks)) {
            foreach (array_keys($tasks, self::OPEN, true) as $task) {
                echo 'server:out:task:', $task, PHP_EOL;
                // Array keys are integers, the message payload is a string.
                $out->send((string) $task);
                $tasks[$task] = self::WAIT;
            }

            // Blocks until one worker reports a task id back.
            $task = $in->recv();
            echo 'server:in:task:', $task, PHP_EOL;
            $tasks[$task] = self::DONE;
        }

        $pub->send('kill');

        // Crude synchronisation: give the workers time to receive the kill message.
        sleep(1);

        echo getmypid(), ' (server) > finish', PHP_EOL;
    }

    /**
     * Tells whether every task reached a final state (FAIL or DONE).
     *
     * @param array $tasks Map of task id => task state.
     *
     * @return bool True when no task is OPEN or WAIT any more; also true for an empty map.
     */
    private function finish(array $tasks)
    {
        $fail = count(array_keys($tasks, self::FAIL, true));
        $done = count(array_keys($tasks, self::DONE, true));

        return count($tasks) === $fail + $done;
    }

    /**
     * Builds a ZeroMQ TCP endpoint string.
     *
     * @param string $host Interface ("*" for bind) or peer address.
     * @param int    $port TCP port.
     *
     * @return string Endpoint in the form "tcp://host:port".
     */
    private function endpoint($host, $port)
    {
        return sprintf('tcp://%s:%d', $host, $port);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment