Skip to content

Instantly share code, notes, and snippets.

@dastanaron
Created February 20, 2019 15:26
Show Gist options
  • Save dastanaron/d541c2dbb0c272e5fcb3cb7d5d94f746 to your computer and use it in GitHub Desktop.
Save dastanaron/d541c2dbb0c272e5fcb3cb7d5d94f746 to your computer and use it in GitHub Desktop.
<?php
namespace Adapters;
use Vendor\PhpAmqpLib\Connection\AMQPStreamConnection;
use Vendor\PhpAmqpLib\Message\AMQPMessage;
/**
 * Thin convenience wrapper around a single AMQP connection and one channel
 * opened on it.
 *
 * Every public method forwards its arguments to the corresponding
 * snake_case method of the underlying channel object; the wrapper only
 * supplies default values. The connection and channel are opened in the
 * constructor and closed in the destructor.
 */
class RabbitAdapter
{
    /**
     * Channel opened on {@see $connection}; all operations go through it.
     *
     * @var \Vendor\PhpAmqpLib\Channel\AMQPChannel
     */
    protected $channel;

    /**
     * Connection created by {@see getConnection()}.
     *
     * @var AMQPStreamConnection
     */
    protected $connection;

    /**
     * Opens the connection and a channel on it.
     *
     * @throws \Fabrikant\Exception Declared by the original author; nothing in
     *                              this class raises it directly.
     */
    public function __construct()
    {
        $this->connection = $this->getConnection();
        $this->channel = $this->connection->channel();
    }

    /**
     * Static factory: equivalent to `new RabbitAdapter()`.
     *
     * @return RabbitAdapter
     * @throws \Fabrikant\Exception See the constructor.
     */
    public static function get()
    {
        return new self();
    }

    /**
     * Exposes the underlying channel for operations this wrapper does not cover.
     *
     * @return \Vendor\PhpAmqpLib\Channel\AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Declares a queue by forwarding all arguments, unchanged and in the same
     * order, to the channel's queue_declare().
     *
     * Note the wrapper's defaults: $durable is true and $autoDelete is false.
     *
     * @param string     $queue      Queue name.
     * @param bool       $passive
     * @param bool       $durable
     * @param bool       $exclusive
     * @param bool       $autoDelete
     * @param bool       $nowait
     * @param array      $arguments  Extra declaration arguments.
     * @param mixed|null $ticket
     */
    public function declareQueue(
        $queue,
        $passive = false,
        $durable = true,
        $exclusive = false,
        $autoDelete = false,
        $nowait = false,
        $arguments = [],
        $ticket = null
    ) {
        $this->channel->queue_declare(
            $queue,
            $passive,
            $durable,
            $exclusive,
            $autoDelete,
            $nowait,
            $arguments,
            $ticket
        );
    }

    /**
     * Declares an exchange by forwarding all arguments, unchanged and in the
     * same order, to the channel's exchange_declare().
     *
     * Note the wrapper's defaults: type 'direct', $durable true,
     * $autoDelete false.
     *
     * @param string     $exchange   Exchange name.
     * @param string     $type       Exchange type.
     * @param bool       $passive
     * @param bool       $durable
     * @param bool       $autoDelete
     * @param bool       $internal
     * @param bool       $nowait
     * @param array      $arguments  Extra declaration arguments.
     * @param mixed|null $ticket
     */
    public function declareExchange(
        $exchange,
        $type = 'direct',
        $passive = false,
        $durable = true,
        $autoDelete = false,
        $internal = false,
        $nowait = false,
        $arguments = [],
        $ticket = null
    ) {
        $this->channel->exchange_declare(
            $exchange,
            $type,
            $passive,
            $durable,
            $autoDelete,
            $internal,
            $nowait,
            $arguments,
            $ticket
        );
    }

    /**
     * Publishes a message by forwarding all arguments to the channel's
     * basic_publish().
     *
     * @param AMQPMessage $message
     * @param string      $exchange
     * @param string      $routing_key
     * @param bool        $mandatory
     * @param bool        $immediate
     * @param mixed|null  $ticket
     */
    public function pushMessage(
        AMQPMessage $message,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $this->channel->basic_publish($message, $exchange, $routing_key, $mandatory, $immediate, $ticket);
    }

    /**
     * Binds a queue to an exchange via the channel's queue_bind().
     *
     * @param string      $queue
     * @param string      $exchange
     * @param string|null $routingKey Routing key; when null the queue name is
     *                                used as the routing key.
     */
    public function queueBind($queue, $exchange, $routingKey = null)
    {
        // Only null triggers the fallback; an empty string is passed through as given.
        if (null === $routingKey) {
            $routingKey = $queue;
        }

        $this->channel->queue_bind($queue, $exchange, $routingKey);
    }

    /**
     * Deletes a queue by forwarding all arguments to the channel's
     * queue_delete().
     *
     * @param string     $queue
     * @param bool       $if_unused
     * @param bool       $if_empty
     * @param bool       $nowait
     * @param mixed|null $ticket
     *
     * @return mixed|null Whatever the channel's queue_delete() returns.
     */
    public function deleteQueue($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
    {
        return $this->channel->queue_delete($queue, $if_unused, $if_empty, $nowait, $ticket);
    }

    /**
     * Deletes an exchange by forwarding all arguments to the channel's
     * exchange_delete().
     *
     * @param string     $exchange
     * @param bool       $if_unused
     * @param bool       $nowait
     * @param mixed|null $ticket
     *
     * @return mixed|null Whatever the channel's exchange_delete() returns.
     */
    public function deleteExchange($exchange, $if_unused = false, $nowait = false, $ticket = null)
    {
        return $this->channel->exchange_delete($exchange, $if_unused, $nowait, $ticket);
    }

    /**
     * Builds the AMQP connection from the 'host', 'port', 'login', 'password'
     * and 'vhost' entries of a configuration array.
     *
     * @return AMQPStreamConnection
     * @throws \Fabrikant\Exception Declared by the original author; nothing in
     *                              this method raises it directly.
     */
    protected function getConnection()
    {
        // Placeholder left by the author: populate with the five keys read
        // below before using the class; as written the array is empty.
        $config = [];//your config

        return new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['login'],
            $config['password'],
            $config['vhost']
        );
    }

    /**
     * Closes the channel and then the connection.
     *
     * The connection is closed even when closing the channel throws; the
     * exception is not swallowed and still propagates to the caller.
     * Properties that were never set are skipped.
     */
    public function __destruct()
    {
        try {
            if (null !== $this->channel) {
                $this->channel->close();
            }
        } finally {
            // Runs on both the normal and the exceptional path so the
            // connection is never leaked by a failing channel close.
            if (null !== $this->connection) {
                $this->connection->close();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment