Skip to content

Instantly share code, notes, and snippets.

@krakjoe
Last active December 2, 2016 05:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save krakjoe/4750943 to your computer and use it in GitHub Desktop.
Save krakjoe/4750943 to your computer and use it in GitHub Desktop.
PCNTL+pools+workers+pthreads
<?php
declare(ticks=1);
/*
This is your container for workers. It is only used in the process itself, so there is no need for it to descend from pthreads.
*/
/**
 * In-process container that lazily starts up to $max RedisWorker threads and
 * hands submitted work to them.
 */
class Pool {
    /** @var int maximum number of workers this pool will start */
    public $max;
    /** @var mixed configuration passed unchanged to every RedisWorker */
    public $config;
    /** @var bool once true, submit() refuses any further work */
    public $stop;
    /**
     * @var array workers started so far, indexed by the id submit() returns.
     * Defaults to an empty array so count(), array_rand() and foreach never
     * operate on null.
     */
    public $workers = array();

    /**
     * @param int   $max    maximum number of workers to create
     * @param mixed $config handed to the constructor of each new RedisWorker
     */
    public function __construct($max, $config) {
        $this->max = $max;
        $this->config = $config;
        $this->stop = false;
    }

    /**
     * Hand $work to a worker, starting a new one while the pool is below $max.
     *
     * @param mixed $work object to stack on a worker
     * @return int id (index into $workers) of the worker that received the
     *             work, or -1 when the work was not accepted (pool stopped,
     *             or no worker exists and none may be started)
     */
    public function submit($work) {
        if ($this->stop) {
            return -1;
        }

        $started = count($this->workers);

        /* start a new worker if required */
        if ($started < $this->max) {
            $this->workers[$started] = new RedisWorker($this->config);
            $this->workers[$started]->stack($work);
            $this->workers[$started]->start();
            return $started;
        }

        /* $max < 1: nothing to stack on, and array_rand() rejects an empty array */
        if ($started === 0) {
            return -1;
        }

        /* pool is full: stack the work on any existing worker */
        $randid = array_rand($this->workers);
        $this->workers[$randid]->stack($work);
        return $randid;
    }

    /**
     * Stop accepting work and shut down every started worker, printing one
     * status line per worker.
     *
     * @return void
     */
    public function shutdown() {
        /* flip the flag first so submit() starts returning -1 */
        $this->stop = true;

        /* per the original notes, workers execute their stacked jobs before shutting down */
        printf("%s: shutting down %d workers:\n", __METHOD__, count($this->workers));
        foreach ($this->workers as $worker) {
            if ($worker->shutdown()) {
                printf("%s: shut down %s #%lu\n", __METHOD__, get_class($worker), $worker->getThreadId());
            } else {
                printf("%s: failed to shut down %s #%lu\n", __METHOD__, get_class($worker), $worker->getThreadId());
            }
        }
    }
}
/* this is a redis dummy, replace with redis :) */
/**
 * Stand-in for a real redis client so the demo can run without a server.
 */
class RedisDummy {
    /** Placeholder for opening a connection; intentionally does nothing. */
    public function connect() {
    }

    /**
     * Produce the next fake job (a working example in place of a real fetch).
     *
     * @return array single-entry array whose "key" is a random integer from 0 to 1000
     */
    public function next() {
        /* the pause is not required; it only slows the demo so its output can be followed */
        usleep(100000);

        /* dummy payload */
        $payload = array();
        $payload["key"] = rand(0, 1000);

        return $payload;
    }
}
/*
RedisWorker will establish a connection; nothing else is required of the worker.
*/
/**
 * Worker thread whose only job is to open a redis connection that the work
 * stacked on it can then use.
 */
class RedisWorker extends Worker {
    /**
     * Connection assigned by each worker's run() method.
     *
     * Notes carried over from the original author: static storage amounts to
     * thread-local storage, so it is set in the context that will use the
     * object; some objects do not do well when stored in the pthreads context,
     * which is expected and safe; a stackable shares its worker's context, so
     * every stacked object can reach this instance; the number of redis
     * connections tops out at the number of worker threads + 1 (master).
     */
    public static $redis;

    /**
     * Connection settings. The original notes say pthreads ignores
     * private/public and treats every property as public.
     */
    public $config;

    /**
     * @param mixed $config assumed to hold the redis connection information
     */
    public function __construct($config) {
        $this->config = $config;
    }

    /**
     * Create the connection, publish it through the static, then connect.
     * Once connected, the worker itself has nothing further to do.
     */
    public function run() {
        /* replace this with the real initialization of redis */
        $connection = new RedisDummy();
        self::$redis = $connection;
        $connection->connect($this->config);
    }
}
/*
An object to represent a redis job
*/
/**
 * One redis job, wrapped so that it can be stacked on a RedisWorker.
 */
class RedisWork extends Stackable {
    /** @var mixed payload given to the constructor */
    public $job;

    /**
     * @param mixed $job the job data this unit of work carries
     */
    public function __construct($job) {
        $this->job = $job;
    }

    /**
     * Dump the connection held in RedisWorker::$redis, then "execute" the job
     * (the demo simply dumps it).
     */
    public function run() {
        /* the connection is reached through the RedisWorker static */
        $connection = RedisWorker::$redis;
        var_dump($connection);

        try {
            /* execute the job; real job logic belongs inside this try block */
            $payload = $this->job;
            var_dump($payload);
        } catch (Exception $failure) {
            /* an exception caught here is handled in place, so the worker can go on to run more stackables */
            var_dump($failure);
        }
    }
}
/* redis connection settings go here */
$config = array("configure" => "redis");

/* the master's own connection to redis, used to poll for jobs */
$master = new RedisDummy();

/* the pool is created and manipulated in this context only */
$pool = new Pool(5, $config);

/* on SIGINT (CTRL+C), shut the workers down through the pool */
pcntl_signal(SIGINT, array($pool, "shutdown"));

/*
Poll redis for jobs and submit each one to the pool.
Once $pool->shutdown() has run, $pool->submit() returns a negative id, which is the cue to stop polling.
*/
while (true) {
    $job = $master->next();
    if (!$job) {
        /* nothing more to fetch */
        break;
    }

    if ($pool->submit(new RedisWork($job)) < 0) {
        /* the pool refused the work: CTRL+C was caught, so no more work can be done */
        break;
    }
}
/* that is all */
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment