Skip to content

Instantly share code, notes, and snippets.

@muteor
Created September 2, 2010 19:26
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save muteor/562799 to your computer and use it in GitHub Desktop.
Save muteor/562799 to your computer and use it in GitHub Desktop.
<?php
// Report every error level. Passing -1 enables all levels on every PHP version,
// whereas E_STRICT has been part of E_ALL since PHP 5.4 and the constant itself
// is deprecated as of PHP 8.4.
error_reporting(-1);
/**
* To run this example you will need the UUID PHP extension: http://pecl.php.net/package/uuid
*/
/**
* DomainModel Aggregate root for a Tweet
*
* The Aggregate root tracks all uncommitted events and can have old
* events replayed to it.
*/
class Domain_Tweet
{
    /** Identifier of the tweet, copied from the posted event's tweetId. */
    protected $_id;

    /** Starts at -1 and is incremented once for every applied event. */
    protected $_version = -1;

    /** Text of the tweet, copied from the posted event. */
    protected $_message;

    /** Author of the tweet, copied from the posted event. */
    protected $_who;

    /** Value of the posted event's "when" field. */
    protected $_when;

    /** Events applied to this instance that did not come from history. */
    protected $_uncommittedEvents = array();

    /**
     * Creates a brand-new tweet by raising and applying a NewTweetPosted event.
     *
     * @param mixed $message Text of the tweet.
     * @param mixed $who     Author of the tweet.
     */
    public function __construct($message, $who)
    {
        $tweetId = uuid_create(UUID_TYPE_RANDOM);
        $postedAt = time();

        $this->_applyEvent(
            new Event_NewTweetPostedEvent($tweetId, $message, $who, $postedAt)
        );
    }

    /**
     * Returns the identifier taken from the last applied posted event.
     */
    public function getId()
    {
        return $this->_id;
    }

    /**
     * Returns the events applied outside of a history replay.
     *
     * @return array
     */
    public function getUncommittedEvents()
    {
        return $this->_uncommittedEvents;
    }

    /**
     * Replays previously stored events onto this instance without
     * recording them as uncommitted.
     *
     * @param array $events Events to replay, in order.
     */
    public function initializeFromHistory($events)
    {
        foreach ($events as $pastEvent) {
            $this->_applyEvent($pastEvent, true);
        }
    }

    /**
     * Applies an event: runs the handler, bumps the version and, unless the
     * event is a replay, remembers it as uncommitted.
     *
     * @param Event_NewTweetPostedEvent $event       Event to apply.
     * @param bool                      $fromHistory Pass true when replaying.
     */
    protected function _applyEvent($event, $fromHistory = false)
    {
        // A real implementation would look the handler up; it is hard-coded here.
        $this->_OnNewTweetPosted($event);
        $this->_version++;

        if (false !== $fromHistory) {
            return;
        }

        $this->_uncommittedEvents[] = $event;
    }

    /**
     * Handler for the NewTweetPosted event: copies the event data into state.
     */
    protected function _OnNewTweetPosted(Event_NewTweetPostedEvent $e)
    {
        $this->_id = $e->tweetId;
        $this->_message = $e->message;
        $this->_who = $e->who;
        $this->_when = $e->when;
    }
}
/**
* DomainEvent a new tweet post, this will be stored in the
* event store and replayed to the aggregate root when it is
* called back into memory
*/
class Event_NewTweetPostedEvent
{
    /** Identifier of this event, generated in the constructor. */
    public $id;

    /** Unix timestamp taken when the event object was constructed. */
    public $eventTime;

    /** Identifier of the tweet this event belongs to. */
    public $tweetId;

    /** Text of the tweet. */
    public $message;

    /** Author of the tweet. */
    public $who;

    /** Posting time as supplied by the caller. */
    public $when;

    /**
     * @param mixed $tweetId Identifier of the tweet being posted.
     * @param mixed $message Text of the tweet.
     * @param mixed $who     Author of the tweet.
     * @param mixed $when    Posting time.
     */
    public function __construct($tweetId, $message, $who, $when)
    {
        // Event metadata, generated here rather than supplied by the caller.
        $this->id = uuid_create(UUID_TYPE_RANDOM);
        $this->eventTime = time();

        // Payload describing the tweet.
        $this->when = $when;
        $this->who = $who;
        $this->message = $message;
        $this->tweetId = $tweetId;
    }
}
/**
* Command to post a new tweet, holds all the required data
* for the command. This would be sent from the client to
* the command service.
*/
class Command_PostNewTweetCommand
{
    /** Text of the tweet to post. */
    public $message;

    /** Author of the tweet to post. */
    public $who;

    /**
     * Captures the data needed to post a tweet.
     *
     * @param mixed $message Text of the tweet.
     * @param mixed $who     Author of the tweet.
     */
    public function __construct($message, $who)
    {
        $this->who = $who;
        $this->message = $message;
    }
}
/**
* Matching Executor for the Command, this actually changes the state
* of our domain. Executors are called from the command service.
*/
class Command_PostNewTweetExecutor
{
    /** Command service handed in at construction; used to reach the repository. */
    protected $_cmdService;

    /**
     * @param CommandService $commandService Service whose repository will persist the tweet.
     */
    public function __construct(CommandService $commandService)
    {
        $this->_cmdService = $commandService;
    }

    /**
     * Builds a new tweet aggregate from the command and saves it through
     * the command service's repository.
     *
     * @param object $command Command exposing public "message" and "who" properties.
     */
    public function execute($command)
    {
        $message = $command->message;
        $author = $command->who;
        $tweet = new Domain_Tweet($message, $author);

        $repository = $this->_cmdService->getRepository();
        $repository->save($tweet);
    }
}
/**
* Classic Domain Repository, handles the persisting of state and recall
* thereafter. In CQRS we don't end up with a bloated repository though!
*/
class Repository
{
    /** Store that persists events and returns them per aggregate id. */
    protected $_eventStore;

    /** Bus that receives every saved event for publication. */
    protected $_eventBus;

    /**
     * @param EventStore $store Event store used for persisting and loading events.
     * @param EventBus   $bus   Event bus that saved events are published on.
     */
    public function __construct(EventStore $store, EventBus $bus)
    {
        $this->_eventStore = $store;
        $this->_eventBus = $bus;
    }

    /**
     * Rebuilds an aggregate by replaying its stored events.
     *
     * The aggregate is instantiated without running its constructor, then
     * initializeFromHistory() is called with the stored events. When the
     * store has no history for the id, an empty list is replayed.
     *
     * @param mixed  $id   Aggregate identifier to load.
     * @param string $type Class name of the aggregate to instantiate.
     *
     * @return object The rebuilt aggregate.
     *
     * @throws InvalidArgumentException When $type does not name an existing class.
     */
    public function getById($id, $type)
    {
        // Without this check unserialize() would hand back an incomplete
        // placeholder object and the method call below would be fatal.
        if (!class_exists($type)) {
            throw new InvalidArgumentException(
                sprintf('Unknown aggregate type "%s"', $type)
            );
        }

        // Trick PHP into creating the object without calling its constructor.
        $aggregate = unserialize(
            sprintf('O:%d:"%s":0:{}', strlen($type), $type)
        );

        $events = $this->_eventStore->getHistoryForAggregateId($id);
        if (null === $events) {
            // The store reports "no history" as null; replay nothing instead
            // of handing null to a foreach loop.
            $events = array();
        }

        $aggregate->initializeFromHistory($events);

        return $aggregate;
    }

    /**
     * Stores and enqueues every uncommitted event of the aggregate, then
     * publishes the event bus once.
     *
     * @param object $aggregate Aggregate exposing getUncommittedEvents() and getId().
     */
    public function save($aggregate)
    {
        $eventsToSave = $aggregate->getUncommittedEvents();
        foreach ($eventsToSave as $event) {
            $this->_eventStore->store($aggregate->getId(), $event);
            $this->_eventBus->enqueue($aggregate->getId(), $event);
        }
        $this->_eventBus->publish();
    }
}
/**
* An In-Memory store of events that have been applied to the Domain.
*/
class EventStore
{
    /** Map of aggregate identifier to the list of events stored for it. */
    protected $_events;

    /**
     * Starts with an empty store.
     */
    public function __construct()
    {
        $this->_events = array();
    }

    /**
     * Returns the events stored for an aggregate, in insertion order.
     *
     * @param mixed $id Aggregate identifier.
     *
     * @return array|null The stored events, or null when none exist for the id.
     */
    public function getHistoryForAggregateId($id)
    {
        return isset($this->_events[$id]) ? $this->_events[$id] : null;
    }

    /**
     * Appends an event to the list kept for the given aggregate.
     *
     * @param mixed $aggregateIdent Aggregate identifier used as the array key.
     * @param mixed $event          Event to remember.
     */
    public function store($aggregateIdent, $event)
    {
        $history = isset($this->_events[$aggregateIdent])
            ? $this->_events[$aggregateIdent]
            : array();

        $history[] = $event;
        $this->_events[$aggregateIdent] = $history;
    }
}
/**
* The event bus sends events over to the read model for processing.
* This will then create our de-normalized storage.
*/
class EventBus
{
    /** Database handle passed on to the denormalizer when publishing. */
    protected $_db;

    /** Pending events, as a map of aggregate identifier to a list of events. */
    protected $_events = array();

    /**
     * @param mixed $db Database handle handed to the denormalizer.
     */
    public function __construct($db)
    {
        $this->_db = $db;
    }

    /**
     * Queues an event for the next publish() call.
     *
     * Several events may be queued for the same aggregate; they are kept in
     * the order they were enqueued.
     *
     * @param mixed $aggregateIdent Aggregate identifier the event belongs to.
     * @param mixed $event          Event to publish later.
     */
    public function enqueue($aggregateIdent, $event)
    {
        if (!isset($this->_events[$aggregateIdent])) {
            $this->_events[$aggregateIdent] = array();
        }

        // Append to the list; a plain assignment would replace the list with
        // a single event and drop everything queued before it.
        $this->_events[$aggregateIdent][] = $event;
    }

    /**
     * Writes every queued event to the read model and empties the queue.
     */
    public function publish()
    {
        // A real implementation would look the denormalizer up; it is hard-coded here.
        $denormalizer = new Read_PostNewTweetDenormalizer($this->_db);

        // Write events to the read model; this is where transactions would go.
        foreach ($this->_events as $ident => $events) {
            foreach ($events as $event) {
                $denormalizer->write($ident, $event);
            }
        }

        // Clear the processed events.
        $this->_events = array();
    }
}
/**
* The denormalizer for the PostNewTweet event, this will take the event
* and translate its data into a de-normalized form.
*/
class Read_PostNewTweetDenormalizer
{
    /** Database handle used for the insert; must offer a PDO-style prepare(). */
    protected $_db;

    /**
     * @param mixed $db Database handle (a PDO connection in this example).
     */
    public function __construct($db)
    {
        $this->_db = $db;
    }

    /**
     * Inserts one row into the "tweets" table for the given event.
     *
     * The values are bound as parameters rather than interpolated into the
     * SQL text, so quotes in the message or author cannot break or alter
     * the statement.
     *
     * @param mixed  $aggregateIdent Identifier stored in the first column.
     * @param object $event          Event exposing "message", "who" and "when".
     *
     * @throws RuntimeException When the statement cannot be prepared or executed
     *                          and the connection does not throw on its own.
     */
    public function write($aggregateIdent, $event)
    {
        $statement = $this->_db->prepare('INSERT INTO tweets VALUES(?, ?, ?, ?)');

        $values = array(
            $aggregateIdent,
            $event->message,
            $event->who,
            $event->when,
        );

        // prepare()/execute() return false instead of throwing when the
        // connection is not in exception mode; surface that explicitly.
        if (false === $statement || false === $statement->execute($values)) {
            throw new RuntimeException('Could not write tweet to the read model');
        }
    }
}
/**
* Command Service, directs the incoming commands to the correct
* Command Executor.
*/
class CommandService
{
    /** Repository made available to command executors. */
    protected $_repository;

    /**
     * @param Repository $repository Repository that executors persist aggregates with.
     */
    public function __construct(Repository $repository)
    {
        $this->_repository = $repository;
    }

    /**
     * Returns the repository supplied at construction.
     *
     * @return Repository
     */
    public function getRepository()
    {
        return $this->_repository;
    }

    /**
     * Hands the command to its executor.
     *
     * @param object $command Command to run.
     */
    public function execute($command)
    {
        // A real implementation would look the executor up; it is hard-coded here.
        $handler = new Command_PostNewTweetExecutor($this);
        $handler->execute($command);
    }
}
// Read side: an in-memory SQLite database that throws on errors.
$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Table that holds the denormalized tweets queried by the read model.
$db->exec('CREATE TABLE tweets
(
`id` CHAR(36),
`message` VARCHAR(140),
`who` VARCHAR(128),
`when` TIMESTAMP,
PRIMARY KEY(`id`)
)');

// Wire the components together.
$eventStore = new EventStore();
$eventBus = new EventBus($db);
$repository = new Repository($eventStore, $eventBus);
$commandService = new CommandService($repository);

// Write side: post a new tweet through the command service.
$tweetCommand = new Command_PostNewTweetCommand('Another really important tweet', 'Keith Pope');
$commandService->execute($tweetCommand);

// Read side: list everything that ended up in the read model.
$stmt = $db->query('SELECT * FROM tweets');
$result = $stmt->fetchAll(PDO::FETCH_ASSOC);
title('Tweets from ReadModel');
foreach ($result as $tweet) {
    // Remember the id so the aggregate can be recalled further down.
    $id = $tweet['id'];
    echo $tweet['message'], '(', $tweet['who'], ')', ' - ', $tweet['when'], "\n";
}

// Dump the contents of the event store.
title('Event Store');
print_r($eventStore);

// Recall the aggregate from its events. This would never happen here in
// practice, but there is no update event to demonstrate it with.
title('Our Tweet');
$recalled = $repository->getById($id, 'Domain_Tweet');
print_r($recalled);
// Ignore this....
/**
 * Prints a section heading framed by two 70-character dashed rules.
 *
 * @param string $title Heading text.
 */
function title($title)
{
    $rule = str_repeat('-', 70);

    echo $rule, "\n$title\n", $rule . "\n";
}
@muteor
Copy link
Author

muteor commented Sep 2, 2010

Example Output:
----------------------------------------------------------------------
Tweets from ReadModel
----------------------------------------------------------------------
Another really important tweet(Keith Pope) - 1283456671
----------------------------------------------------------------------
Event Store
----------------------------------------------------------------------
EventStore Object
(
[_events:protected] => Array
(
[E2069990-7EF8-4E0C-BCD8-3583D75BFD6C] => Array
(
[0] => Event_NewTweetPostedEvent Object
(
[id] => E4DA269B-8E73-41C3-96E9-186472F495CE
[eventTime] => 1283456671
[tweetId] => E2069990-7EF8-4E0C-BCD8-3583D75BFD6C
[message] => Another really important tweet
[who] => Keith Pope
[when] => 1283456671
)

                )

        )

)
----------------------------------------------------------------------
Our Tweet
----------------------------------------------------------------------
Domain_Tweet Object
(
    [_id:protected] => E2069990-7EF8-4E0C-BCD8-3583D75BFD6C
    [_version:protected] => 0
    [_message:protected] => Another really important tweet
    [_who:protected] => Keith Pope
    [_when:protected] => 1283456671
    [_uncommittedEvents:protected] => Array
        (
        )

)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment