Last active
June 28, 2019 17:06
-
-
Save spajak/f29ca0b4416c3a023eb769a50526ea0b to your computer and use it in GitHub Desktop.
PHP long running task with MySQL transaction + progress + state
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/**
 * Long running task with a single MySQL transaction. Only one task with the same name
 * can be running at a time. The next executed task either waits until the previous one
 * finishes or, when waiting is disabled, throws a DomainException instead.
 *
 * Use it like so:
 *   $task = new Task('foo', $db);
 *   $task->run();
 *
 */
class Task
{
    // Values written to the `state` column while run() is in progress.
    // run() resets the column to NULL right before committing.
    const PENDING = 'PENDING';
    const WORKING = 'WORKING';
    const FINISHING = 'FINISHING';

    /** @var string Name of the table holding the task rows. */
    protected $table = 'tasks';

    /** @var string Task name; selects the row in the task table. */
    protected $name;

    /** @var PDO Database connection used for every statement. */
    protected $db;

    /** @var bool When false, run() refuses to start if a state is already set. */
    protected $wait;

    /** @var string Label of the form "ShortClassName(name)" used in log output. */
    protected $displayName;

    /**
     * @param string $name Task name; must match a `name` value in the task table.
     * @param PDO    $db   Database connection.
     * @param bool   $wait Whether run() should block on the row lock instead of
     *                     throwing when the task already reports a state.
     */
    public function __construct($name, PDO $db, $wait = false)
    {
        $this->name = $name;
        $this->db = $db;
        $this->wait = $wait;
        $this->displayName = sprintf(
            '%s(%s)',
            (new ReflectionClass($this))->getShortName(),
            $this->name
        );
    }

    /**
     * @return string The display name, e.g. "Task(foo)".
     */
    public function __toString()
    {
        return $this->displayName;
    }

    /**
     * Reads the current state of this task, including a state written by another
     * transaction that has not committed yet.
     *
     * @return string|null The stored state, or null when the row is missing or
     *                     its state is empty.
     */
    public function getState()
    {
        // Important: setting lowest isolation level for the next transaction only,
        // so that uncommitted state written by a running task is visible here.
        $this->db->exec('SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED');
        $this->db->exec('START TRANSACTION');

        try {
            // Bind the name instead of interpolating it into the SQL text.
            $stmt = $this->db->prepare(
                "SELECT `state` FROM `{$this->table}` WHERE `name` = ?"
            );
            $stmt->execute([$this->name]);
            $task = $stmt->fetch(PDO::FETCH_ASSOC);
        } finally {
            // Always close the read-only transaction, even when the query failed.
            $this->db->exec('ROLLBACK');
        }

        return ($task && $task['state']) ? $task['state'] : null;
    }

    /**
     * Runs the task inside a single transaction while holding an exclusive lock
     * on the task row. The transaction is rolled back if anything fails before
     * the commit.
     *
     * @throws DomainException          When $wait is false and the task already
     *                                  reports a state.
     * @throws InvalidArgumentException When no row exists for this task name.
     */
    public function run()
    {
        $this->prepare();

        if (!$this->wait) {
            // We don't want to enqueue this task, just check and throw or continue.
            // This is a best-effort check: the row lock below is what actually
            // serializes tasks.
            $state = $this->getState();
            if ($state) {
                throw new DomainException(sprintf(
                    'Task with the same name "%s" already running. State: %s',
                    $this->name,
                    $state
                ));
            }
        }

        // Start the task transaction in the default isolation level.
        $this->db->beginTransaction();
        $committed = false;

        try {
            $this->acquireLock();

            $this->setState(self::PENDING);
            $this->log('Starting work...');
            // Do some checks & preparations first
            usleep(250000);

            $this->setState(self::WORKING);
            $this->progress(0);
            for ($i = 1; $i <= 100; ++$i) {
                usleep(250000);
                $this->progress($i);
            }

            $this->setState(self::FINISHING);
            $this->log('Cleaning up...');
            // Do some cleanup, release resources etc
            usleep(500000);

            // Important before commit (but not important if rolling back!)
            $this->setState(null);
            $this->db->commit();
            $committed = true;
        } finally {
            // Do not leave the transaction (and the row lock) open on failure.
            if (!$committed && $this->db->inTransaction()) {
                $this->db->rollBack();
            }
        }

        $this->log('Done!');
    }

    /**
     * Acquires an exclusive lock on the task row; must be called inside the
     * task transaction. See: MySQL innodb_lock_wait_timeout variable.
     *
     * @throws InvalidArgumentException When no row exists for this task name.
     */
    protected function acquireLock()
    {
        $this->log('Acquiring row lock...');

        $stmt = $this->db->prepare(
            "SELECT * FROM `{$this->table}` WHERE `name` = ? FOR UPDATE"
        );
        $stmt->execute([$this->name]);

        if (false === $stmt->fetch()) {
            throw new InvalidArgumentException(sprintf(
                'Task row "%s" not found in database',
                $this->name
            ));
        }
    }

    /**
     * Writes the given state to this task's row.
     *
     * @param string|null $state One of the state constants, or null to clear it.
     */
    protected function setState($state)
    {
        $stmt = $this->db->prepare(
            "UPDATE `{$this->table}` SET `state` = ? WHERE `name` = ?"
        );
        $stmt->execute([$state, $this->name]);
    }

    /**
     * Logs a progress line, clamped to 0..100.
     *
     * @param mixed $value    Progress percentage; cast to int and clamped.
     * @param bool  $forceEol End the line with "\n" even below 100%.
     */
    protected function progress($value, $forceEol = false)
    {
        $value = min(100, max(0, (int) $value));

        // A trailing "\r" lets the next progress line overwrite this one.
        // https://stackoverflow.com/a/5265440/315785
        $this->log(
            sprintf("Working ...... %3s%%", $value),
            ($forceEol || $value === 100) ? "\n" : "\r"
        );
    }

    /**
     * Writes "<displayName>: <message><eol>" to standard output.
     *
     * @param string $message Text to print.
     * @param string $eol     Line terminator appended to the message.
     */
    protected function log($message, $eol = "\n")
    {
        file_put_contents('php://stdout', sprintf(
            "%s: %s%s",
            $this->displayName,
            $message,
            $eol
        ));
    }

    /**
     * Creates and seeds the task table when it does not exist yet.
     */
    protected function prepare()
    {
        $stmt = $this->db->query("SHOW TABLES LIKE '{$this->table}'");
        if (!$stmt->fetch()) {
            $this->log('Initializing database...');
            $this->createTable();
        }
    }

    /**
     * (Re)creates the task table and inserts the demo rows "foo" and "bar".
     */
    protected function createTable()
    {
        // Use the configured table name so prepare() and createTable() agree.
        $statements = [
            "DROP TABLE IF EXISTS `{$this->table}`",
            "CREATE TABLE `{$this->table}` (
                `id` INT NOT NULL AUTO_INCREMENT KEY,
                `name` VARCHAR(20) NOT NULL UNIQUE,
                `state` VARCHAR(10)
            )",
            "INSERT INTO `{$this->table}` (`name`) VALUES ('foo'), ('bar')",
        ];

        foreach ($statements as $statement) {
            $this->db->exec($statement);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment