Last active
December 2, 2016 05:28
-
-
Save krakjoe/4750943 to your computer and use it in GitHub Desktop.
PCNTL+pools+workers+pthreads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(ticks=1); | |
/* | |
This is your container for workers, it is only used in the process itself, so there is no need to descend from pthreads | |
*/ | |
class Pool { | |
public $max; | |
public $config; | |
public $stop; | |
public $workers; | |
/* your pool creates a maximum of $max redis workers using $config */ | |
public function __construct($max, $config) { | |
$this->max = $max; | |
$this->config = $config; | |
$this->stop = false; | |
} | |
/* | |
Upon submission you should either start or select a worker to execute the work | |
*/ | |
public function submit($work) { | |
if (!$this->stop) { | |
/* start a new worker if required */ | |
if (count($this->workers) < $this->max) { | |
$nextid = count($this->workers); | |
$this->workers[$nextid]=new RedisWorker($this->config); | |
$this->workers[$nextid]->stack($work); | |
$this->workers[$nextid]->start(); | |
return $nextid; | |
} else { | |
/* stack the work anywhere */ | |
$randid = array_rand($this->workers); | |
$this->workers[$randid]->stack($work); | |
return $randid; | |
} | |
} | |
return -1; | |
} | |
/* | |
Shutdown all the workers | |
*/ | |
public function shutdown() { | |
/* | |
Set a stop variable to force submit to break | |
*/ | |
$this->stop = true; | |
/* | |
Shutdown all workers, all jobs will be executed first | |
*/ | |
printf("%s: shutting down %d workers:\n", __METHOD__, count($this->workers)); | |
foreach ($this->workers as $worker) { | |
if ($worker->shutdown()) { | |
printf("%s: shut down %s #%lu\n", __METHOD__, get_class($worker), $worker->getThreadId()); | |
} else printf("%s: failed to shut down %s #%lu\n", __METHOD__, get_class($worker), $worker->getThreadId()); | |
} | |
} | |
} | |
/* this is a redis dummy, replace with redis :) */ | |
class RedisDummy { | |
public function connect(){} | |
/* I'm unsure of how you get the next job and want to show you a working example */ | |
public function next() { | |
/* there is no reason to sleep, only so you can see the output of the demo */ | |
usleep(100000); | |
/* dummy method/data */ | |
return array("key" => rand(0, 1000)); | |
} | |
} | |
/* | |
RedisWorker will establish a connection nothing else is required of the worker | |
*/ | |
class RedisWorker extends Worker { | |
/* this will be set by each workers run method */ | |
/* static storage amounts to Thread local, so set in the context you wish to manipulate the object in */ | |
/* some objects don't do so well when stored in the pthreads context, this is expected and safe */ | |
/* a stackable and it's worker share a context, every stacked object therefore has access to a thread local redis instance */ | |
/* the maximum number of connections to redis will be the maximum number of worker threads + 1 (master) */ | |
public static $redis; | |
/* private/public are ignored by pthreads, a wasteful pointless instruction in my view */ | |
/* every property is considered public, this offsets some of the overhead of locking upon access */ | |
public $config; | |
/* this will construct this object with $config, which I assume to contain redis connection information */ | |
public function __construct($config) { | |
$this->config = $config; | |
} | |
public function run() { | |
/* replace this with the real initialization of redis */ | |
self::$redis = new RedisDummy(); | |
self::$redis->connect($this->config); | |
/* once the connection is established the worker does not need to do anything else */ | |
} | |
} | |
/* | |
An object to represent a redis job | |
*/ | |
class RedisWork extends Stackable { | |
public $job; | |
public function __construct($job) { | |
$this->job = $job; | |
} | |
public function run() { | |
/* RedisWorker::$redis is how you access the thread local connection to redis */ | |
var_dump(RedisWorker::$redis); | |
try { | |
/* execute the job */ | |
var_dump($this->job); | |
/* use the same control structures in your "job", any errors that are thrown will be caught and can be handled */ | |
/* without affecting the ability of this worker to execute more stackables */ | |
} catch(Exception $ex) { | |
/* do something with exception */ | |
var_dump($ex); | |
} | |
} | |
} | |
/* configure redis here */ | |
$config = array("configure" => "redis"); | |
/* | |
* Master connection to redis | |
*/ | |
$master = new RedisDummy(); | |
/* | |
Setup a pool to be manipulated in this context only | |
*/ | |
$pool = new Pool(5, $config); | |
/* | |
Setup signal handler to shutdown workers | |
*/ | |
pcntl_signal(SIGINT, array($pool, "shutdown")); | |
/* | |
Poll for jobs from redis | |
You must stop submitting when the signal is caught, $pool->shutdown will cause next() to return negatively if no more work can be submitted | |
*/ | |
while (($job = $master->next())) { | |
if ($pool->submit(new RedisWork($job)) < 0) { | |
/* if we get to here, no more work can be done CTRL+C was caught */ | |
break; | |
} | |
} | |
/* that is all */ | |
?> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment