Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Event Machine Example
{
"variables": [],
"info": {
"name": "EventMachine Example",
"_postman_id": "89698ba0-98f0-cc94-2f71-9f19c183e3f3",
"description": "",
"schema": "https://schema.getpostman.com/json/collection/v2.0.0/collection.json"
},
"item": [
{
"name": "RegisterUser",
"request": {
"url": "http://localhost:8080/api/messagebox/RegisterUser",
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json",
"description": ""
},
{
"key": "Accept",
"value": "application/json",
"description": ""
}
],
"body": {
"mode": "raw",
"raw": "{\n\t\"payload\": {\n\t\t\"userId\": \"932577dd-3593-47bd-8bd0-3bd7bf7e111f\",\n\t\t\"email\": \"contact@prooph.de\",\n\t\t\"username\": \"codeliner\"\n\t}\n}"
},
"description": "Register a new user"
},
"response": []
},
{
"name": "ChangeUsername",
"request": {
"url": "http://localhost:8080/api/messagebox/ChangeUsername",
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json",
"description": ""
},
{
"key": "Accept",
"value": "application/json",
"description": ""
}
],
"body": {
"mode": "raw",
"raw": "{\n\t\"payload\": {\n\t\t\"userId\": \"932577dd-3593-47bd-8bd0-3bd7bf7e111f\",\n\t\t\"newUsername\": \"your name here\"\n\t}\n}"
},
"description": "Change name of a user"
},
"response": []
}
]
}
<?php
declare(strict_types = 1);
namespace App\Config;
use App\Model\UserDescription;
// Application configuration, resolved from environment variables when this file is included.
// getenv() yields false for an unset variable, so every `?:` fallback below also applies
// when the variable is set to an empty or otherwise falsy value (e.g. "0").
return [
    'environment' => getenv('PROOPH_ENV') ?: 'prod',
    // No fallbacks: each entry is false when its variable is unset.
    'pdo' => [
        'dsn' => getenv('PDO_DSN'),
        'user' => getenv('PDO_USER'),
        'pwd' => getenv('PDO_PWD'),
    ],
    // No fallbacks: each entry is false when its variable is unset.
    'mongo' => [
        'server' => getenv('MONGO_SERVER'),
        'db' => getenv('MONGO_DB_NAME'),
    ],
    'rabbit' => [
        'connection' => [
            'host' => getenv('RABBIT_HOST') ?: 'rabbit',
            'port' => (int) getenv('RABBIT_PORT') ?: 5672,
            'login' => getenv('RABBIT_USER') ?: 'event-machine',
            'password' => getenv('RABBIT_PWD') ?: 'event-machine',
            'vhost' => getenv('RABBIT_VHOST') ?: '/event-machine',
            // Parsed as a flag so that "false", "0", "off" and "no" disable it; a plain
            // (bool) cast would turn every non-empty string except "0" into true.
            // Unset or unrecognised values yield false.
            'persistent' => filter_var(getenv('RABBIT_PERSISTENT'), FILTER_VALIDATE_BOOLEAN),
            // Timeouts are cast to float so fractional values such as "0.5" survive;
            // an (int) cast would truncate them to 0 and trigger the 1 second fallback.
            'read_timeout' => (float) getenv('RABBIT_READ_TIMEOUT') ?: 1, //sec, float allowed
            'write_timeout' => (float) getenv('RABBIT_WRITE_TIMEOUT') ?: 1, //sec, float allowed
            'heartbeat' => (int) getenv('RABBIT_HEARTBEAT') ?: 0,
            'verify' => false,
        ],
        'ui_exchange' => getenv('RABBIT_UI_EXCHANGE') ?: 'ui-exchange',
    ],
    'event_machine' => [
        // Description classes listed for Event Machine.
        'descriptions' => [
            UserDescription::class,
        ],
    ],
];
<?php
declare(strict_types = 1);
namespace App\Model;
use Prooph\Common\Messaging\Message;
use Prooph\EventMachine\Aggregate\Exception\AggregateNotFound;
use Prooph\EventMachine\EventMachine;
use Prooph\EventMachine\EventMachineDescription;
use Prooph\EventMachine\JsonSchema\JsonSchema;
use Prooph\EventMachine\Persistence\Stream;
use React\Promise\Deferred;
/**
 * Describes the User aggregate to Event Machine: message schemas, the two
 * commands with their events, the aggregate projection and the User query.
 */
final class UserDescription implements EventMachineDescription
{
    public const COMMAND_REGISTER_USER = 'RegisterUser';
    public const COMMAND_CHANGE_USERNAME = 'ChangeUsername';

    public const EVENT_USER_WAS_REGISTERED = 'UserWasRegistered';
    public const EVENT_USERNAME_WAS_CHANGED = 'UsernameWasChanged';

    public const QUERY_USER = 'User';

    public const AGGREGATE_USER = 'User';

    public const IDENTIFIER = 'userId';

    /**
     * Schema created in describeMessages(). It is reused for the RegisterUser
     * command and for the aggregate projection, so describeMessages() has to
     * run before describeProjectUser().
     */
    private static $userSchema;

    /**
     * Entry point: runs every describe step against the given Event Machine.
     * describeMessages() comes first because it initialises self::$userSchema.
     */
    public static function describe(EventMachine $eventMachine): void
    {
        self::describeMessages($eventMachine);
        self::describeRegisterUser($eventMachine);
        self::describeChangeUsername($eventMachine);
        self::describeProjectUser($eventMachine);
        self::describeQueryUser($eventMachine);
    }

    /**
     * Registers the payload schemas of both commands and both events.
     */
    private static function describeMessages(EventMachine $eventMachine): void
    {
        $idSchema = ['type' => 'string', 'pattern' => '^[A-Za-z0-9-]{36}$'];
        $nameSchema = ['type' => 'string', 'minLength' => 1];
        $emailSchema = ['type' => 'string', 'format' => 'email'];

        // Property set shared by the user schema and the UserWasRegistered event.
        $userProperties = [
            'userId' => $idSchema,
            'username' => $nameSchema,
            'email' => $emailSchema,
        ];

        self::$userSchema = JsonSchema::object($userProperties);
        $eventMachine->registerCommand(self::COMMAND_REGISTER_USER, self::$userSchema);

        $changeUsernameSchema = JsonSchema::object([
            'userId' => $idSchema,
            'newUsername' => $nameSchema,
        ]);
        $eventMachine->registerCommand(self::COMMAND_CHANGE_USERNAME, $changeUsernameSchema);

        $userWasRegisteredSchema = JsonSchema::object($userProperties);
        $eventMachine->registerEvent(self::EVENT_USER_WAS_REGISTERED, $userWasRegisteredSchema);

        $usernameWasChangedSchema = JsonSchema::object([
            'userId' => $idSchema,
            'oldUsername' => $nameSchema,
            'newUsername' => $nameSchema,
        ]);
        $eventMachine->registerEvent(self::EVENT_USERNAME_WAS_CHANGED, $usernameWasChangedSchema);
    }

    /**
     * Wires RegisterUser: the command payload is yielded unchanged as the
     * UserWasRegistered event, whose payload in turn becomes the user state.
     */
    private static function describeRegisterUser(EventMachine $eventMachine): void
    {
        // Generator handler: yields a single [event name, payload] pair.
        $emitUserWasRegistered = function (Message $registerUser) {
            yield [self::EVENT_USER_WAS_REGISTERED, $registerUser->payload()];
        };

        $initialiseState = function (Message $userWasRegistered) {
            return $userWasRegistered->payload();
        };

        $eventMachine
            ->process(self::COMMAND_REGISTER_USER)
            ->withNew(self::AGGREGATE_USER)
            ->identifiedBy(self::IDENTIFIER)
            ->handle($emitUserWasRegistered)
            ->recordThat(self::EVENT_USER_WAS_REGISTERED)
            ->apply($initialiseState);
    }

    /**
     * Wires ChangeUsername: yields UsernameWasChanged carrying the old and the
     * new username, and applies the new username to the user state.
     */
    private static function describeChangeUsername(EventMachine $eventMachine): void
    {
        // Generator handler: yields a single [event name, payload] pair.
        $emitUsernameWasChanged = function (array $userState, Message $changeUsername) {
            $eventPayload = [
                'userId' => $userState[self::IDENTIFIER],
                'oldUsername' => $userState['username'],
                'newUsername' => $changeUsername->payload()['newUsername'],
            ];

            yield [self::EVENT_USERNAME_WAS_CHANGED, $eventPayload];
        };

        $applyNewUsername = function (array $userState, Message $usernameWasChanged) {
            $userState['username'] = $usernameWasChanged->payload()['newUsername'];

            return $userState;
        };

        $eventMachine
            ->process(self::COMMAND_CHANGE_USERNAME)
            ->withExisting(self::AGGREGATE_USER)
            ->handle($emitUsernameWasChanged)
            ->recordThat(self::EVENT_USERNAME_WAS_CHANGED)
            ->apply($applyNewUsername);

        // Forward UsernameWasChanged event to UI using rabbitmq + stomp over websocket
        $eventMachine->on(self::EVENT_USERNAME_WAS_CHANGED, 'uiExchange');
    }

    /**
     * Sets up an aggregate projection for the User aggregate on the write
     * model stream, using the schema built in describeMessages().
     */
    private static function describeProjectUser(EventMachine $eventMachine): void
    {
        $writeModelStream = Stream::ofWriteModel();

        $eventMachine
            ->watch($writeModelStream)
            ->withAggregateProjection(self::AGGREGATE_USER, self::$userSchema);
    }

    /**
     * Registers the User query. The resolver loads the aggregate state for the
     * requested userId and resolves the deferred with it; when the load throws
     * AggregateNotFound the deferred is rejected with that exception.
     */
    private static function describeQueryUser(EventMachine $eventMachine): void
    {
        $queryPayloadSchema = JsonSchema::object([
            'userId' => JsonSchema::string(),
        ]);

        $resolveUser = function (Message $getUser, Deferred $deferred) use ($eventMachine) {
            try {
                $requestedUserId = $getUser->payload()['userId'];
                $userState = $eventMachine->loadAggregateState(self::AGGREGATE_USER, $requestedUserId);
                $deferred->resolve($userState);
            } catch (AggregateNotFound $error) {
                $deferred->reject($error);
            }
        };

        $eventMachine
            ->registerQuery(self::QUERY_USER, $queryPayloadSchema)
            ->resolveWith($resolveUser)
            ->returnType(JsonSchema::typeRef(self::AGGREGATE_USER));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.