Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
RabbitMq DatabaseTransactionProducer
<?php
namespace App\Doctrine\Connection;
use Closure;
use Psr\Log\LoggerInterface;
/**
* @author Vašek Purchart
*/
class Connection extends \Doctrine\DBAL\Connection
{
/** @var \Closure[] */
private $afterCommitCallbacks = [];
/** @var \Psr\Log\LoggerInterface */
private $logger;
public function setLogger(LoggerInterface $logger)
{
if ($this->logger !== null) {
throw new \Exception('Logger has already been initialized');
}
$this->logger = $logger;
}
public function commit()
{
parent::commit();
if (!$this->isTransactionActive()) {
$callbacks = $this->afterCommitCallbacks;
$this->afterCommitCallbacks = [];
foreach ($callbacks as $callback) {
try {
$callback();
} catch (\Throwable $exception) {
$message = sprintf(
'%s: %s (uncaught exception) at %s line %s while running after commit callbacks',
get_class($exception),
$exception->getMessage(),
$exception->getFile(),
$exception->getLine()
);
$this->logger->error($message, [
'exception' => $exception,
]);
}
}
}
}
public function rollBack()
{
parent::rollBack();
if (!$this->isTransactionActive() || $this->isRollbackOnly()) {
$this->afterCommitCallbacks = [];
}
}
public function addAfterCommitCallback(Closure $callback)
{
if ($this->logger === null) {
throw new \Exception('Logger is not set');
}
$this->afterCommitCallbacks[] = $callback;
}
}
<?php
namespace App\RabbitMq\Producer;
use App\Doctrine\Connection\Connection as DbalConnection;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
/**
* @author Vašek Purchart
*/
class DatabaseTransactionProducer implements \OldSound\RabbitMqBundle\RabbitMq\ProducerInterface
{
/** @var \OldSound\RabbitMqBundle\RabbitMq\ProducerInterface */
private $wrappedProducer;
/** @var \App\Doctrine\Connection\Connection */
private $databaseConnection;
public function __construct(
ProducerInterface $wrappedProducer,
DbalConnection $databaseConnection
)
{
$this->wrappedProducer = $wrappedProducer;
$this->databaseConnection = $databaseConnection;
}
/**
* @param string $messageBody
* @param string $routingKey
* @param mixed[] $additionalProperties
*/
public function publish($messageBody, $routingKey = '', $additionalProperties = [])
{
if (!$this->databaseConnection->isTransactionActive()) {
$this->wrappedProducer->publish($messageBody, $routingKey, $additionalProperties);
}
$this->databaseConnection->addAfterCommitCallback(function () use ($messageBody, $routingKey, $additionalProperties) {
$this->wrappedProducer->publish($messageBody, $routingKey, $additionalProperties);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.