Last active
February 3, 2019 21:31
-
-
Save fritz-gerneth/4445953f2e58aac9507c697646dd431c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Funct\Demo; | |
use PDO; | |
define('ABSPATH', __DIR__); | |
require './vendor/autoload.php'; | |
pcntl_async_signals(true); | |
$connection = new PDO('mysql:host=localhost;dbname=event_store_tests', 'funct', 'funct'); | |
$lastNo = 0; | |
$gapsDetected = 0; | |
pcntl_signal(SIGINT, function () use (&$gapsDetected, &$lastNo) { | |
$gapsPerEvent = '-'; | |
if ($gapsDetected !== 0) { | |
$gapsPerEvent = $lastNo / $gapsDetected; | |
} | |
echo "Detected $gapsDetected gaps in $lastNo events (processed $gapsPerEvent events / gap)"; | |
exit; | |
}); | |
while(true) { | |
$select = <<<SQL | |
SELECT * from _gcl_collection where `no` > $lastNo ORDER BY `no` LIMIT 3 | |
SQL; | |
$res = $connection->query($select)->fetchAll(); | |
foreach ($res as $event) { | |
$eventNo = intval($event['no']); | |
if ($eventNo !== ($lastNo+1)) { | |
echo 'Gap detected at ' . $lastNo . ', got ' . $eventNo . ' instead' . PHP_EOL; | |
$gapsDetected++; | |
} | |
$lastNo = $eventNo; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
for ((run=0;run<$1;run++)) | |
do | |
php event-generator.php "ar$run" $2 $3 & | |
done | |
wait |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Funct\Demo; | |
use PDO; | |
use Ramsey\Uuid\Uuid; | |
define('ABSPATH', __DIR__); | |
require './vendor/autoload.php'; | |
$connection = new PDO('mysql:host=localhost;dbname=event_store_tests', 'funct', 'funct'); | |
$version = 0; | |
$aggregate = $argv[1]; | |
$useLocks = (bool) $argv[3]; | |
$events = (int) $argv[2]; | |
echo "[$aggregate] Using locks: " . ($useLocks ? 'yes' : 'no') . PHP_EOL; | |
$start = microtime(true); | |
$sql = <<<SQL | |
CALL insert_event_streams(:event_stream, :event_rows); | |
SQL; | |
$smt = $connection->prepare($sql); | |
while($version <= $events) { | |
$values = []; | |
for ($i = 0; $i < 5; $i++) { | |
$values[] = [ | |
'event_id' => Uuid::uuid4()->toString(), | |
'event_name' => 'DemoName', | |
'payload' => [], | |
'metadata' => [ | |
'_aggregate_version' => (++$version), | |
'_aggregate_id' => $aggregate, | |
'_aggregate_type' => 'FunctIonGclBaseModelCollectionAggregateCollection', | |
], | |
'created_at' => (new \DateTime())->format('Y-m-d H:i:s'), | |
]; | |
} | |
$smt->execute([ | |
'event_stream' => '_gcl_collection', | |
'event_rows' => json_encode($values), | |
]); | |
$smt->fetchAll(); | |
} | |
$end = microtime(true); | |
$executionTime = $end - $start; | |
$eventsPerSecond = $version / $executionTime; | |
echo "[$aggregate] Inserted $version events in $executionTime seconds ($eventsPerSecond events/second)" . PHP_EOL; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Funct\Demo; | |
use PDO; | |
use Ramsey\Uuid\Uuid; | |
define('ABSPATH', __DIR__); | |
require './vendor/autoload.php'; | |
$connection = new PDO('mysql:host=localhost;dbname=event_store_tests', 'funct', 'funct'); | |
$version = 0; | |
$aggregate = $argv[1]; | |
$useLocks = (bool) $argv[3]; | |
$events = (int) $argv[2]; | |
echo "[$aggregate] Using locks: " . ($useLocks ? 'yes' : 'no') . PHP_EOL; | |
$start = microtime(true); | |
while($version <= $events) { | |
$values = []; | |
for ($i = 0; $i < 5; $i++) { | |
$values[] = "(" . implode(', ', [ | |
'event_id' => '"' . Uuid::uuid4()->toString() . '"', | |
'event_name' => '"DemoName"', | |
'payload' => '"{}"', | |
'metadata' => '\'' . json_encode([ | |
'_aggregate_version' => (++$version), | |
'_aggregate_id' => $aggregate, | |
'_aggregate_type' => 'FunctIonGclBaseModelCollectionAggregateCollection', | |
]) . '\'', | |
'created_at' => '"' . (new \DateTime())->format('Y-m-d H:i:s') . '"', | |
]) . ")"; | |
} | |
$valueString = implode(', ', $values); | |
$insert = <<<SQL | |
INSERT INTO `_gcl_collection` (`event_id`, `event_name`, `payload`, `metadata`, `created_at`) VALUES $valueString; | |
RELEASE_LOCK('test-ar'); | |
SQL; | |
if ($useLocks) { | |
$getLock = $connection->query('SELECT GET_LOCK("test-ar", -1); '); | |
} | |
$connection->beginTransaction(); | |
$connection->exec($insert); | |
$connection->commit(); | |
if ($useLocks) { | |
$releaseLock = $connection->query('SELECT RELEASE_LOCK("test-ar"); '); | |
} | |
if ($useLocks) { | |
$getLock->fetchAll(); | |
$releaseLock->fetchAll(); | |
} | |
} | |
$end = microtime(true); | |
$executionTime = $end - $start; | |
$eventsPerSecond = $version / $executionTime; | |
echo "[$aggregate] Inserted $version events in $executionTime seconds ($eventsPerSecond events/second)" . PHP_EOL; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Producers | Events Per Producer | Gaps | Events per gap | |
------------------------------------------------------- | |
2 | 50005 | 350 | 29 | |
4 | 50005 | 1916 | 104 | |
6 | 50005 | 1153 | 260 | |
8 | 50005 | 23 | 17393 | |
10 | 50005 | 2 | 250025 | |
Producers | Events Per Producer | Events/sec / producer | Events/sec total | Lock | |
--------------------------------------------------------------------------------- | |
2 | 50005 | 3300 | 6600 | local, no lock | |
2 | 50005 | 1620 | 3240 (49%) | local, lock | |
2 | 50005 | 224 | 448 | remote, no lock | |
2 | 50005 | 95 | 190 | remote, lock | |
4 | 50005 | 1541 | 6164 | local, no lock | |
4 | 50005 | 707 | 2828 (45%) | local, lock | |
6 | 50005 | 1230 | 7380 | local, no lock | |
6 | 50005 | 409 | 2454 (33%) | local, lock | |
8 | 50005 | 962 | 7696 | local, no lock | |
8 | 50005 | 279 | 2232 (29%) | local, lock | |
10 | 50005 | 830 | 8300 | local, no lock | |
10 | 50005 | 250 | 2500 (30%) | local, lock | |
10 | 50005 | 217 | 2170 | remote, no lock | |
10 | 50005 | 42 | 420 | remote, lock | |
10 | 50005 | 53 | 530 | remote, stored procedure |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DROP PROCEDURE IF EXISTS `insert_event_streams`; | |
DELIMITER $$ | |
CREATE PROCEDURE insert_event_streams ( | |
IN eventstream VARCHAR(255), | |
IN eventrows JSON | |
) | |
BEGIN | |
DECLARE lock_name VARCHAR(260) DEFAULT CONCAT('_lock_', eventstream); | |
DECLARE lock_status INT; -- to keep query buffer clean | |
DECLARE event_index INT DEFAULT 0; | |
DECLARE event_data JSON; | |
DECLARE exit handler for sqlexception | |
BEGIN | |
ROLLBACK; | |
SELECT RELEASE_LOCK(lock_name) INTO lock_status; | |
END; | |
-- Prepared statements do not support declared local variables but only constants and session variables | |
-- https://bugs.mysql.com/bug.php?id=13572 | |
SET @insert_sql = CONCAT('INSERT INTO `', eventstream, '` (`event_id`, `event_name`, `payload`, `metadata`, `created_at`) VALUES (?, ?, ?, ?, ?);'); | |
PREPARE insert_stmt FROM @insert_sql; | |
SELECT GET_LOCK(lock_name, -1) INTO lock_status; | |
START TRANSACTION; | |
WHILE event_index < JSON_LENGTH(eventrows) | |
DO | |
SET event_data = JSON_EXTRACT(eventrows, CONCAT('$[', event_index, ']')); | |
SET @event_id = JSON_UNQUOTE(JSON_EXTRACT(event_data, '$."event_id"')); | |
SET @event_name = JSON_UNQUOTE(JSON_EXTRACT(event_data, '$.event_name')); | |
SET @event_payload = JSON_EXTRACT(event_data, '$.payload'); | |
SET @event_metadata = JSON_EXTRACT(event_data, '$.metadata'); | |
SET @event_date = JSON_UNQUOTE(JSON_EXTRACT(event_data, '$.created_at')); | |
EXECUTE insert_stmt USING @event_id, @event_name, @event_payload, @event_metadata, @event_date; | |
SET event_index = event_index + 1; | |
END WHILE; | |
COMMIT; | |
SELECT RELEASE_LOCK(lock_name) INTO lock_status; | |
DEALLOCATE PREPARE insert_stmt; | |
END$$ | |
DELIMITER ; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TABLE `_gcl_collection` ( | |
`no` BIGINT(20) NOT NULL AUTO_INCREMENT, | |
`event_id` CHAR(36) NOT NULL COLLATE 'utf8_bin', | |
`event_name` VARCHAR(100) NOT NULL COLLATE 'utf8_bin', | |
`payload` JSON NOT NULL, | |
`metadata` JSON NOT NULL, | |
`created_at` DATETIME(6) NOT NULL, | |
`aggregate_version` INT(11) UNSIGNED AS (json_extract(`metadata`,'$._aggregate_version')) STORED, | |
`aggregate_id` VARCHAR(50) AS (json_unquote(json_extract(`metadata`,'$._aggregate_id'))) STORED, | |
`aggregate_type` VARCHAR(200) AS (json_unquote(json_extract(`metadata`,'$._aggregate_type'))) STORED, | |
PRIMARY KEY (`no`), | |
UNIQUE INDEX `ix_event_id` (`event_id`), | |
UNIQUE INDEX `ix_unique_event` (`aggregate_type`, `aggregate_id`, `aggregate_version`), | |
INDEX `ix_query_aggregate` (`aggregate_type`, `aggregate_id`, `no`) | |
) | |
COLLATE='utf8_general_ci' | |
ENGINE=InnoDB | |
AUTO_INCREMENT=48994 | |
; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment