Skip to content

Instantly share code, notes, and snippets.

@ryantxr
Created February 12, 2018 14:31
Show Gist options
  • Save ryantxr/fd46991bbe5ceb01c092c15435386385 to your computer and use it in GitHub Desktop.
Save ryantxr/fd46991bbe5ceb01c092c15435386385 to your computer and use it in GitHub Desktop.
Use Beanstalk and PHP to make an asynchronous system
/**
 * Small wrapper around a Pheanstalk connection that publishes and consumes
 * JSON-encoded payloads on beanstalkd tubes.
 *
 * The parent class AbstractBaseQueue and the BEANSTALK_PORT constant are
 * defined outside this file section.
 */
class BeanstalkClient extends AbstractBaseQueue
{
    /** @var \Pheanstalk\Pheanstalk Connection object created by connect(). */
    public $queue;

    /** @var string Host the beanstalkd server is reached on. */
    public $host;

    /** @var int Port the beanstalkd server is reached on. */
    public $port;

    /**
     * @var int|null Seconds waitForMessages() passes to reserve(). A falsy
     *               value makes waitForMessages() call reserve() with no
     *               timeout argument.
     */
    public $timeout;

    /**
     * Configure the connection settings and connect immediately.
     *
     * @param int|null $timeout Seconds to wait for a message; null selects
     *                          the default of 30 seconds.
     */
    public function __construct($timeout = null)
    {
        // NOTE(review): loadClasses() is presumably inherited from
        // AbstractBaseQueue - it is not defined in this class.
        $this->loadClasses();
        $this->host = '127.0.0.1';
        $this->port = BEANSTALK_PORT;
        // Honour the caller's timeout; previously the argument was ignored.
        $this->timeout = ($timeout === null) ? 30 : $timeout;
        $this->connect();
    }

    /**
     * Create the Pheanstalk object for the configured host and port.
     */
    public function connect()
    {
        $this->queue = new \Pheanstalk\Pheanstalk($this->host, $this->port);
    }

    /**
     * JSON-encode $data and put it on a tube with the default priority.
     *
     * @param string $tube  Name of the tube to publish to.
     * @param mixed  $data  Value to encode as the job payload.
     * @param int    $delay Delay argument forwarded to Pheanstalk's put().
     *
     * @throws Exception When $data cannot be JSON-encoded.
     */
    public function publish($tube, $data, $delay)
    {
        $payload = $this->encodeData($data);
        $this->queue->useTube($tube)->put(
            $payload,
            \Pheanstalk\PheanstalkInterface::DEFAULT_PRIORITY,
            $delay
        );
    }

    /**
     * Watch a single tube and reserve the next job from it.
     *
     * @param string        $tube     Name of the tube to watch.
     * @param callable|null $callback Accepted for interface compatibility;
     *                                not used by this implementation.
     *
     * @return mixed Whatever Pheanstalk's reserve() returns.
     */
    public function waitForMessages($tube, $callback = null)
    {
        $watched = $this->queue->watchOnly($tube);
        if ($this->timeout) {
            return $watched->reserve($this->timeout);
        }

        return $watched->reserve();
    }

    /**
     * Delete a job from the queue.
     *
     * @param mixed $message Job object previously returned by waitForMessages().
     */
    public function delete($message)
    {
        $this->queue->delete($message);
    }

    /**
     * Encode a value as a JSON payload string.
     *
     * @param mixed $data Value to encode.
     *
     * @return string JSON text.
     *
     * @throws Exception When json_encode() fails, so that a boolean false is
     *                   never published as a payload.
     */
    public function encodeData($data)
    {
        $payload = json_encode($data);
        if ($payload === false) {
            throw new Exception('unable to encode data: ' . json_last_error_msg());
        }

        return $payload;
    }

    /**
     * Decode a JSON payload string, turning objects into associative arrays.
     *
     * @param string $encodedData JSON text.
     *
     * @return mixed Decoded value, or null when the text is not valid JSON.
     */
    public function decodeData($encodedData)
    {
        return json_decode($encodedData, true);
    }

    /**
     * Extract and decode the payload carried by a job.
     *
     * @param object $message Job object exposing getData().
     *
     * @return mixed Decoded payload, or null when it is not valid JSON.
     *
     * @throws Exception When $message is a plain string instead of a job object.
     */
    public function getData($message)
    {
        if (is_string($message)) {
            throw new Exception('message is a string');
        }

        return $this->decodeData($message->getData());
    }
}
/**
 * Base class for a long-running worker that consumes messages from a single
 * queue channel and hands each decoded payload to didReceiveMessage().
 *
 * The worker terminates itself (via exit) once it has used too much memory or
 * has been running longer than its restart window.
 */
abstract class BaseQueueProcess
{
    /** Memory usage in bytes above which run() exits. */
    const MAX_MEMORY_BYTES = 20000000;

    /** Seconds after initialize() at which run() exits (two hours). */
    const MAX_RUNTIME_SECONDS = 7200;

    /** Timeout in seconds passed to the BeanstalkClient constructor. */
    const QUEUE_TIMEOUT_SECONDS = 900;

    /** Pause in microseconds between iterations of the run() loop. */
    const LOOP_PAUSE_MICROSECONDS = 10;

    /** @var string Channel (tube) to consume; child class should set this. */
    protected $channelName = '';

    /** @var object|null The queue object; created by initialize() when not set. */
    public $queue = null;

    /** @var int|null The system process id, set by initialize(). */
    public $processId = null;

    /** @var string|null Name of the concrete worker class, set by initialize(). */
    public $name = null;

    /** @var mixed Not read or written by this class. */
    public $status = null;

    /** @var int|null Unix timestamp after which run() exits, set by initialize(). */
    public $endTime = null;

    /** @var string|null Copy of the channel name, set when run() starts. */
    public $processName = null;

    /**
     * Handle one decoded message.
     *
     * @param mixed $taskData Decoded payload of the message.
     *
     * @return mixed Return false to mark the task as failed; any other value
     *               marks it as completed.
     */
    abstract protected function didReceiveMessage($taskData);

    /**
     * Record process details, compute the restart deadline and create the
     * queue client when none has been injected through $queue.
     */
    public function initialize()
    {
        $this->processId = getmypid();
        $this->name = get_called_class();
        // The worker exits once this deadline has passed.
        $this->endTime = time() + static::MAX_RUNTIME_SECONDS;

        // If the process isn't doing anything, the wait should time out so
        // the loop gets a chance to do housekeeping.
        if (empty($this->queue)) {
            $this->queue = new BeanstalkClient(static::QUEUE_TIMEOUT_SECONDS);
        }
    }

    /**
     * Decide whether a decoded payload should be processed.
     *
     * The default accepts every payload that decoded to something other than
     * null; subclasses may override this with stricter checks.
     *
     * @param mixed $taskData Decoded payload of the message.
     *
     * @return bool True when the payload should go to didReceiveMessage().
     */
    protected function validateTaskData($taskData)
    {
        return $taskData !== null;
    }

    /**
     * Decode a queue message, process it and settle it on the queue.
     *
     * Invalid payloads are deleted without being processed.
     *
     * @param mixed $queueMessage Message object obtained from the queue.
     */
    public function receiveMessage($queueMessage)
    {
        $taskData = $this->queue->getData($queueMessage);

        if (!$this->validateTaskData($taskData)) {
            // Handle bad message: drop it so it is not delivered again.
            $this->queue->delete($queueMessage);
            return;
        }

        if ($this->didReceiveMessage($taskData) !== false) {
            $this->completeTask($queueMessage);
        } else {
            $this->failTask($queueMessage);
        }
    }

    /**
     * Consume messages until the memory or run-time limit is reached, then
     * terminate the PHP process with exit.
     */
    public function run()
    {
        // Make sure the queue and deadline exist even when the caller did
        // not call initialize() first.
        if ($this->endTime === null) {
            $this->initialize();
        }

        $this->processName = $this->channelName;

        while (true) {
            $queueMessage = $this->queue->waitForMessages($this->channelName);
            // An empty result means the wait timed out; nothing to process.
            if (!empty($queueMessage)) {
                $this->receiveMessage($queueMessage);
            }

            $overMemory = memory_get_usage() > static::MAX_MEMORY_BYTES;
            $overTime = time() > $this->endTime;
            if ($overMemory || $overTime) {
                exit;
            }

            usleep(static::LOOP_PAUSE_MICROSECONDS);
        }
    }

    /**
     * Settle a successfully processed message by deleting it from the queue.
     *
     * @param mixed $queueMessage Message object obtained from the queue.
     */
    public function completeTask($queueMessage)
    {
        $this->queue->delete($queueMessage);
    }

    /**
     * Settle a failed message. It is deleted as well, so failed tasks are
     * not retried.
     *
     * @param mixed $queueMessage Message object obtained from the queue.
     */
    public function failTask($queueMessage)
    {
        $this->queue->delete($queueMessage);
    }
}
/**
 * Example worker that consumes the 'Temperature' channel.
 */
class MyProcess extends BaseQueueProcess
{
    /**
     * Select the channel this worker listens on, then run the shared setup
     * from the parent class.
     */
    public function initialize()
    {
        $this->channelName = 'Temperature';

        parent::initialize();
    }

    /**
     * Handle one decoded message from the channel.
     *
     * This example performs no work and always reports success.
     *
     * @param mixed $taskData Decoded payload of the message.
     *
     * @return bool False would signal that something went wrong.
     */
    public function didReceiveMessage($taskData)
    {
        // Real processing of $taskData belongs here.
        $succeeded = true;

        return $succeeded;
    }
}
Sender
/**
 * Publishes work items onto the 'Temperature' tube for a worker to pick up.
 */
class WorkSender
{
    /** Name of the tube the data is published to. */
    const TubeName = 'Temperature';

    /** Set delay to 0, i.e. don't use a delay. */
    const TubeDelay = 0;

    /**
     * Publish one item of data on the tube.
     *
     * A new BeanstalkClient is created for every call.
     *
     * @param mixed $data Value handed to BeanstalkClient::publish().
     */
    public function send($data)
    {
        // `new` is required here: without it PHP looks for a function named
        // BeanstalkClient() instead of instantiating the class.
        $client = new BeanstalkClient();
        $client->publish(self::TubeName, $data, self::TubeDelay);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment