Skip to content

Instantly share code, notes, and snippets.

@spajak
Last active June 28, 2019 17:06
Show Gist options
  • Save spajak/f29ca0b4416c3a023eb769a50526ea0b to your computer and use it in GitHub Desktop.
Save spajak/f29ca0b4416c3a023eb769a50526ea0b to your computer and use it in GitHub Desktop.
PHP long running task with MySQL transaction + progress + state
<?php
/**
* Long running task with single MySQL transaction. Only one task with the same name
* can be running at a time. Next executed task waits untill previous finishes, or
* doesn't wait, but throws DomainException instead.
*
* Use it like so:
* $task = new Task('foo', $db);
* $task->run();
*
*/
class Task
{
const PENDING = 'PENDING';
const WORKING = 'WORKING';
const FINISHING = 'FINISHING';
protected $table = 'tasks';
protected $name;
protected $db;
protected $wait;
protected $displayName;
public function __construct($name, PDO $db, $wait = false)
{
$this->name = $name;
$this->db = $db;
$this->wait = $wait;
$this->displayName = sprintf(
'%s(%s)',
(new ReflectionClass($this))->getShortName(),
$this->name
);
}
public function __toString()
{
return $this->displayName;
}
public function getState()
{
// Important: setting lowest isolation level for the next transaction
$this->db->exec('SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED');
$this->db->exec('START TRANSACTION');
// Check if another task is not running.
// We read here uncommited data from another transaction.
$stmt = $this->db->query(
"SELECT `state` FROM `{$this->table}` WHERE `name` = '{$this->name}'"
);
$task = $stmt->fetch(PDO::FETCH_ASSOC);
$state = null;
if ($task and $task['state']) {
$state = $task['state'];
}
$this->db->exec('ROLLBACK');
return $state;
}
public function run()
{
$this->prepare();
if (!$this->wait) {
// We don't want to enqueue this task, just check and throw or continue
if ($state = $this->getState()) {
throw new DomainException(sprintf(
'Task with the same name "%s" already running. State: %s',
$this->name,
$state
));
}
}
// Start task transaction in default isolation level (commit is implicit here)
$this->db->beginTransaction();
// Acquire exclusive lock on the task row before starting.
// See: MySQL innodb_lock_wait_timeout variable
$this->log('Acquiring row lock...');
$stmt = $this->db->query(
"SELECT * FROM `{$this->table}` WHERE `name` = '{$this->name}' FOR UPDATE"
);
if (false === $task = $stmt->fetch()) {
throw new InvalidArgumentException(sprintf(
'Task row "%s" not found in database',
$this->name
));
}
$this->setState(self::PENDING);
$this->log('Starting work...');
// Do some checks & preparations first
usleep(250000);
$this->setState(self::WORKING);
$this->progress(0);
for ($i = 1; $i <= 100; ++$i) {
usleep(250000);
$this->progress($i);
}
$this->setState(self::FINISHING);
$this->log('Cleaning up...');
// Do some cleanup, release resources etc
usleep(500000);
// Important before commit (but not important if rolling back!)
$this->setState(null);
$this->db->commit();
$this->log('Done!');
}
protected function setState($state)
{
$stmt = $this->db->prepare(
"UPDATE `{$this->table}` SET `state` = ? WHERE `name` = ?"
);
$stmt->execute([$state, $this->name]);
}
protected function progress($value, $forceEol = false)
{
$value = min(100, max(0, (int) $value));
// https://stackoverflow.com/a/5265440/315785
$this->log(
sprintf("Working ...... %3s%%", $value),
($forceEol or $value === 100) ? "\n" : "\r"
);
}
protected function log($message, $eol = "\n")
{
file_put_contents('php://stdout', sprintf(
"%s: %s%s",
$this->displayName,
$message,
$eol
));
}
protected function prepare()
{
$stmt = $this->db->query("SHOW TABLES LIKE '{$this->table}'");
if (!$stmt->fetch()) {
$this->log('Initializing database...');
$this->createTable();
}
}
protected function createTable()
{
$sql =<<<_SQL
DROP TABLE IF EXISTS `tasks`;
CREATE TABLE `tasks` (
`id` INT NOT NULL AUTO_INCREMENT KEY,
`name` VARCHAR(20) NOT NULL UNIQUE,
`state` VARCHAR(10)
);
INSERT INTO `tasks` (`name`) VALUES ('foo'), ('bar');
_SQL;
foreach (explode(";", $sql) as $s) {
if ($s = trim($s)) {
$this->db->exec($s);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment