Skip to content

Instantly share code, notes, and snippets.

@ch3ric
Last active August 29, 2015 13:56
Show Gist options
  • Save ch3ric/9220611 to your computer and use it in GitHub Desktop.
Save ch3ric/9220611 to your computer and use it in GitHub Desktop.
Publish RabbitMQ message from symfony 1 and Consume in Symfony 2
<?php
// apps/frontend/modules/export/actions/actions.class.php
/**
 * Export actions of the frontend application.
 */
class exportActions extends sfActions
{
    /**
     * Walks through the payments returned by getAwaitingExportPayments() and
     * publishes one RabbitMQ message per SEPA payment.
     *
     * Payments that are not SEPA fall through to the legacy export placeholder.
     *
     * @param sfWebRequest $request Request whose 'mail' parameter is forwarded in every message.
     */
    public function executeExport(sfWebRequest $request)
    {
        $recipient = $request->getParameter('mail');
        $awaitingPayments = Doctrine::getTable('Payment')->getAwaitingExportPayments();

        foreach ($awaitingPayments as $payment) {
            if ($payment->isSepa()) {
                // SEPA payments are handed over to the message queue and skipped here,
                // whether or not publishing succeeded.
                $this->publishSepaExport($payment, $recipient);
                continue;
            }
            // legacy
            // do export in txt and send email
        }
    }

    /**
     * Publishes the export request for a single SEPA payment through the 'acme' producer.
     *
     * Any exception raised while building or publishing the message is logged
     * and swallowed so the caller can move on to the next payment.
     *
     * @param mixed $payment Payment record; only getId() is called on it.
     * @param mixed $mail    Value sent under the 'email' key of the message.
     */
    protected function publishSepaExport($payment, $mail)
    {
        try {
            $message = array(
                'paymentId' => $payment->getId(),
                'action' => 'export.payment.sepa',
                'email' => $mail,
            );
            sfRabbit::getProducer('acme')->publish(serialize($message));
            sfContext::getInstance()->getLogger()->info('Message sent to RabbitMQ: ' . print_r($message, true));
        } catch (Exception $failure) {
            sfContext::getInstance()->getLogger()->err('Cannot send exports to RabbitMQ: ' . $failure->getMessage());
        }
    }
}
# apps/frontend/config/app.yml
all:
sfRabbitPlugin:
debug: false
connections:
default:
host: localhost
port: 5672
user: guest
password: guest
vhost: /
producers:
acme:
connection: default
exchange_options: {name: direct, type: direct}
# app/config/config.yml
old_sound_rabbit_mq:
connections:
default:
host: 'localhost'
port: 5672
user: guest
password: guest
vhost: '/'
consumers:
direct:
connection: default
exchange_options: {name: 'direct', type: direct}
queue_options: {name: 'direct'}
callback: acme.message.consumer
<?php
// src/Acme/ExportBundle/Services/Message/MessageConsumer.php
namespace Acme\ExportBundle\Services\Message;
use Monolog\Logger;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\GenericEvent;
/**
 * RabbitMQ consumer callback that converts each incoming message into an event.
 *
 * The message body is expected to be a PHP-serialized array holding at least an
 * 'action' key; that value is used as the name of the dispatched event and the
 * whole decoded body becomes the subject of a GenericEvent.
 */
class MessageConsumer implements ConsumerInterface
{
    /** @var EventDispatcherInterface Dispatcher used to forward decoded messages. */
    protected $dispatcher;

    /** @var Logger Logger receiving a critical record for every failed message. */
    protected $logger;

    /**
     * @param EventDispatcherInterface $dispatcher Dispatcher used to forward decoded messages.
     * @param Logger                   $logger     Logger used to report failures.
     */
    public function __construct(EventDispatcherInterface $dispatcher, Logger $logger)
    {
        $this->dispatcher = $dispatcher;
        $this->logger = $logger;
    }

    /**
     * Decodes the message and dispatches the event named by its 'action' entry.
     *
     * Every exception (malformed payload, failing listener, ...) is logged and
     * swallowed; nothing is returned.
     * NOTE(review): with no return value the bundle presumably acknowledges the
     * message, so failed messages are not redelivered - confirm this is intended.
     *
     * @param AMQPMessage $msg Message received from the queue.
     */
    public function execute(AMQPMessage $msg)
    {
        try {
            // NOTE(review): unserialize() may instantiate arbitrary classes. This is only
            // acceptable while every publisher on this queue is trusted; switching both
            // sides to JSON would remove the risk.
            // Decoding happens inside the try block so that a decoding error converted
            // into an exception by an error handler is logged instead of escaping.
            $body = unserialize($msg->body);

            // Check the shape first: isset() with an offset on an arbitrary object is fatal.
            $isArrayLike = is_array($body) || $body instanceof \ArrayAccess;
            if (!$isArrayLike || !isset($body['action'])) {
                throw new \InvalidArgumentException("This message should have an 'action' property");
            }

            $this->dispatcher->dispatch($body['action'], new GenericEvent($body));
        } catch (\Exception $e) {
            // Log the raw body: the message object itself cannot be relied on to be
            // convertible to a string for sprintf().
            $this->logger->critical(sprintf(
                'Exception "%s" while consuming the following message: %s.',
                $e->getMessage(),
                $msg->body
            ));
        }
    }
}
<?php
// src/Acme/ExportBundle/Services/Message/PaymentMessageListener.php
namespace Acme\ExportBundle\Services\Message;
use Doctrine\Bundle\DoctrineBundle\Registry;
use Symfony\Component\EventDispatcher\GenericEvent;
use Acme\ExportBundle\Entity\PaymentManager;
/**
 * Listener that exports a payment and sends the resulting mail when an export
 * event carrying a payment id is received.
 */
class PaymentMessageListener
{
    /** @var \Swift_Mailer Mailer used to send the generated message. */
    protected $mailer;

    /** @var Registry Doctrine registry giving access to the entity manager. */
    protected $doctrine;

    /** @var PaymentManager Service that builds the export and its mail message. */
    protected $paymentManager;

    /**
     * @param \Swift_Mailer  $mailer         Mailer used to send the generated message.
     * @param Registry       $doctrine       Doctrine registry.
     * @param PaymentManager $paymentManager Service that builds the export and its mail message.
     */
    public function __construct(\Swift_Mailer $mailer, Registry $doctrine, PaymentManager $paymentManager)
    {
        $this->mailer = $mailer;
        $this->doctrine = $doctrine;
        $this->paymentManager = $paymentManager;
    }

    /**
     * Exports the payment referenced by the event, persists the export and mails it.
     *
     * @param GenericEvent $event Event whose subject is array-like and holds a 'paymentId' entry.
     *
     * @throws \InvalidArgumentException When the subject has no 'paymentId' entry.
     * @throws \RuntimeException         When no payment exists for the given id.
     */
    public function onExportSepa(GenericEvent $event)
    {
        $body = $event->getSubject();
        if (!isset($body['paymentId'])) {
            throw new \InvalidArgumentException("This message should have a 'paymentId' property");
        }

        // getManager() is the non-deprecated equivalent of getEntityManager().
        $em = $this->doctrine->getManager();
        $payment = $em->getRepository('AcmeExportBundle:Payment')->find($body['paymentId']);
        if (null === $payment) {
            // find() yields null for an unknown id; fail explicitly instead of
            // passing null on to the payment manager.
            throw new \RuntimeException(sprintf('Payment "%s" not found, cannot export it.', $body['paymentId']));
        }

        $export = $this->paymentManager->export($payment);
        $em->persist($export);
        $em->flush();

        // The mail is only sent once the export has been flushed.
        $message = $this->paymentManager->generateMailMessage($export);
        $this->mailer->send($message);
    }
}
#!/bin/bash
# bin/rabbitmq.sh
#
# Starts the "direct" RabbitMQ consumer in the background as the www-data user,
# appending its stdout and stderr to app/logs/rabbitmq.log.

# The paths below are relative to the project root. When the script is started
# from another directory, fall back to the parent of the directory holding it.
if [ ! -f app/console ]; then
    cd "$(dirname "$0")/.." || exit 1
fi

su www-data -c "php app/console rabbitmq:consume --env=prod direct >> app/logs/rabbitmq.log 2>&1 &"
<?xml version="1.0" ?>
<!-- src/Acme/ExportBundle/Resources/config/services.xml -->
<container xmlns="http://symfony.com/schema/dic/services"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
    <services>
        <!-- Consumer callback referenced by the "direct" consumer configuration.
             Its constructor takes exactly two arguments: the event dispatcher and the logger. -->
        <service id="acme.message.consumer" class="Acme\ExportBundle\Services\Message\MessageConsumer">
            <argument type="service" id="event_dispatcher" />
            <argument type="service" id="logger" />
        </service>
        <!-- Handles the "export.payment.sepa" event dispatched by the consumer. -->
        <service id="acme.payment.message.listener" class="Acme\ExportBundle\Services\Message\PaymentMessageListener">
            <argument type="service" id="mailer" />
            <argument type="service" id="doctrine" />
            <argument type="service" id="acme.payment.manager" />
            <tag name="kernel.event_listener" event="export.payment.sepa" method="onExportSepa" />
        </service>
    </services>
</container>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment