Skip to content

Instantly share code, notes, and snippets.

@2072
Created September 10, 2014 18:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save 2072/19a0a8ce38768e2f792a to your computer and use it in GitHub Desktop.
Save 2072/19a0a8ce38768e2f792a to your computer and use it in GitHub Desktop.
Pthreads (PHP) shared object management
<?php
// PHP PThreads issue 329 diagnostic script by John Wellesz
// Also a nice example of shared object management...
// One million: the number of microseconds in a second. Used below to express
// wait() timeouts in whole seconds (e.g. 10 * ONEMUSEC).
const ONEMUSEC = 1000000;

// Print how many seconds ago this script file was last modified, so that a
// run can be matched with the version of the script that produced it.
echo sprintf('file age: %d', time() - filemtime(__FILE__)), PHP_EOL;
// A minimal Threaded object shared between different thread contexts; it only
// carries the name it was given at construction time.
class sharedObject extends \Threaded {
    // Identifier of this instance.
    public $name = "";

    // Stores the given name on the instance.
    public function __construct($name) {
        $this->name = $name;
    }

    // Returns the name stored by the constructor.
    public function getName() {
        return $this->name;
    }
}
// A thread that uses the sharedObject instances served by an objectServer:
// it endlessly asks the broker for objects, checks them, and occasionally
// creates or deletes some.
class objectUserThread extends \Thread {
    // The objectServer all requests are sent to.
    public $broker = null;
    // Number used to tell the user threads apart in status output.
    public $id = 0;
    // Running total of operations performed by run().
    public $objectOps = 0;

    // $broker: the object broker to query; $id: label used by printStatus().
    public function __construct (objectServer $broker, $id) {
        $this->broker = $broker;
        $this->id = $id;
    }

    // Prints one line with this thread's id and its operation counter.
    public function printStatus() {
        printf ("%s-%d status: made %d operations on shared objects.%s"
            , __CLASS__
            , $this->id
            , $this->objectOps
            , PHP_EOL
        );
    }

    // Thread body: loops forever, never returns normally.
    // Throws \Exception when the broker hands back something that is not a
    // sharedObject, or an object whose name does not match its key.
    public function run () {
        $myID = Thread::getCurrentThreadId();
        // Upper-case hexadecimal form of the thread id, computed once. It is
        // formatted with %s below: feeding a hexadecimal *string* to %X would
        // cast it to an integer first ("a3" => 0, "7f3a" => 7) and lose the id.
        $hexID = strtoupper(dechex($myID));
        $loop = 0;
        $objNum = 0;
        for (;;) {
            //usleep(mt_rand(ONEMUSEC/100, ONEMUSEC/10));
            $ops = 0;
            $objNum = $this->broker->getObjectNum();

            // Always create an object when the broker holds none; otherwise
            // create one roughly every 50 loops while fewer than 10 exist.
            if (! $objNum || (mt_rand(1, 50) == 1 && $objNum < 10)) {
                // Name: a random-length suffix of the hex thread id, then the
                // loop counter.
                $suffix = substr($hexID, -1 * mt_rand(1, strlen($hexID)));
                $this->broker->createObject(sprintf('%s-%d', $suffix, $loop));
                ++$ops;
            }

            // Ask for a random number of objects (at least one).
            $objects = $this->broker->getObjects(mt_rand(1, $objNum ? $objNum : 1));
            $todelete = [];

            // do something with those objects
            foreach ($objects as $name => $obj) {
                if (! $obj instanceof sharedObject)
                    throw new \Exception(sprintf(
                        "Something is wrong: obj '%s' is a %s, obtained objects: %s %s", $name, gettype($obj), print_r(array_keys($objects), true), PHP_EOL
                    ));
                if ($name !== $obj->getName())
                    throw new \Exception(
                        "Something is wrong: \$name !== \$obj->getName()"
                    );
                ++$ops;
                // Mark about one object in 10000 for deletion.
                if (mt_rand(1, 10000) == 1)
                    $todelete[] = $name;
            }

            foreach ($todelete as $name) {
                // NOTE(review): runQuery() is declared protected on
                // objectServer; this call from another class presumably relies
                // on how pthreads treats protected methods - confirm.
                $this->broker->runQuery('delete', [$name]);
                ++$ops;
            }

            $this->objectOps += $ops;
            ++$loop;
        }
    }
}
// The thread that creates sharedObject instances and holds the internal
// references to them, serving queries submitted by other thread contexts.
//
// Query protocol, as implemented below:
//  - $queryType === null means the broker is idle (the previous query, if
//    any, has been served);
//  - a caller puts its parameters in $queryParams, sets $queryType and
//    notifies the broker (see submitQueryAndWait());
//  - the broker handles the query in run(), puts the answer in
//    $queryResults, then resets $queryType to null and notifies back (see
//    notifyAndWaitForQuery()).
class objectServer extends \Thread {
    // Name of the pending query; null when idle, false when the constructor
    // detected that the thread stopped during start-up.
    public $queryType = "";
    // Thread id of the context that submitted the last query.
    public $queryOrigin = null;
    // Threaded container carrying the parameters of the pending query.
    public $queryParams = null;
    // Threaded container carrying the results of the last query.
    public $queryResults = null;
    // Statistics reported by the 'getStats' query.
    public $queryServedCount = 0;
    public $getCount = 0;
    public $getnumCount = 0;
    public $createCount = 0;

    // init and start the thread, then block until it is ready for queries.
    // Throws \Exception if the thread stops before becoming ready.
    public function __construct() {
        $this->queryOrigin = null; // the id of the querying thread
        $this->queryType = 'init';
        $this->start();
        do {
            $this->synchronized(function() {
                if ($this->queryType !== null)
                    if (!$this->wait(10 * ONEMUSEC)) {
                        if ($this->hasStopped()) {
                            // Sentinel: the thread will never become ready.
                            $this->queryType = false;
                        }
                    }
            });
            // Leave the loop on the failure sentinel too; testing for null
            // only would keep this constructor waiting forever.
        } while ($this->queryType !== null && $this->queryType !== false);

        if ($this->queryType === false)
            throw new \Exception(__CLASS__ . ' thread stopped before being ready for queries');

        printf("Object broker thread ready for query%s", PHP_EOL);
    }

    // Prints one status line built from the answer to a 'getStats' query.
    public function printStatus() {
        vprintf (__CLASS__ . " status: served %d queries: g:%s, c:%d, gn:%d, holding %d objects. Mem usage: %s" . PHP_EOL
            , $this->runQuery('getStats')
        );
    }

    // Serves a 'get' query: puts up to N randomly chosen objects that are not
    // pending deletion into $queryResults, N being the first query parameter.
    private function doGet(array &$objectRepo, array &$deletedRepo) {
        $number = $this->queryParams->shift();

        // Never hand out more objects than are held.
        if ($number > count($objectRepo))
            $number = count($objectRepo);

        $objectIds = array_keys($objectRepo);
        shuffle($objectIds);

        $objects = [];
        for ($i = 0 ; $i < $number ; $i++) {
            $objectID = $objectIds[$i];
            $object = $objectRepo[$objectID];
            $object->lock();
            // Objects pending deletion are skipped, so fewer than $number
            // objects may be returned.
            if (! isset($deletedRepo[$objectID]))
                $objects[$objectID] = $objectRepo[$objectID];
            $object->unlock();
        }

        $this->queryResults->merge($objects);
        $this->getCount += 1;
    }

    // Thread body: creates the query containers, then serves queries forever.
    public function run () {
        // reference holder for internal shared threaded instances
        $holder = [];
        $holder['properties'] = [
            'queryParams' => new \Threaded(),
            'queryResults' => new \Threaded(),
        ];
        $this->merge($holder['properties']);

        // external shared object reference holder
        $objectRepo = [];
        // object pending deletion (object id => time of the delete query)
        $deletedRepo = [];

        // main loop that must never stop until everything else has
        for (;;) {
            $queryType = $this->notifyAndWaitForQuery();
            $now = time();

            // remove objects deleted more than five seconds ago
            foreach ($deletedRepo as $objectID => $time)
                if ($now - $time > 5) {
                    unset($deletedRepo[$objectID]);
                    unset($objectRepo[$objectID]);
                }

            switch ($queryType) {
                case 'getObjectNum':
                    $this->queryResults->merge([count($objectRepo)]);
                    $this->getnumCount += 1;
                    break;

                case 'delete':
                    // Only marks the object; it is actually dropped by the
                    // clean-up loop above once the delay has elapsed.
                    $objectID = $this->queryParams->shift();
                    if (! isset($deletedRepo[$objectID]))
                        $deletedRepo[$objectID] = $now;
                    $this->queryResults->merge([true]);
                    break;

                case 'get':
                    $this->doGet($objectRepo, $deletedRepo);
                    break;

                case 'getStats':
                    // A single result holding the list of values, in the
                    // order expected by the format string of printStatus().
                    $this->queryResults->merge([[
                        $this->queryServedCount
                        , $this->getCount
                        , $this->createCount
                        , $this->getnumCount
                        , count($objectRepo)
                        , number_format(memory_get_usage(true))
                    ]]);
                    break;

                case 'create':
                    $objectID = $this->queryParams->shift();
                    if (! isset($objectRepo[$objectID])) {
                        $objectRepo[$objectID] = new sharedObject($objectID);
                        $ret = true;
                    } else
                        $ret = false;
                    $this->queryResults->merge([$ret]);
                    $this->createCount += 1;
                    printf('Created object: %s%s', $objectID, PHP_EOL);
                    break;

                default:
                    printf ('Unsupported queryType: %s%s', var_export($queryType, true), PHP_EOL);
                    break;
            }

            $this->queryServedCount += 1;
        }
    }

    // Returns the number of objects currently held by the broker.
    public function getObjectNum () {
        return $this->runQuery('getObjectNum');
    }

    // Returns up to $number objects (see doGet()).
    public function getObjects ($number) {
        return $this->runQuery('get', [$number]);
    }

    // Asks the broker to create an object called $name; the result is false
    // when an object of that name already exists, true otherwise.
    public function createObject ($name) {
        return $this->runQuery('create', [$name]);
    }

    // Broker side of the protocol: signals that the previous query is done
    // (queryType reset to null + notify), then waits for the next one and
    // returns its type.
    private function notifyAndWaitForQuery() {
        $firstPass = true;
        do {
            // wait for instructions
            $this->synchronized(function() use ($firstPass) {
                // The reset and notification happen only once per call.
                if ($firstPass) {
                    $this->queryType = null;
                    $this->notify();
                }
                if ($this->queryType === null)
                    $this->wait(10 * ONEMUSEC);
            });
            $firstPass = false;
        } while ($this->queryType === null);

        return $this->queryType;
    }

    // Caller side of the protocol: posts $queryType, wakes the broker up and
    // blocks until the broker resets queryType to null.
    // Throws \Exception when the broker thread is found stopped while waiting.
    public function submitQueryAndWait($queryType) {
        $this->queryOrigin = Thread::getCurrentThreadId();
        $pass = 1;
        do {
            // \Exception takes (message, code, previous): build a plain
            // message instead of passing the object and a string code.
            if ($pass > 1 && $this->hasStopped())
                throw new \Exception(__CLASS__ . ' Crashed');

            $this->synchronized(function($queryType) use ($pass) {
                // The query is posted only once; later passes just wait.
                if ($pass == 1) {
                    $this->queryType = $queryType;
                    $this->notify();
                }
                if ($this->queryType !== null)
                    $this->wait(10 * ONEMUSEC);
            }, $queryType);
            ++$pass;
        } while ($this->queryType !== null);
    }

    // Runs a complete query: stores $params, submits $type, waits for the
    // broker and returns the results (the single result itself when there is
    // exactly one stored under key 0, an array otherwise).
    // Throws \Exception when parameters of a previous query are still
    // pending, or when the extracted result count is inconsistent.
    // NOTE(review): declared protected, presumably to rely on pthreads
    // serialising calls to protected methods across contexts - confirm.
    protected function runQuery ($type, array $params = null) {
        if ($this->queryParams->count())
            throw new \Exception('Unfinished query...');

        if ($params)
            $this->queryParams->merge($params);

        $this->submitQueryAndWait($type);

        // extract the results and return them to the caller
        $queryResults = $this->queryResults;
        $queryResults->lock();
        $i = $queryResults->count();
        if ($i)
            // presumably chunk() also removes the fetched items from the
            // container, leaving it empty for the next query - confirm.
            $result = $queryResults->chunk($i, true);
        else
            $result = [];
        $queryResults->unlock();

        // \Exception does not format its message: use sprintf() explicitly.
        if ($i != count($result))
            throw new \Exception(sprintf('!!!! $i != count($result) %d/%d %s', $i, count($result), PHP_EOL));

        // if there is only one and there is no specific key
        if ($i == 1 && isset($result[0]))
            // return THE result directly instead of an array
            return $result[0];
        else
            return $result;
    }
}
// Create the broker (its constructor starts the thread and waits until it is
// ready), then three user threads sharing that broker.
$broker = new objectServer();
$user_1 = new objectUserThread($broker, 1);
$user_2 = new objectUserThread($broker, 2);
$user_3 = new objectUserThread($broker, 3);

// All user threads are created before any of them is started.
$users = [$user_1, $user_2, $user_3];
foreach ($users as $user) {
    $user->start();
}

// Report the status of the broker and of every user thread once per second.
while (true) {
    sleep(1);
    $broker->printStatus();
    foreach ($users as $user) {
        $user->printStatus();
    }
    echo PHP_EOL;
}

// Not reached: the loop above has no exit.
echo PHP_EOL, PHP_EOL;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment