Skip to content

Instantly share code, notes, and snippets.

@fprochazka
Last active August 29, 2015 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fprochazka/8de19f0b8e79a5fd12d0 to your computer and use it in GitHub Desktop.
Save fprochazka/8de19f0b8e79a5fd12d0 to your computer and use it in GitHub Desktop.
<?php
namespace Rohlikcz\RabbitMq;
use Rohlikcz\Identified;
use Rohlikcz\InvalidArgumentException;
use Rohlikcz\InvalidStateException;
use Doctrine\Common\Util\ClassUtils;
use Kdyby;
use Nette;
use Nette\Utils\Json;
use PhpAmqpLib\Message\AMQPMessage;
abstract class BufferedQueue extends Nette\Object implements Kdyby\RabbitMq\IConsumer, Kdyby\Events\Subscriber
{
/**
* @var array
*/
protected $dirtyEntities = [];
/**
* @var Kdyby\RabbitMq\Connection
*/
protected $rabbitmq;
/**
* @var string
*/
protected $exchangeName;
public function __construct($exchangeName, Kdyby\RabbitMq\Connection $rabbitmq)
{
$this->rabbitmq = $rabbitmq;
$this->exchangeName = $exchangeName;
}
public function getSubscribedEvents()
{
return ['Nette\Application\Application::onShutdown' => 'flushSync'];
}
protected function markDirty(Identified $entity)
{
if (empty($entity->getId())) {
throw new InvalidArgumentException("Entity has no identifier");
}
$class = ClassUtils::getRealClass(get_class($entity));
$this->dirtyEntities[$class][$entity->getId()] = TRUE;
}
protected function prepare($type, $id)
{
return Json::encode(['type' => $type, 'id' => $id]);
}
public function flushSync()
{
if ($this->exchangeName === NULL) {
throw new InvalidStateException('Exchange name is not defined');
}
if (!array_filter($this->dirtyEntities)) {
return;
}
$producer = $this->rabbitmq->getProducer($this->exchangeName);
foreach ($this->dirtyEntities as $type => $entities) {
foreach ($entities as $id => $_) {
$producer->publish($this->prepare($type, $id));
}
}
$this->dirtyEntities = [];
}
abstract public function process(AMQPMessage $message);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment