Skip to content

Instantly share code, notes, and snippets.

@fprochazka
Last active January 18, 2016 14:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save fprochazka/3bbf174bd7ae1af7518f to your computer and use it in GitHub Desktop.
Save fprochazka/3bbf174bd7ae1af7518f to your computer and use it in GitHub Desktop.
<?php
use Kdyby;
use Kdyby\RabbitMq\Connection;
use Kdyby\RabbitMq\DI\RabbitMqExtension;
use Nette;
use Nette\Reflection\ClassType;
use PhpAmqpLib\Message\AMQPMessage;
use Tester;
/**
 * Mock of a Kdyby RabbitMQ producer that keeps published messages in memory.
 *
 * publish() does not call the parent implementation; it only stores the
 * message body in $messages, grouped by routing key. processMessage() and
 * processMessages() then pass the stored bodies directly to the callback of a
 * consumer service found in the DI container under
 * RabbitMqExtension::TAG_CONSUMER.
 *
 * NOTE(review): InvalidStateException is referenced unqualified and no import
 * for it is visible in this file - confirm it resolves in the file's namespace.
 */
class RabbitProducerMock extends Kdyby\RabbitMq\Producer
{
    /**
     * Pending message bodies: routing key => list of bodies in publish order.
     *
     * @var array
     */
    public $messages = [];

    /**
     * Container used to look up consumer services. NULL when the mock was
     * constructed without one; processing messages then fails with an exception.
     *
     * @var Nette\DI\Container|NULL
     */
    private $serviceLocator;

    /**
     * @param Connection $conn passed through to the parent producer
     * @param string|NULL $consumerTag passed through to the parent producer
     * @param Nette\DI\Container|NULL $serviceLocator container searched for consumer services
     */
    public function __construct(Connection $conn, $consumerTag = NULL, Nette\DI\Container $serviceLocator = NULL)
    {
        parent::__construct($conn, $consumerTag);
        $this->serviceLocator = $serviceLocator;
    }

    /**
     * Stores the message body in memory instead of publishing it.
     *
     * $additionalProperties is accepted for signature compatibility and ignored.
     *
     * @param mixed $msgBody
     * @param string $routingKey
     * @param array $additionalProperties
     */
    public function publish($msgBody, $routingKey = '', $additionalProperties = [])
    {
        $this->messages[$routingKey][] = $msgBody;
    }

    /**
     * Processes stored messages until none are left.
     *
     * Callbacks that publish further messages on this mock extend the loop,
     * because the queue is re-checked after every processed message.
     *
     * @return array return values of processMessage(), in processing order
     * @throws InvalidStateException see processMessage()
     */
    public function processMessages()
    {
        $results = [];
        while (!empty($this->messages)) {
            $results[] = $this->processMessage();
        }

        return $results;
    }

    /**
     * Removes the next stored message and passes it to the matching consumer's callback.
     *
     * The message is removed from $messages before the consumer lookup, so it
     * is not kept when the lookup fails.
     *
     * @return bool|mixed FALSE when no message is stored, otherwise the callback's return value
     * @throws InvalidStateException when no consumer matches or no service locator is available
     */
    public function processMessage()
    {
        if (!$next = $this->nextMessage()) {
            return FALSE;
        }

        list($routingKey, $msgBody) = $next;

        if (!$consumer = $this->findConsumer($routingKey)) {
            throw new InvalidStateException(sprintf('No consumer for exchange "%s" with routing key "%s" found.', $this->exchangeOptions['name'], $routingKey));
        }

        // The consumer's callback is not publicly readable, so it is read via reflection.
        $callbackRefl = new Nette\Reflection\Property($consumer, 'callback');
        $callbackRefl->setAccessible(TRUE);
        $callback = $callbackRefl->getValue($consumer);

        return call_user_func($callback, new AMQPMessage($msgBody, []));
    }

    /**
     * Finds a consumer service bound to the same exchange name as this producer.
     *
     * Only messages without a routing key can be matched; MultipleConsumer
     * services and routing-key matching are not implemented yet.
     *
     * @param string $routingKey
     * @return Kdyby\RabbitMq\Consumer|NULL
     * @throws InvalidStateException when the mock was constructed without a service locator
     */
    private function findConsumer($routingKey)
    {
        // The constructor allows a NULL locator; fail with a catchable exception
        // instead of a "call to a member function on null" error.
        if ($this->serviceLocator === NULL) {
            throw new InvalidStateException('Cannot look up consumers: no service locator was passed to the constructor.');
        }

        foreach ($this->serviceLocator->findByTag(RabbitMqExtension::TAG_CONSUMER) as $consumerService => $_) {
            /** @var Kdyby\RabbitMq\Consumer $consumer */
            $consumer = $this->serviceLocator->getService($consumerService);

            if ($consumer instanceof Kdyby\RabbitMq\MultipleConsumer) {
                continue; // todo: not yet implemented
            }

            if ($consumer->exchangeOptions['name'] !== $this->exchangeOptions['name']) {
                continue; // nope
            }

            // Explicit comparison rather than empty(): a routing key of "0" is a
            // real routing key and must not be treated as "no routing key".
            if ((string) $routingKey === '') {
                return $consumer;
            }

            continue; // todo: not yet implemented
        }

        return NULL;
    }

    /**
     * Removes and returns the first stored message.
     *
     * Routing keys whose list becomes empty are dropped, so that an empty
     * $messages array means there is nothing left to process.
     *
     * @return array|NULL [routing key, message body], or NULL when nothing is stored
     */
    private function nextMessage()
    {
        foreach ($this->messages as $routingKey => $messages) {
            foreach ($messages as $i => $message) {
                unset($this->messages[$routingKey][$i]);
                if (empty($this->messages[$routingKey])) {
                    unset($this->messages[$routingKey]);
                }

                return [$routingKey, $message];
            }
        }

        return NULL;
    }

    /**
     * Unserializes the body when it matches the pattern of a serialized PHP
     * object ("O:" / "C:" prefix); otherwise returns the body unchanged.
     *
     * NOTE(review): this method is not called anywhere in this class.
     * NOTE(review): unserialize() can instantiate arbitrary classes - only use
     * it on bodies produced by trusted code.
     *
     * @param string $msgBody
     * @return mixed
     */
    private function unserializeMessage($msgBody)
    {
        return preg_match('~^(O|C)\\:\\d+\\:.+\\{.+\\}\\z~is', $msgBody) ? unserialize($msgBody) : $msgBody;
    }
}
rabbitmq:
    producers:
        email:
            class: RabbitProducerMock
        sms:
            class: RabbitProducerMock
        searchSynch:
            class: RabbitProducerMock
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment