Skip to content

Instantly share code, notes, and snippets.

@funkytaco
Last active December 18, 2015 19:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save funkytaco/5833737 to your computer and use it in GitHub Desktop.
Save funkytaco/5833737 to your computer and use it in GitHub Desktop.
<?php
# sender
require_once($LIB_DIR_PATH .'/pheanstalk/pheanstalk_init.php');
$beanstalk = new Pheanstalk('127.0.0.1');
$beanstalk->useTube("booth-jobs");
$watchtasks[] = $beanstalk->put(microtime(1), Pheanstalk::DEFAULT_PRIORITY, 10); # 10 sec wait
$beanstalk->useTube("booth-jobs");
$watchtasks[] = $beanstalk->put(microtime(1), Pheanstalk::DEFAULT_PRIORITY, 20); # 20 sec wait
$beanstalk->useTube("booth-jobs");
$watchtasks[] = $beanstalk->put(microtime(1), Pheanstalk::DEFAULT_PRIORITY, 30); # 30 sec wait
$beanstalk->useTube("booth-jobs");
$watchtasks[] = $beanstalk->put(microtime(1), Pheanstalk::DEFAULT_PRIORITY, 40); # 40 sec wait
$beanstalk->useTube("watch");
$beanstalk->put(json_encode($watchtasks), Pheanstalk::DEFAULT_PRIORITY, 5);
# worker
require_once($LIB_DIR_PATH .'/pheanstalk/pheanstalk_init.php');
$beanstalk = new Pheanstalk('127.0.0.1');
$beanstalk->watchOnly("booth-jobs");
while(true){
$job = $beanstalk->reserve();
$beanstalk->bury($job);
var_dump("Delete job {$job->getId()}");
$beanstalk->delete($job);
}
# watcher
require_once($LIB_DIR_PATH .'/pheanstalk/pheanstalk_init.php');
beanstalk = new Pheanstalk('127.0.0.1');
$beanstalk->watchOnly("watch");
while(true){
$job = $beanstalk->reserve();
if($job){
$beanstalk->bury($job);
var_dump("watch {$job->getId()} {$job->getData()}");
$watchtasks = json_decode($job->getData(), true);
$todo = array();
foreach($watchtasks as $task){
try{
$beanstalk->useTube("booth-jobs");
$beanstalk->statsJob($task);
$todo[] = $task;
var_dump("task $task not done");
}catch (Exception $e){
var_dump("task $task done $e");
}
}
if($todo){
$beanstalk->useTube("watch");
$beanstalk->put(json_encode($todo), Pheanstalk::DEFAULT_PRIORITY, 5); # 5 sec retry
}else{
var_dump("stop watch");
}
$beanstalk->delete($job);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment