Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
<?php
/**
 * Consumer that tails the `_gcl_collection` event stream and counts gaps in
 * the `no` sequence.
 *
 * The script polls the table three rows at a time, always asking for rows
 * whose `no` is greater than the last one seen. Whenever a row's `no` is not
 * exactly "last seen + 1" a gap is reported. Because the cursor starts at 0,
 * a table whose first `no` is not 1 yields one gap report for the first row.
 *
 * Send SIGINT (Ctrl+C) to print a summary and exit.
 */
namespace Funct\Demo;
use PDO;
define('ABSPATH', __DIR__);
require './vendor/autoload.php';
// Deliver signals asynchronously so SIGINT is handled while the loop spins.
pcntl_async_signals(true);
$connection = new PDO('mysql:host=localhost;dbname=event_store_tests', 'funct', 'funct');
// Surface SQL errors as exceptions instead of a `false` return that would
// later fail on ->fetchAll().
$connection->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$lastNo = 0;
$gapsDetected = 0;
pcntl_signal(SIGINT, function () use (&$gapsDetected, &$lastNo) {
    // The summary uses the last seen `no` as the number of events processed.
    $eventsPerGap = '-';
    if ($gapsDetected !== 0) {
        $eventsPerGap = $lastNo / $gapsDetected;
    }
    echo "Detected $gapsDetected gaps in $lastNo events (processed $eventsPerGap events / gap)" . PHP_EOL;
    exit;
});
// Prepared once; only the bound cursor value changes between polls.
$select = $connection->prepare(
    'SELECT * from _gcl_collection where `no` > :lastNo ORDER BY `no` LIMIT 3'
);
while (true) {
    $select->bindValue(':lastNo', $lastNo, PDO::PARAM_INT);
    $select->execute();
    foreach ($select->fetchAll() as $event) {
        $eventNo = intval($event['no']);
        if ($eventNo !== ($lastNo + 1)) {
            echo 'Gap detected at ' . $lastNo . ', got ' . $eventNo . ' instead' . PHP_EOL;
            $gapsDetected++;
        }
        $lastNo = $eventNo;
    }
}
#!/bin/bash
# Start several event-generator.php processes in parallel and wait for all of
# them to finish.
#
# Usage: <this-script> <producers> <events-per-producer> [use-locks]
#   $1  number of producer processes to start
#   $2  forwarded to event-generator.php as its second argument
#   $3  forwarded to event-generator.php as its third argument (default: 0)
#
# Each producer receives "ar<index>" as its first argument.
producers=${1:-0}
for ((run = 0; run < producers; run++))
do
    # Quote the forwarded arguments so they are not word-split or globbed and
    # so a missing third argument does not shift the PHP argument list.
    php event-generator.php "ar$run" "$2" "${3:-0}" &
done
# Block until every background producer has exited.
wait
<?php
/**
 * Event producer that appends events to `_gcl_collection` by calling the
 * `insert_event_streams` stored procedure with a JSON array of rows.
 *
 * Arguments:
 *   $argv[1]  aggregate id written to each event's metadata
 *   $argv[2]  number of events to insert
 *   $argv[3]  "use locks" flag; in this script it is only echoed, no code
 *             path branches on it
 *
 * Events are sent in batches of up to five per procedure call, and
 * `_aggregate_version` counts up from 1.
 */
namespace Funct\Demo;
use PDO;
use Ramsey\Uuid\Uuid;
define('ABSPATH', __DIR__);
require './vendor/autoload.php';
$connection = new PDO('mysql:host=localhost;dbname=event_store_tests', 'funct', 'funct');
// Surface SQL errors as exceptions rather than silently continuing.
$connection->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$version = 0;
$aggregate = $argv[1];
$useLocks = (bool) ($argv[3] ?? false);
$events = (int) ($argv[2] ?? 0);
$batchSize = 5;
echo "[$aggregate] Using locks: " . ($useLocks ? 'yes' : 'no') . PHP_EOL;
$start = microtime(true);
$smt = $connection->prepare('CALL insert_event_streams(:event_stream, :event_rows);');
// Strictly-less-than plus a capped batch inserts exactly $events events; the
// previous `<=` with fixed batches of five overshot (e.g. 50005 for 50000).
while ($version < $events) {
    $rowsInBatch = min($batchSize, $events - $version);
    $rows = [];
    for ($i = 0; $i < $rowsInBatch; $i++) {
        $rows[] = [
            'event_id' => Uuid::uuid4()->toString(),
            'event_name' => 'DemoName',
            // An empty object encodes as JSON `{}`; an empty PHP array would
            // encode as `[]`.
            'payload' => new \stdClass(),
            'metadata' => [
                '_aggregate_version' => (++$version),
                '_aggregate_id' => $aggregate,
                '_aggregate_type' => 'FunctIonGclBaseModelCollectionAggregateCollection',
            ],
            'created_at' => (new \DateTimeImmutable())->format('Y-m-d H:i:s'),
        ];
    }
    $smt->execute([
        'event_stream' => '_gcl_collection',
        'event_rows' => json_encode($rows),
    ]);
    // Release the statement so it can be executed again on the next pass.
    $smt->closeCursor();
}
$executionTime = microtime(true) - $start;
// Guard against a zero-length interval when nothing was inserted.
$eventsPerSecond = $executionTime > 0 ? $version / $executionTime : 0;
echo "[$aggregate] Inserted $version events in $executionTime seconds ($eventsPerSecond events/second)" . PHP_EOL;
<?php
/**
 * Event producer that appends events to `_gcl_collection` with multi-row
 * INSERT statements, optionally serialised through the MySQL named lock
 * "test-ar".
 *
 * Arguments:
 *   $argv[1]  aggregate id written to each event's metadata
 *   $argv[2]  number of events to insert
 *   $argv[3]  truthy to wrap every batch in GET_LOCK / RELEASE_LOCK
 *
 * Events are inserted in batches of up to five rows, each batch in its own
 * transaction, and `_aggregate_version` counts up from 1.
 */
namespace Funct\Demo;
use PDO;
use Ramsey\Uuid\Uuid;
define('ABSPATH', __DIR__);
require './vendor/autoload.php';
$connection = new PDO('mysql:host=localhost;dbname=event_store_tests', 'funct', 'funct');
// Surface SQL errors as exceptions rather than silently continuing.
$connection->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$version = 0;
$aggregate = $argv[1];
$useLocks = (bool) ($argv[3] ?? false);
$events = (int) ($argv[2] ?? 0);
$batchSize = 5;
echo "[$aggregate] Using locks: " . ($useLocks ? 'yes' : 'no') . PHP_EOL;
// One prepared statement per distinct batch size (normally the full batch
// plus at most one shorter final batch).
$statements = [];
$start = microtime(true);
// Strictly-less-than plus a capped batch inserts exactly $events events; the
// previous `<=` with fixed batches of five overshot (e.g. 50005 for 50000).
while ($version < $events) {
    $rowsInBatch = min($batchSize, $events - $version);
    // Values are bound as parameters instead of being concatenated into the
    // SQL text, so quotes or backslashes in the aggregate id cannot break
    // the statement.
    $params = [];
    for ($i = 0; $i < $rowsInBatch; $i++) {
        $params[] = Uuid::uuid4()->toString();
        $params[] = 'DemoName';
        $params[] = '{}';
        $params[] = json_encode([
            '_aggregate_version' => (++$version),
            '_aggregate_id' => $aggregate,
            '_aggregate_type' => 'FunctIonGclBaseModelCollectionAggregateCollection',
        ]);
        $params[] = (new \DateTimeImmutable())->format('Y-m-d H:i:s');
    }
    if (!isset($statements[$rowsInBatch])) {
        $placeholders = implode(', ', array_fill(0, $rowsInBatch, '(?, ?, ?, ?, ?)'));
        $statements[$rowsInBatch] = $connection->prepare(
            'INSERT INTO `_gcl_collection` (`event_id`, `event_name`, `payload`, `metadata`, `created_at`) VALUES '
            . $placeholders
        );
    }
    if ($useLocks) {
        // Timeout -1 waits indefinitely; fetch right away to drain the result.
        $connection->query("SELECT GET_LOCK('test-ar', -1)")->fetchAll();
    }
    try {
        $connection->beginTransaction();
        try {
            $statements[$rowsInBatch]->execute($params);
            $connection->commit();
        } catch (\Throwable $e) {
            // Undo the partial batch, then let the error propagate.
            $connection->rollBack();
            throw $e;
        }
    } finally {
        // Always give the lock back, even when the insert failed.
        if ($useLocks) {
            $connection->query("SELECT RELEASE_LOCK('test-ar')")->fetchAll();
        }
    }
}
$executionTime = microtime(true) - $start;
// Guard against a zero-length interval when nothing was inserted.
$eventsPerSecond = $executionTime > 0 ? $version / $executionTime : 0;
echo "[$aggregate] Inserted $version events in $executionTime seconds ($eventsPerSecond events/second)" . PHP_EOL;
Producers | Events Per Producer | Gaps | Events per gap
-------------------------------------------------------
2 | 50005 | 350 | 29
4 | 50005 | 1916 | 104
6 | 50005 | 1153 | 260
8 | 50005 | 23 | 17393
10 | 50005 | 2 | 250025
Producers | Events Per Producer | Events/sec / producer | Events/sec total | Lock
---------------------------------------------------------------------------------
2 | 50005 | 3300 | 6600 | local, no lock
2 | 50005 | 1620 | 3240 (49%) | local, lock
2 | 50005 | 224 | 448 | remote, no lock
2 | 50005 | 95 | 190 | remote, lock
4 | 50005 | 1541 | 6164 | local, no lock
4 | 50005 | 707 | 2828 (45%) | local, lock
6 | 50005 | 1230 | 7380 | local, no lock
6 | 50005 | 409 | 2454 (33%) | local, lock
8 | 50005 | 962 | 7696 | local, no lock
8 | 50005 | 279 | 2232 (29%) | local, lock
10 | 50005 | 830 | 8300 | local, no lock
10 | 50005 | 250 | 2500 (30%) | local, lock
10 | 50005 | 217 | 2170 | remote, no lock
10 | 50005 | 42 | 420 | remote, lock
10 | 50005 | 53 | 530 | remote, stored procedure
DROP PROCEDURE IF EXISTS `insert_event_streams`;
DELIMITER $$
-- Appends a batch of events to an event stream table.
--
-- Parameters:
--   eventstream  name of the target table
--   eventrows    JSON array of objects with the keys event_id, event_name,
--                payload, metadata and created_at
--
-- The whole batch is inserted inside one transaction while holding the named
-- lock '_lock_<eventstream>'. On any SQL error the transaction is rolled
-- back, the lock is released and the error is re-raised to the caller.
CREATE PROCEDURE insert_event_streams (
    IN eventstream VARCHAR(255),
    IN eventrows JSON
)
BEGIN
    DECLARE lock_name VARCHAR(260) DEFAULT CONCAT('_lock_', eventstream);
    DECLARE lock_status INT; -- to keep query buffer clean
    DECLARE event_index INT DEFAULT 0;
    DECLARE event_count INT DEFAULT 0;
    DECLARE event_data JSON;
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        SELECT RELEASE_LOCK(lock_name) INTO lock_status;
        -- Pass the original error on; without this the caller would see the
        -- call succeed even though the batch was rolled back.
        RESIGNAL;
    END;
    -- Row count is loop-invariant, so compute it once.
    SET event_count = JSON_LENGTH(eventrows);
    -- Prepared statements do not support declared local variables but only constants and session variables
    -- https://bugs.mysql.com/bug.php?id=13572
    -- Backticks in the table name are doubled so the name cannot terminate
    -- the quoted identifier.
    SET @insert_sql = CONCAT('INSERT INTO `', REPLACE(eventstream, '`', '``'), '` (`event_id`, `event_name`, `payload`, `metadata`, `created_at`) VALUES (?, ?, ?, ?, ?);');
    PREPARE insert_stmt FROM @insert_sql;
    -- Timeout -1 waits indefinitely for the lock.
    SELECT GET_LOCK(lock_name, -1) INTO lock_status;
    START TRANSACTION;
    WHILE event_index < event_count
    DO
        SET event_data = JSON_EXTRACT(eventrows, CONCAT('$[', event_index, ']'));
        SET @event_id = JSON_UNQUOTE(JSON_EXTRACT(event_data, '$."event_id"'));
        SET @event_name = JSON_UNQUOTE(JSON_EXTRACT(event_data, '$.event_name'));
        -- payload and metadata stay JSON text; they are not unquoted.
        SET @event_payload = JSON_EXTRACT(event_data, '$.payload');
        SET @event_metadata = JSON_EXTRACT(event_data, '$.metadata');
        SET @event_date = JSON_UNQUOTE(JSON_EXTRACT(event_data, '$.created_at'));
        EXECUTE insert_stmt USING @event_id, @event_name, @event_payload, @event_metadata, @event_date;
        SET event_index = event_index + 1;
    END WHILE;
    COMMIT;
    SELECT RELEASE_LOCK(lock_name) INTO lock_status;
    DEALLOCATE PREPARE insert_stmt;
END$$
DELIMITER ;
-- Event stream table.
--   `no`                 auto-increment sequence number and primary key
--   `payload`/`metadata` JSON documents stored per event
--   `aggregate_*`        stored generated columns extracted from `metadata`
--                        ($._aggregate_version, $._aggregate_id,
--                        $._aggregate_type) so they can be indexed
CREATE TABLE `_gcl_collection` (
    `no` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `event_id` CHAR(36) NOT NULL COLLATE 'utf8_bin',
    `event_name` VARCHAR(100) NOT NULL COLLATE 'utf8_bin',
    `payload` JSON NOT NULL,
    `metadata` JSON NOT NULL,
    `created_at` DATETIME(6) NOT NULL,
    `aggregate_version` INT(11) UNSIGNED AS (json_extract(`metadata`,'$._aggregate_version')) STORED,
    `aggregate_id` VARCHAR(50) AS (json_unquote(json_extract(`metadata`,'$._aggregate_id'))) STORED,
    `aggregate_type` VARCHAR(200) AS (json_unquote(json_extract(`metadata`,'$._aggregate_type'))) STORED,
    PRIMARY KEY (`no`),
    UNIQUE INDEX `ix_event_id` (`event_id`),
    -- At most one event per (aggregate type, aggregate id, version).
    UNIQUE INDEX `ix_unique_event` (`aggregate_type`, `aggregate_id`, `aggregate_version`),
    INDEX `ix_query_aggregate` (`aggregate_type`, `aggregate_id`, `no`)
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB
-- No explicit AUTO_INCREMENT start value: a freshly created table numbers
-- its rows from 1.
;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment