Skip to content

Instantly share code, notes, and snippets.

@chuyskywalker
Created August 13, 2014 05:46
Show Gist options
  • Save chuyskywalker/52cc85e943bfb68c3575 to your computer and use it in GitHub Desktop.
Save chuyskywalker/52cc85e943bfb68c3575 to your computer and use it in GitHub Desktop.
Parallel Tasks Spawn Other Tasks As Available
<?php
require './defs.php';
$gearmanClient = new GearmanClient();
$gearmanClient->addServer();
$modelsToRun = [
ModelA::CLASS,
ModelB::CLASS,
ModelC::CLASS,
ModelD::CLASS,
ModelE::CLASS,
];
$modelSet = $factSetsRequired = [];
foreach ($modelsToRun as $modelName) {
/** @var Model $newModel */
$newModel = new $modelName();
$modelSet[$modelName] = $newModel;
$factSetsRequired = array_merge($factSetsRequired, $newModel->requiredFactSets());
}
$factSetsRequired = array_values(array_unique($factSetsRequired));
echo "Models & FactSets\nAttempting to run these models which, in aggregate, require these facts:";
print_r($modelsToRun);
print_r($factSetsRequired);
$modelResults = $availableFactSets = [];
function taskReturn(GearmanTask $task) {
global $gearmanClient, $modelResults, $modelSet, $availableFactSets;
if ($task->functionName() == 'factSet') {
/** @var FactSet $factSet */
$factSet = unserialize($task->data());
$availableFactSets[get_class($factSet)] = $factSet;
echo "<- Facts results from: " . get_class($factSet) . "\n";
// see if any models are ready to be queued off
foreach ($modelSet as $modelName => $model) { /** @var Model $model */
if (array_diff($model->requiredFactSets(), array_keys($availableFactSets)) === []) {
echo "-> Model " . get_class($model) . " now has all required facts, queueing it\n";
// only include the required factSets to the model job
$factSets = [];
foreach ($model->requiredFactSets() as $factSetName) {
$factSets[$factSetName] = $availableFactSets[$factSetName];
}
$gearmanClient->addTask('model', serialize([
'factSets' => $factSets,
'model' => get_class($model)
]));
unset($modelSet[$modelName]); // done with this one, remove from the list
}
else {
echo "-- Model not ready: " . get_class($model) . "\n";
}
}
}
else {
// a model result, save it up
$resp = unserialize($task->data());
echo "<- Model results for: " . $resp['model'] . "\n";
$modelResults[$resp['model']] = $resp['result'];
}
}
$gearmanClient->setCompleteCallback("taskReturn");
// start by queueing up tasks for all the fact gathering
// which, when we have enough for models, queue off model jobs
foreach ($factSetsRequired as $factSet) {
$gearmanClient->addTask('factSet', $factSet);
}
echo "Starting Fact Gathering:\n";
$gearmanClient->runTasks();
echo "All facts and models completed, predictions are:\n";
print_r($modelResults);
<?php
interface FactSet {
public function gatherFacts();
public function getFact($factIdId);
}
abstract class FactSetTemplate implements FactSet {
protected $facts = [];
public function gatherFacts() {
usleep(rand(1000000,5000000)); // 1000000 == 1 sec
}
final public function getFact($factId) {
return isset($this->facts[$factId]) ? $this->facts[$factId] : null;
}
}
class UserInfoFactSet extends FactSetTemplate {
const AGE = 1;
const HEIGHT = 2;
const WEIGHT = 3;
const GENDER = 4;
public function gatherFacts() {
parent::gatherFacts();
$this->facts = [
self::AGE => rand(18,99),
self::HEIGHT => rand(45,84),
self::WEIGHT => rand(120,250),
self::GENDER => rand(0,1),
];
return true;
}
}
class SchoolInfoFactSet extends FactSetTemplate {
const GRADE = 1;
const GPA = 2;
const REGION = 3;
public function gatherFacts() {
parent::gatherFacts();
$this->facts = [
self::GRADE => rand(4,12),
self::GPA => rand(0,100),
self::REGION=> rand(0,6),
];
return true;
}
}
class ClickInfoFactSet extends FactSetTemplate {
const SPORTS = 1;
const GLOBALNEWS = 2;
const TECHNOLOGY = 3;
public function gatherFacts() {
parent::gatherFacts();
$this->facts = [
self::SPORTS => rand(0,100),
self::GLOBALNEWS => rand(0,100),
self::TECHNOLOGY => rand(0,100),
];
return true;
}
}
class SocialFactSet extends FactSetTemplate {
const FBLIKES = 1;
const TWEETS = 2;
const GPLUSES = 3;
public function gatherFacts() {
parent::gatherFacts();
$this->facts = [
self::FBLIKES => rand(0,100),
self::TWEETS => rand(0,100),
self::GPLUSES => rand(0,100),
];
return true;
}
}
interface Model {
/**
* @return array
*/
public function requiredFactSets();
/**
* @param FactSet[] $factSets
* @return float
*/
public function run($factSets);
}
class ModelA implements Model {
/** @inheritdoc */
public function requiredFactSets() {
return [
UserInfoFactSet::CLASS,
];
}
/** @inheritdoc */
public function run($factSets) {
usleep(rand(1000000,5000000)); // 1000000 == 1 sec
return (
($factSets[UserInfoFactSet::CLASS]->getFact(UserInfoFactSet::AGE) * 5)
+ ($factSets[UserInfoFactSet::CLASS]->getFact(UserInfoFactSet::HEIGHT) * .5)
- ($factSets[UserInfoFactSet::CLASS]->getFact(UserInfoFactSet::WEIGHT) * 2)
);
}
}
class ModelB implements Model {
/** @inheritdoc */
public function requiredFactSets() {
return [
UserInfoFactSet::CLASS,
SchoolInfoFactSet::CLASS,
];
}
/** @inheritdoc */
public function run($factSets) {
usleep(rand(1000000,5000000)); // 1000000 == 1 sec
return (
($factSets[SchoolInfoFactSet::CLASS]->getFact(SchoolInfoFactSet::GPA) * 5)
+ ($factSets[UserInfoFactSet::CLASS]->getFact(UserInfoFactSet::AGE) * 1.5)
- ($factSets[SchoolInfoFactSet::CLASS]->getFact(SchoolInfoFactSet::REGION) * 2)
);
}
}
class ModelC implements Model {
/** @inheritdoc */
public function requiredFactSets() {
return [
SchoolInfoFactSet::CLASS,
ClickInfoFactSet::CLASS,
];
}
/** @inheritdoc */
public function run($factSets) {
usleep(rand(1000000,5000000)); // 1000000 == 1 sec
return (
($factSets[SchoolInfoFactSet::CLASS]->getFact(SchoolInfoFactSet::GPA) * 5)
+ ($factSets[ClickInfoFactSet::CLASS]->getFact(ClickInfoFactSet::GLOBALNEWS) * 7.2)
- ($factSets[SchoolInfoFactSet::CLASS]->getFact(SchoolInfoFactSet::REGION) * 2)
);
}
}
class ModelD implements Model {
/** @inheritdoc */
public function requiredFactSets() {
return [
SchoolInfoFactSet::CLASS,
ClickInfoFactSet::CLASS,
SocialFactSet::CLASS,
];
}
/** @inheritdoc */
public function run($factSets) {
usleep(rand(1000000,5000000)); // 1000000 == 1 sec
return (
($factSets[SchoolInfoFactSet::CLASS]->getFact(SchoolInfoFactSet::GPA) * 5)
+ ($factSets[ClickInfoFactSet::CLASS]->getFact(ClickInfoFactSet::GLOBALNEWS) * 7.2)
- ($factSets[SchoolInfoFactSet::CLASS]->getFact(SchoolInfoFactSet::REGION) * 2)
- ($factSets[SocialFactSet::CLASS]->getFact(SocialFactSet::FBLIKES) * 2)
);
}
}
class ModelE implements Model {
/** @inheritdoc */
public function requiredFactSets() {
return [
SocialFactSet::CLASS,
];
}
/** @inheritdoc */
public function run($factSets) {
usleep(rand(1000000,5000000)); // 1000000 == 1 sec
return (
($factSets[SocialFactSet::CLASS]->getFact(SocialFactSet::FBLIKES) * 2)
);
}
}
<?php
require './defs.php';
$worker = new GearmanWorker();
$worker->addServer();
$worker->addFunction('factSet', function(GearmanJob $job) {
echo "Got job\n";
$serializedFactSet = $job->workload();
/** @var FactSet $factSetObj */
$factSetObj = new $serializedFactSet();
$factSetObj->gatherFacts();
return serialize($factSetObj);
});
while ($worker->work());
<?php
require './defs.php';
$worker = new GearmanWorker();
$worker->addServer();
$worker->addFunction('model', function(GearmanJob $job) {
echo "Got job\n";
$todo = unserialize($job->workload());
$factSets = $todo['factSets'];
$modelName = $todo['model'];
/** @var Model $model */
$model = new $modelName();
$prediction = $model->run($factSets);
return serialize([
'model' => $modelName,
'result' => $prediction
]);
});
while ($worker->work());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment