Skip to content

Instantly share code, notes, and snippets.

@oqq
Created February 20, 2017 08:51
Show Gist options
  • Save oqq/6ef0726bd3cd32c7a3f9854e9643cabc to your computer and use it in GitHub Desktop.
Save oqq/6ef0726bd3cd32c7a3f9854e9643cabc to your computer and use it in GitHub Desktop.
Example of an abstract ElasticSearchReadModel
<?php declare(strict_types = 1);
namespace Acme\Projection\Document;
use Acme\Domain\Document\Event\DocumentWasCreated;
use Acme\Domain\Document\Event\DocumentContentWasUpdated;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Projection\ReadModel;
/**
 * Configures the "acme_documents" read model projection over the
 * "document_stream" stream and exposes a single entry point to run it.
 */
final class DocumentProjectionRunner
{
    /**
     * Projection object returned by EventStore::createReadModelProjection().
     */
    private $projection;

    /**
     * Creates the projection and registers one handler per document event.
     *
     * The handlers call $this->readModel(), which this class does not define;
     * presumably the projector rebinds the closures to its own context before
     * invoking them (TODO confirm), so they must remain non-static closures.
     *
     * @param EventStore $eventStore Store used to create the projection.
     * @param ReadModel  $readModel  Read model handed to the projection.
     */
    public function __construct(EventStore $eventStore, ReadModel $readModel)
    {
        // A newly created document is queued as an "index" operation.
        $onCreated = function ($state, DocumentWasCreated $created) {
            $documentId = $created->documentId()->toString();

            $this->readModel()->stack('index', $documentId, [
                'content' => $created->content()->toString(),
            ]);
        };

        // A content change is queued as an "update" operation.
        $onContentUpdated = function ($state, DocumentContentWasUpdated $updated) {
            $documentId = $updated->documentId()->toString();

            $this->readModel()->stack('update', $documentId, [
                'content' => $updated->newContent()->toString(),
            ]);
        };

        $projection = $eventStore->createReadModelProjection('acme_documents', $readModel);
        $this->projection = $projection;

        $projection
            ->fromStream('document_stream')
            ->when([
                DocumentWasCreated::class => $onCreated,
                DocumentContentWasUpdated::class => $onContentUpdated,
            ]);
    }

    /**
     * Runs the projection, optionally deleting it first.
     *
     * @param bool $delete When true, delete(true) is called on the projection
     *                     before it is run.
     */
    public function run(bool $delete = false): void
    {
        $projection = $this->projection;

        if (true === $delete) {
            // NOTE(review): the `true` flag presumably also removes the read model — confirm.
            $projection->delete(true);
        }

        // NOTE(review): `false` presumably means "do not keep running" — confirm.
        $projection->run(false);
    }
}
<?php declare(strict_types = 1);
namespace Acme\Projection;
use Elasticsearch\Client;
use Prooph\EventStore\Projection\ReadModel;
use RuntimeException;
/**
 * Read model backed by a single Elasticsearch index.
 *
 * Write operations are queued with stack() and sent to Elasticsearch as one
 * bulk request when persist() is called.
 */
class ElasticSearchReadModel implements ReadModel
{
    /**
     * @var Client
     */
    private $client;

    /**
     * @var string Name of the index this read model manages.
     */
    private $index;

    /**
     * @var IndexBodyProvider
     */
    private $indexBodyProvider;

    /**
     * @var array[] Pending bulk body: alternating action lines and payload lines.
     */
    private $stack = [];

    /**
     * @param Client            $client            Elasticsearch client used for all requests.
     * @param string            $index             Name of the index to manage.
     * @param IndexBodyProvider $indexBodyProvider Supplies the body used when the index is created.
     */
    public function __construct(Client $client, string $index, IndexBodyProvider $indexBodyProvider)
    {
        $this->client = $client;
        $this->index = $index;
        $this->indexBodyProvider = $indexBodyProvider;
    }

    /**
     * Creates the index using the body supplied by the IndexBodyProvider.
     *
     * @throws RuntimeException When the creation is not acknowledged.
     */
    public function init(): void
    {
        $indexBody = $this->indexBodyProvider->getBodyForIndex($this->index);

        $response = $this->client->indices()->create([
            'index' => $this->index,
            'body' => $indexBody,
        ]);

        $this->assertAcknowledged($response, 'create');
    }

    /**
     * Tells whether the index exists.
     */
    public function isInitialized(): bool
    {
        return $this->client->indices()->exists([
            'index' => $this->index,
        ]);
    }

    /**
     * Empties the read model.
     *
     * @throws RuntimeException When deleting or re-creating the index is not acknowledged.
     */
    public function reset(): void
    {
        // There is no flush method for elastic search, so we have to delete and recreate the index.
        $this->delete();
        $this->init();
    }

    /**
     * Deletes the index.
     *
     * @throws RuntimeException When the deletion is not acknowledged.
     */
    public function delete(): void
    {
        $response = $this->client->indices()->delete([
            'index' => $this->index,
        ]);

        $this->assertAcknowledged($response, 'delete');
    }

    /**
     * Queues a bulk operation for the next persist() call.
     *
     * @param string $operation Either "index" or "update".
     * @param mixed  ...$args   $args[0] is the document id, $args[1] the document
     *                          (for "update": the partial document).
     *
     * @throws RuntimeException When the operation is unknown or an argument is missing.
     */
    public function stack(string $operation, ...$args): void
    {
        if ('index' !== $operation && 'update' !== $operation) {
            throw new RuntimeException(sprintf('Operation %s is not valid', $operation));
        }

        // Fail here rather than queueing null entries that would corrupt the bulk body.
        if (count($args) < 2) {
            throw new RuntimeException(sprintf(
                'Operation %s requires a document id and a document, %d argument(s) given',
                $operation,
                count($args)
            ));
        }

        [$id, $document] = $args;

        $this->stack[] = [$operation => ['_id' => $id]];
        // Updates are sent as partial documents wrapped in a "doc" key.
        $this->stack[] = 'update' === $operation ? ['doc' => $document] : $document;
    }

    /**
     * Sends all queued operations as one bulk request and clears the queue.
     *
     * Does nothing when no operation is queued.
     *
     * @throws RuntimeException When the bulk response reports failed items.
     */
    public function persist(): void
    {
        // Skip the round trip when nothing is queued; a bulk request needs a non-empty body.
        if ([] === $this->stack) {
            return;
        }

        $response = $this->client->bulk([
            'index' => $this->index,
            'type' => 'doc',
            'body' => $this->stack,
        ]);

        $this->stack = [];

        // A bulk request can succeed as a whole while individual items fail;
        // those are only visible through the "errors" flag of the response.
        if (isset($response['errors']) && true === $response['errors']) {
            throw new RuntimeException(sprintf(
                'Bulk request on index "%s" reported errors. Failed items: %s',
                $this->index,
                print_r($this->failedBulkItems($response), true)
            ));
        }
    }

    /**
     * Throws unless the given index-management response was acknowledged.
     *
     * @param mixed  $response Response returned by the indices API call.
     * @param string $action   Verb used in the error message ("create" or "delete").
     *
     * @throws RuntimeException When "acknowledged" is missing or not strictly true.
     */
    private function assertAcknowledged($response, string $action): void
    {
        if (isset($response['acknowledged']) && true === $response['acknowledged']) {
            return;
        }

        throw new RuntimeException(sprintf(
            'Could not %s index "%s". Response was: %s',
            $action,
            $this->index,
            print_r($response, true)
        ));
    }

    /**
     * Collects the per-item results of a bulk response that carry an "error" entry.
     *
     * @param mixed $response Bulk response.
     *
     * @return array[] Failed item results; empty when none could be identified.
     */
    private function failedBulkItems($response): array
    {
        $failures = [];

        foreach ($response['items'] ?? [] as $item) {
            // Each item is keyed by its action name (e.g. "index", "update").
            foreach ($item as $result) {
                if (isset($result['error'])) {
                    $failures[] = $result;
                }
            }
        }

        return $failures;
    }
}
<?php declare(strict_types = 1);
namespace Acme\Projection;
/**
 * Supplies the request body used when an Elasticsearch index is created.
 */
interface IndexBodyProvider
{
/**
 * Returns the body for the index with the given name.
 *
 * ElasticSearchReadModel::init() sends the returned array as the "body"
 * parameter of its index-creation request. The expected keys are not
 * defined here; presumably index settings and mappings (TODO confirm).
 *
 * @param string $index Name of the index.
 *
 * @return array Body for the index-creation request.
 */
public function getBodyForIndex(string $index): array;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment