Skip to content

Instantly share code, notes, and snippets.

@Jervelund
Created March 1, 2016 04:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Jervelund/6b5b793437ce628e684f to your computer and use it in GitHub Desktop.
Save Jervelund/6b5b793437ce628e684f to your computer and use it in GitHub Desktop.
PHP semaphore worker queue
<?PHP
/*
Simple implementation of process queue in PHP using MySQL database.
This method does not use simple semaphores, as it is important for the application to process input in sequential order for each thread (the order in which the data was received).
CREATE TABLE IF NOT EXISTS `sessions` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`thread` int(11) NOT NULL,
`time_initiated` decimal(14,4) unsigned NOT NULL,
`time_heartbeat` decimal(14,4) unsigned NOT NULL,
PRIMARY KEY (`id`),
KEY `TIME` (`time_heartbeat`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 AUTO_INCREMENT=1;
*/
// Queue worker: registers this request as a session for the given thread,
// waits (polling once per second) until it is the oldest session of that
// thread, performs the work, then removes its session so the next one can run.
//
// Input:  GET parameter `thread` (integer) - the queue this request belongs to.
// Output: plain-text progress lines.

// The thread id comes straight from the request, so validate it as an integer
// before it gets anywhere near the database.
$THREAD_ID = filter_input(INPUT_GET, 'thread', FILTER_VALIDATE_INT);
if ($THREAD_ID === null || $THREAD_ID === false) {
    http_response_code(400);
    die('Missing or invalid thread parameter');
}

$req_time = $_SERVER['REQUEST_TIME_FLOAT'];
$heartbeat = microtime(true);

// Throw on SQL errors instead of returning false, so failures are not silently
// turned into "call to a member function on boolean" fatals further down.
$dbh = new PDO('mysql:socket=/var/run/mysqld/mysqld.sock;dbname=DATABASE', 'USERNAME', 'PASSWORD', array(
    PDO::ATTR_PERSISTENT => true,
    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    PDO::ATTR_EMULATE_PREPARES => false,
));

// All statements are parameterised; timestamps are formatted with %F so the
// decimal separator is always '.', independent of the process locale.
$insert_session = $dbh->prepare('INSERT INTO `sessions` (`thread`,`time_initiated`,`time_heartbeat`) VALUES (?,?,?)');
// `id` is the tiebreaker: two requests with an identical `time_initiated`
// must still agree on which of them is first in line.
$select_head = $dbh->prepare('SELECT `id` FROM `sessions` WHERE `thread` = ? ORDER BY `time_initiated` ASC, `id` ASC LIMIT 1');
$refresh_session = $dbh->prepare('UPDATE `sessions` SET `time_heartbeat` = ? WHERE `id` = ?');
$flush_orphans = $dbh->prepare('DELETE FROM `sessions` WHERE `time_heartbeat` < ?');
$remove_session = $dbh->prepare('DELETE FROM `sessions` WHERE `id` = ?');

$insert_session->execute(array($THREAD_ID, sprintf('%.4F', $req_time), sprintf('%.4F', $heartbeat)));
$session_id = $dbh->lastInsertId();
if ($session_id == 0 || !is_numeric($session_id)) {
    die('Could not create session');
}
$session_id = (int) $session_id;

try {
    // SESSIONS: While the current session is not the first in the queue, wait for others to finish
    while (true) {
        // Fetch the next in line, for current thread.
        $select_head->execute(array($THREAD_ID));
        $first_id = $select_head->fetchColumn();
        $select_head->closeCursor();

        if ($first_id === false) {
            // No session at all for this thread means our own row is gone
            // (e.g. flushed as orphaned); waiting any longer would spin forever.
            // die() skips the finally block, which is fine: nothing is left to clean up.
            die('Session lost while waiting in queue');
        }
        if ((int) $first_id === $session_id) {
            break;
        }

        echo("Next to process: ".$first_id."\r\n");
        echo(time()." Wait to do work\r\n");
        sleep(1); // Polling interval

        $now = microtime(true);
        $dead_session_timer = $now - 120; // Remove all sessions which have not updated heartbeat in two minutes
        $refresh_session->execute(array(sprintf('%.4F', $now), $session_id)); // Refresh own session
        $flush_orphans->execute(array(sprintf('%.4F', $dead_session_timer))); // Flush orphaned sessions
    }

    // If this session is first in queue, start processing of request
    // NOTE: the heartbeat is not refreshed while working, so work that takes
    // longer than the two-minute orphan timeout will be flushed by waiting sessions.
    echo(time()." Doing work\r\n");
    sleep(10); // Simulate work is being done
    echo(time()." Done\r\n");
} finally {
    // Clean up by removing current session from database table, to allow next to process.
    // Done in finally so a failure during the wait or the work does not block
    // the thread until the orphan timeout expires.
    $remove_session->execute(array($session_id));
}
echo(time()." Removed session");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment