Skip to content

Instantly share code, notes, and snippets.

@jasonknight
Last active December 20, 2018 17:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasonknight/fb46562c0583398f9417ae5bf6875cde to your computer and use it in GitHub Desktop.
Save jasonknight/fb46562c0583398f9417ae5bf6875cde to your computer and use it in GitHub Desktop.
<?php
function create_connection() {
$mysqli = new mysqli("localhost", "root", "xxx");
if ( $mysqli->connect_errno )
die( $mysqli->connect_error );
$mysqli->set_charset('utf8');
return $mysqli;
}
$conn = create_connection();
$create_table = "
DROP DATABASE IF EXISTS `concurrency_test`;
CREATE DATABASE `concurrency_test`;
CREATE TABLE `concurrency_test`.`invoices` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`reference` varchar(50) NOT NULL,
`total` int(11) NOT NULL DEFAULT '0',
`idate` DATE NOT NULL,
`stimestamp` INT(11) NOT NULL COMMENT 'The moment of the save',
`tid` int(11) NOT NULL COMMENT 'the id of the inserting process',
`ind` int(11) NOT NULL COMMENT 'the index of the invoice',
PRIMARY KEY (`id`),
UNIQUE KEY `reference` (`reference`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;
CREATE TABLE `concurrency_test`.`meta` (
`name` varchar(50) NOT NULL,
`val` varchar(50) NOT NULL
) ENGINE=INNODB DEFAULT CHARSET=utf8;
";
if ( ! $conn->multi_query($create_table) ) {
die( $conn->errno . " - " . $conn->error );
} else {
while($conn->next_result()) {;}
}
$res = $conn->query("INSERT INTO `concurrency_test`.`meta` (name,val) VALUES ('reference','RV00000');");
if ( ! $res ) {
echo $conn->errno . " - " . $conn->error . "\n";
}
$queue = [];
for ( $i = 0; $i < 10000; $i++ ) {
$inv = new stdClass();
$inv->total = rand(1,1000);
// because invoices can be from any time
$inv->date = sprintf("%d-%d-%d",rand(2000,2018),rand(1,12),rand(1,29));
$inv->index = $i;
$inv->tries = 0;
$queue[] = $inv;
}
class Task {
public function __construct($id,$invoices) {
$this->id = $id;
$this->invoices = $invoices;
}
public function run() {
file_put_contents("./thread.log","{$this->id}::run called.\n",FILE_APPEND);
$this->db = create_connection();
while ( count($this->invoices) > 0 ) {
$invoice = array_pop($this->invoices);
if ( $invoice->tries > 3) {
// we're testing here, it should never have
// this problem
file_put_contents("./thread.log","{$this->id} too many tries, killing\n",FILE_APPEND);
$this->db->close();
return;
}
$sql = "
BEGIN;
SELECT (SUBSTRING(val,3) * 1) FROM concurrency_test.meta WHERE name = 'reference' INTO @ref FOR UPDATE;
SELECT CONCAT('RV', LPAD(@ref + 1,5,'0')) INTO @new_ref;
INSERT INTO
concurrency_test.invoices
(reference,total,stimestamp,idate,tid,ind)
VALUES
(@new_ref,
{$invoice->total},
".time().",
'{$invoice->date}',
{$this->id}, {$invoice->index});
UPDATE concurrency_test.meta SET val = @new_ref WHERE name = 'reference';
COMMIT;
";
$res = $this->db->multi_query($sql);
if ( !$res ) {
file_put_contents("./thread.log", "{$this->id} error: {$this->db->error}\n",FILE_APPEND);
$invoice->tries++;
array_push($this->invoices,$invoice);
} else {
file_put_contents("./thread.log","{$this->id} says: {$invoice->index} saved.\n",FILE_APPEND);
while ($this->db->next_result()) {;}
}
}
$this->db->close();
}
}
$num_threads = $_SERVER['argv'][1];
if ( empty($num_threads) )
$num_threads = 16;
$per = count($queue) / $num_threads;
$payloads = [];
srand(time());
while( count($queue) > 0 ) {
$index = rand(0,$num_threads-1);
if ( !isset($payloads[$index]) )
$payloads[$index] = [];
$payloads[$index][] = array_pop($queue);
//srand(time() + $index);
}
$tasks = [];
foreach ( $payloads as $id=>$invoices ) {
$task = new Task($id,$invoices);
$pid = pcntl_fork();
if ( $pid == -1 ) {
die("wtf?");
} else if ($pid) {
echo "$id added as task\n";
array_push($tasks,$pid);
} else {
$task->run();
break;
}
}
while ( count($tasks) > 0 ) {
foreach ( $tasks as $i=>$t ) {
$r = pcntl_waitpid($t,$status, WNOHANG);
if ( $r > 0 || $r == -1 ) {
unset($tasks[$i]);
}
}
}
$conn->close();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment