Skip to content

Instantly share code, notes, and snippets.

@SammyK
Last active September 23, 2015 12:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save SammyK/3957ee7de95d35f395be to your computer and use it in GitHub Desktop.
Save SammyK/3957ee7de95d35f395be to your computer and use it in GitHub Desktop.
AWS SQS Push Queues for Laravel 4.1

Add these files to your app namespace under the App/LaravelExtensions folder & namespace (change App to whatever you app folder is).

Open up your app/config/queue.php and edit the default value to mysqs. Add a new connection to the connections array with your AWS config:

return array(
	'default' => 'mysqs',

	'connections' => array(

		'mysqs' => array(
			'driver' => 'mysqs',
            'key'    => getenv('AWS_ACCESS_KEY_ID'),
            'secret' => getenv('AWS_SECRET_KEY'),
            'queue'  => getenv('SQS_QUEUE'),
			'region' => 'us-east-1',
		),

	),
);

Finally open up your app/config/app.php file and find the providers array. Comment out Illuminate\Queue\QueueServiceProvider and replace with App\LaravelExtensions\MyQueueServiceProvider (replacing App with your namespace):

return array(
    'providers' => array(
        //'Illuminate\Queue\QueueServiceProvider',
        'App\LaravelExtensions\MyQueueServiceProvider',
    ),
);

Then you should be able to use push queues as normal:

// Process incoming push queues
Route::post('queue/receive', function()
{
    Queue::marshal();
});
<?php namespace App\LaravelExtensions;
use Illuminate\Queue\QueueServiceProvider;
class MyQueueServiceProvider extends QueueServiceProvider {
/**
* Register the connectors on the queue manager.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
public function registerConnectors($manager)
{
parent::registerConnectors($manager);
$this->registerMysqsConnector($manager);
}
/**
* Register the modified Amazon SQS queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
public function registerMysqsConnector($manager)
{
$app = $this->app;
$manager->addConnector('mysqs', function() use ($app)
{
return new MysqsConnector($app['request']);
});
}
}
<?php namespace App\LaravelExtensions;
use Aws\Sqs\SqsClient;
use Illuminate\Http\Request;
use Illuminate\Queue\Connectors\ConnectorInterface;
class MysqsConnector implements ConnectorInterface {
/**
* The current request instance.
*
* @var \Illuminate\Http\Request;
*/
protected $request;
/**
* Create a new SQS connector instance.
*
* @param \Illuminate\Http\Request $request
* @return void
*/
public function __construct(Request $request)
{
$this->request = $request;
}
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Queue\QueueInterface
*/
public function connect(array $config)
{
$sqsConfig = array_only($config, array('key', 'secret', 'region', 'default_cache_config'));
$sqs = SqsClient::factory($sqsConfig);
return new MysqsQueue($sqs, $this->request, $config['queue']);
}
}
<?php namespace App\LaravelExtensions;
use Illuminate\Queue\Jobs\SqsJob;
use Aws\Sqs\SqsClient;
use Illuminate\Container\Container;
class MysqsJob extends SqsJob {
/**
* Indicates if the message was a push message.
*
* @var bool
*/
protected $pushed = false;
public function __construct(Container $container,
SqsClient $sqs,
$queue,
array $job,
$pushed = false)
{
parent::__construct($container, $sqs, $queue, $job);
$this->pushed = $pushed;
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
$this->deleted = true;
if (isset($this->job['pushed'])) return;
parent::delete();
}
}
<?php namespace App\LaravelExtensions;
use Illuminate\Queue\SqsQueue;
use Aws\Sqs\SqsClient;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
class MysqsQueue extends SqsQueue {
/**
* The current request instance.
*
* @var \Illuminate\Http\Request
*/
protected $request;
public function __construct(SqsClient $sqs, Request $request, $default)
{
parent::__construct($sqs, $default);
$this->request = $request;
}
/**
* Marshal a push queue request and fire the job.
*
* @return \Illuminate\Http\Response
*/
public function marshal()
{
$this->createPushedSqsJob($this->marshalPushedJob())->fire();
return new Response('OK');
}
/**
* Marshal out the pushed job and payload.
*
* @return array
*/
protected function marshalPushedJob()
{
$r = $this->request;
return array(
'MessageId' => $r->header('X-aws-sqsd-msgid'),
'Body' => $r->getContent(),
'Attributes' => array('ApproximateReceiveCount' => $r->header('X-aws-sqsd-receive-count')),
'pushed' => true,
);
}
/**
* Create a new SqsJob for a pushed job.
*
* @param array $job
* @return \Illuminate\Queue\Jobs\SqsJob
*/
protected function createPushedSqsJob($job)
{
return new MysqsJob($this->container, $this->sqs, $this, $job, true);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment