Created
March 30, 2016 11:06
-
-
Save alex-litvak/2a8c5831ba96a7db6293df1c3153621a to your computer and use it in GitHub Desktop.
Laravel 4.2 + IronMQ V3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php namespace Illuminate\Queue\Connectors; | |
use IronMQ\IronMQ; | |
use Illuminate\Http\Request; | |
use Illuminate\Queue\IronQueue; | |
use Illuminate\Encryption\Encrypter; | |
class IronConnector implements ConnectorInterface
{
    /**
     * The encrypter instance.
     *
     * Stored by the constructor; connect() itself does not read it.
     *
     * @var \Illuminate\Encryption\Encrypter
     */
    protected $crypt;

    /**
     * The current request instance, handed to every queue this connector builds.
     *
     * @var \Illuminate\Http\Request
     */
    protected $request;

    /**
     * Create a new Iron connector instance.
     *
     * @param  \Illuminate\Encryption\Encrypter  $crypt
     * @param  \Illuminate\Http\Request  $request
     * @return void
     */
    public function __construct(Encrypter $crypt, Request $request)
    {
        $this->crypt = $crypt;
        $this->request = $request;
    }

    /**
     * Establish a queue connection.
     *
     * Required config keys: "token", "project", "queue" and "encrypt".
     * Optional config keys:
     *  - "host":           forwarded to the IronMQ client configuration.
     *  - "ssl_verifypeer": copied onto the IronMQ client after construction.
     *  - "timeout":        reservation timeout (seconds) forwarded to the
     *                      IronQueue constructor; when absent the queue's
     *                      own constructor default applies.
     *
     * @param  array  $config
     * @return \Illuminate\Queue\QueueInterface
     */
    public function connect(array $config)
    {
        $ironConfig = array('token' => $config['token'], 'project_id' => $config['project']);

        if (isset($config['host'])) {
            $ironConfig['host'] = $config['host'];
        }

        $iron = new IronMQ($ironConfig);

        if (isset($config['ssl_verifypeer'])) {
            $iron->ssl_verifypeer = $config['ssl_verifypeer'];
        }

        // Only pass a timeout when one is configured, so the default stays
        // defined in a single place (the IronQueue constructor signature).
        if (isset($config['timeout'])) {
            return new IronQueue(
                $iron,
                $this->request,
                $config['queue'],
                $config['encrypt'],
                (int) $config['timeout']
            );
        }

        return new IronQueue($iron, $this->request, $config['queue'], $config['encrypt']);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php namespace Illuminate\Queue\Jobs; | |
use Illuminate\Queue\IronQueue; | |
use Illuminate\Container\Container; | |
class IronJob extends Job
{
    /**
     * The Iron queue instance.
     *
     * @var \Illuminate\Queue\IronQueue
     */
    protected $iron;

    /**
     * The IronMQ message instance.
     *
     * This class reads its "body", "id" and "reservation_id" properties and
     * checks for the presence of a "pushed" property.
     *
     * @var object
     */
    protected $job;

    /**
     * Indicates if the message was a push message.
     *
     * @var bool
     */
    protected $pushed = false;

    /**
     * Create a new job instance.
     *
     * @param  \Illuminate\Container\Container  $container
     * @param  \Illuminate\Queue\IronQueue  $iron
     * @param  object  $job
     * @param  bool  $pushed
     * @return void
     */
    public function __construct(Container $container,
                                IronQueue $iron,
                                $job,
                                $pushed = false)
    {
        $this->job = $job;
        $this->iron = $iron;
        $this->pushed = $pushed;
        $this->container = $container;
    }

    /**
     * Fire the job.
     *
     * Decodes the raw JSON body into an array and hands it to resolveAndFire().
     *
     * @return void
     */
    public function fire()
    {
        $this->resolveAndFire(json_decode($this->getRawBody(), true));
    }

    /**
     * Get the raw body string for the job.
     *
     * @return string
     */
    public function getRawBody()
    {
        return $this->job->body;
    }

    /**
     * Delete the job from the queue.
     *
     * Messages carrying a "pushed" property are not deleted through the API;
     * only the parent's delete() bookkeeping runs for them.
     *
     * @return void
     */
    public function delete()
    {
        parent::delete();

        if (isset($this->job->pushed)) {
            return;
        }

        $this->iron->deleteMessage($this->getQueue(), $this->job->id, $this->job->reservation_id);
    }

    /**
     * Release the job back into the queue.
     *
     * A reserved (non-pushed) message is deleted first, then a fresh copy
     * with an incremented attempt counter is posted.
     *
     * @param  int  $delay
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);

        if (!$this->pushed) {
            $this->delete();
        }

        $this->recreateJob($delay);
    }

    /**
     * Re-post this job's payload with its "attempts" counter incremented.
     *
     * @param  int  $delay
     * @return void
     */
    protected function recreateJob($delay)
    {
        $payload = json_decode($this->job->body, true);

        // The global array helpers are used because no Arr class is imported
        // into this namespace; an unqualified "Arr" would not resolve.
        array_set($payload, 'attempts', array_get($payload, 'attempts', 1) + 1);

        $this->iron->recreate(json_encode($payload), $this->getQueue(), $delay);
    }

    /**
     * Get the number of times the job has been attempted.
     *
     * Read from the "attempts" key of the decoded body; defaults to 1.
     *
     * @return int
     */
    public function attempts()
    {
        return array_get(json_decode($this->job->body, true), 'attempts', 1);
    }

    /**
     * Get the job identifier.
     *
     * @return string
     */
    public function getJobId()
    {
        return $this->job->id;
    }

    /**
     * Get the IoC container instance.
     *
     * @return \Illuminate\Container\Container
     */
    public function getContainer()
    {
        return $this->container;
    }

    /**
     * Get the underlying Iron queue instance.
     *
     * @return \Illuminate\Queue\IronQueue
     */
    public function getIron()
    {
        return $this->iron;
    }

    /**
     * Get the underlying IronMQ job.
     *
     * @return object
     */
    public function getIronJob()
    {
        return $this->job;
    }

    /**
     * Get the name of the queue the job belongs to.
     *
     * Read from the "queue" key of the decoded body; null when the key is absent.
     *
     * @return string|null
     */
    public function getQueue()
    {
        return array_get(json_decode($this->job->body, true), 'queue');
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php namespace Illuminate\Queue; | |
use IronMQ\IronMQ; | |
use Illuminate\Http\Request; | |
use Illuminate\Http\Response; | |
use Illuminate\Queue\Jobs\IronJob; | |
class IronQueue extends Queue implements QueueInterface
{
    /**
     * IronMQ client used for every API call.
     *
     * @var \IronMQ\IronMQ
     */
    protected $iron;

    /**
     * Request from which pushed jobs are marshalled.
     *
     * @var \Illuminate\Http\Request
     */
    protected $request;

    /**
     * Queue name used when a caller does not supply one.
     *
     * @var string
     */
    protected $default;

    /**
     * Whether payloads are encrypted on push and decrypted on receipt.
     *
     * @var bool
     */
    protected $shouldEncrypt;

    /**
     * Reservation timeout, in seconds, requested when popping a message.
     *
     * @var int
     */
    protected $timeout;

    /**
     * Build a queue around an IronMQ client.
     *
     * @param  \IronMQ\IronMQ  $iron
     * @param  \Illuminate\Http\Request  $request
     * @param  string  $default
     * @param  bool  $shouldEncrypt
     * @param  int  $timeout
     */
    public function __construct(IronMQ $iron, Request $request, $default, $shouldEncrypt = false, $timeout = 60)
    {
        $this->timeout = $timeout;
        $this->shouldEncrypt = $shouldEncrypt;
        $this->default = $default;
        $this->request = $request;
        $this->iron = $iron;
    }

    /**
     * Queue a job for immediate processing.
     *
     * @param  string  $job
     * @param  mixed  $data
     * @param  string  $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data, $queue);

        return $this->pushRaw($payload, $queue);
    }

    /**
     * Post an already-built payload, encrypting it first when enabled.
     *
     * @param  string  $payload
     * @param  string  $queue
     * @param  array  $options
     * @return mixed  The "id" property of the postMessage() result.
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        if ($this->shouldEncrypt) {
            $payload = $this->crypt->encrypt($payload);
        }

        $posted = $this->iron->postMessage($this->getQueue($queue), $payload, $options);

        return $posted->id;
    }

    /**
     * Re-post a payload with a delay (goes through pushRaw(), so it is
     * encrypted when encryption is enabled).
     *
     * @param  string  $payload
     * @param  string  $queue
     * @param  int  $delay
     * @return mixed
     */
    public function recreate($payload, $queue, $delay)
    {
        return $this->pushRaw($payload, $queue, ['delay' => $this->getSeconds($delay)]);
    }

    /**
     * Queue a job that becomes available after a delay.
     *
     * @param  \DateTime|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string  $queue
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $options = ['delay' => $this->getSeconds($delay)];

        return $this->pushRaw($this->createPayload($job, $data, $queue), $queue, $options);
    }

    /**
     * Reserve the next message and wrap it in an IronJob.
     *
     * @param  string  $queue
     * @return \Illuminate\Queue\Jobs\IronJob|null  Null when nothing was reserved.
     */
    public function pop($queue = null)
    {
        $message = $this->iron->reserveMessage($this->getQueue($queue), $this->timeout);

        if (is_null($message)) {
            return null;
        }

        // The body is run through parseJobBody() so that, when encryption
        // is enabled, the job receives the decrypted payload.
        $message->body = $this->parseJobBody($message->body);

        return new IronJob($this->container, $this, $message);
    }

    /**
     * Delete a reserved message through the IronMQ client.
     *
     * @param  string  $queue
     * @param  string  $id
     * @param  string  $reservation_id
     * @return void
     */
    public function deleteMessage($queue, $id, $reservation_id)
    {
        $this->iron->deleteMessage($queue, $id, $reservation_id);
    }

    /**
     * Turn the current request into a pushed job, fire it, and acknowledge.
     *
     * @return \Illuminate\Http\Response
     *
     * @deprecated since version 5.1.
     */
    public function marshal()
    {
        $pushed = $this->marshalPushedJob();

        $job = $this->createPushedIronJob($pushed);
        $job->fire();

        return new Response('OK');
    }

    /**
     * Build the message object for a pushed job from the current request.
     *
     * @return object  Has "id", "body" and "pushed" properties.
     */
    protected function marshalPushedJob()
    {
        $request = $this->request;

        $parsed = $this->parseJobBody($request->getContent());

        $message = [
            'id' => $request->header('iron-message-id'),
            'body' => $parsed,
            'pushed' => true,
        ];

        return (object) $message;
    }

    /**
     * Wrap a pushed message object in an IronJob flagged as pushed.
     *
     * @param  object  $job
     * @return \Illuminate\Queue\Jobs\IronJob
     */
    protected function createPushedIronJob($job)
    {
        $pushed = true;

        return new IronJob($this->container, $this, $job, $pushed);
    }

    /**
     * Build the payload string, stamping it with "attempts" and "queue" meta.
     *
     * @param  string  $job
     * @param  mixed  $data
     * @param  string  $queue
     * @return string
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $base = parent::createPayload($job, $data);

        $withAttempts = $this->setMeta($base, 'attempts', 1);

        return $this->setMeta($withAttempts, 'queue', $this->getQueue($queue));
    }

    /**
     * Return the body ready for firing: decrypted when encryption is
     * enabled, untouched otherwise.
     *
     * @param  string  $body
     * @return string
     */
    protected function parseJobBody($body)
    {
        if ($this->shouldEncrypt) {
            return $this->crypt->decrypt($body);
        }

        return $body;
    }

    /**
     * Resolve a queue name, falling back to the default for falsy input.
     *
     * @param  string|null  $queue
     * @return string
     */
    public function getQueue($queue)
    {
        if ($queue) {
            return $queue;
        }

        return $this->default;
    }

    /**
     * Expose the IronMQ client.
     *
     * @return \IronMQ\IronMQ
     */
    public function getIron()
    {
        return $this->iron;
    }

    /**
     * Expose the request instance.
     *
     * @return \Illuminate\Http\Request
     */
    public function getRequest()
    {
        return $this->request;
    }

    /**
     * Swap the request instance.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return void
     */
    public function setRequest(Request $request)
    {
        $this->request = $request;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment