-
-
Save cweagans/f7ed4fc642289470671c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/aws_sqs.services.yml b/aws_sqs.services.yml | |
index 4040cd8..8b9a833 100644 | |
--- a/aws_sqs.services.yml | |
+++ b/aws_sqs.services.yml | |
@@ -1,3 +1,4 @@ | |
services: | |
queue.awssqs: | |
- class: Drupal\aws_sqs\Queue\QueueAwsSqsFactory | |
\ No newline at end of file | |
+ class: Drupal\aws_sqs\Queue\AwsSqsQueueFactory | |
+ arguments: ['@config.factory', '@logger.factory'] | |
diff --git a/lib/Drupal/aws_sqs/Queue/AwsSqsQueue.php b/lib/Drupal/aws_sqs/Queue/AwsSqsQueue.php | |
deleted file mode 100644 | |
index 3002480..0000000 | |
--- a/lib/Drupal/aws_sqs/Queue/AwsSqsQueue.php | |
+++ /dev/null | |
@@ -1,464 +0,0 @@ | |
-<?php | |
- | |
-/** | |
- * @file | |
- * Definition of AwsSqsQueue. | |
- * Contains \Drupal\aws_sqs\Queue\AwsSqsQueue. | |
- */ | |
- | |
-/** | |
- * Use SQS Client provided by AWS SDK PHP version 2. | |
- * | |
- * More info: | |
- * | |
- * http://aws.amazon.com/php | |
- * https://github.com/aws/aws-sdk-php | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/ | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/guide/latest/service-sqs.html | |
- * | |
- * Responses to HTTP requests made through SqsClient are returned as Guzzle | |
- * objects. More info about Guzzle here: | |
- * | |
- * http://guzzlephp.org/ | |
- */ | |
- | |
-namespace Drupal\aws_sqs\Queue; | |
- | |
-use Aws\Sqs\SqsClient; | |
-use Drupal\Core\Queue\ReliableQueueInterface; | |
- | |
- | |
-/** | |
- * Amazon queue. | |
- */ | |
-class AwsSqsQueue implements ReliableQueueInterface { | |
- | |
- /** | |
- * The name of the queue this instance is working with. | |
- * | |
- * @var string | |
- */ | |
- private $awsKey; // This is the key that gets sent to AWS with your requests. | |
- private $awsSecret; // Your secret. (This one doesn't get sent.) | |
- private $awsRegion; // Location of AWS data center. (See constants below.) | |
- private $claimTimeout; | |
- private $client; // SqsClient provided by AWS as interface to SQS. | |
- private $name; // Queue name. | |
- private $queueUrl; // Uniqueue identifier for queue. | |
- private $waitTimeSeconds; | |
- private $config; | |
- | |
- // Constants for AWS regions. | |
- const REGION_US_EAST_1 = 'us-east-1'; | |
- const REGION_US_WEST_1 = 'us-west-1'; | |
- const REGION_US_WEST_2 = 'us-west-2'; | |
- const REGION_EU_WEST_1 = 'eu-west-1'; | |
- const REGION_AP_SOUTHEAST_1 = 'ap-southeast-1'; | |
- const REGION_AP_NORTHEAST_1 = 'ap-northeast-1'; | |
- const REGION_SA_EAST_1 = 'sa-east-1'; | |
- | |
- /** | |
- * Initialize the Queue Class | |
- * | |
- * @param string $name | |
- * Name of the queue, will also be this name in Amazon. You will be able to | |
- * see it in the AWS console here: | |
- * https://console.aws.amazon.com/sqs | |
- * | |
- * @param string $region | |
- * Region where you want to create the Queue | |
- * | |
- * @throws Exception | |
- */ | |
- public function __construct($name) { | |
- | |
- $this->config = \Drupal::config('aws_sqs.settings'); | |
- | |
- // Add it again in case it did not load in time. | |
- composer_manager_register_autoloader(); | |
- | |
- // Set up the object. | |
- $this->setName($name); | |
- $this->setAwsKey(); | |
- $this->setAwsSecret(); | |
- $this->setAwsRegion(); | |
- $this->setClient(); | |
- | |
- // Check if keys are available. | |
- if (!$this->getAwsKey() || !$this->getAwsSecret()) { | |
- throw new \Exception("AWS Credentials not found"); | |
- } | |
- } | |
- | |
- /** | |
- * Returns the queue object for a given name. | |
- * | |
- * @return object | |
- */ | |
- static public function get($name) { | |
- return new AwsSqsQueue($name); | |
- } | |
- | |
- /** | |
- * Get the queue's options; | |
- * | |
- * @param type $name | |
- * @return type | |
- * | |
- * @todo What do these two settings do? Can we get rid of them or move them | |
- * to getters/setters? | |
- */ | |
- static private function getOptions($name) { | |
- | |
- $options = $this->config->get('aws_sqs_' . $name, array()); | |
- $defaults = $this->config->get('aws_sqs_default_queue', array()); | |
- $options += $defaults; | |
- | |
- return $options; | |
- } | |
- | |
- /** | |
- * Send an item to the AWS Queue. | |
- * | |
- * Careful, you can only store data up to 64kb. | |
- * @todo Add link to documentation here. I think this info is out of date. | |
- * I believe now you can store more. But you get charged as if it's an additional | |
- * request. | |
- * | |
- * Invokes SqsClient::sendMessage(). | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_sendMessage | |
- * | |
- * @param $data | |
- * Can be of any type, mostly array or object. Will be stored | |
- * serialized in the queue sytem. If an item retreived from the queue is | |
- * being re-submitted to the queue (if is_object($item) && $item->data && | |
- * item->item_id), only $item->data will be stored. | |
- * | |
- * @return bool | |
- */ | |
- public function createItem($data) { | |
- | |
- // Encapsulate our data | |
- $serialized_data = $this->serialize($data); | |
- | |
- // Check to see if someone is trying to save an item originally retrieved | |
- // from the queue. If so, this really should have been submitted as | |
- // $item->data, not $item. Reformat this so we don't save metadata or | |
- // confuse item_ids downstream. | |
- if (is_object($data) && property_exists($data, 'data') && property_exists($data, 'item_id')) { | |
- $text = t('Do not re-queue whole items retrieved from the SQS queue. This included metadata, like the item_id. Pass $item->data to createItem() as a parameter, rather than passing the entire $item. $item->data is being saved. The rest is being ignored.'); | |
- $data = $data->data; | |
- watchdog('aws_sqs', $text, array(), WATCHDOG_ERROR); | |
- } | |
- | |
- // @todo Add a check here for message size? Log it? | |
- | |
- // Create a new message object | |
- $result = $this->client->sendMessage(array( | |
- 'QueueUrl' => $this->queueUrl, | |
- 'MessageBody' => $serialized_data, | |
- )); | |
- | |
- return (bool) $result; | |
- } | |
- | |
- /** | |
- * Return the amount of items in the queue | |
- * | |
- * Invokes SqsClient::getQueueAttributes(). | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_getQueueAttributes | |
- * | |
- * @return integer | |
- * Approximate Number of messages in the aws queue. Returns FALSE if SQS is | |
- * not available. | |
- */ | |
- public function numberOfItems() { | |
- // Request attributes of queue from AWS. The response is returned as a Guzzle | |
- // resource model object: | |
- // http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Guzzle.Service.Resource.Model.html | |
- $args = array( | |
- 'QueueUrl' => $this->queueUrl, | |
- 'AttributeNames' => array('ApproximateNumberOfMessages'), | |
- ); | |
- $response = $this->client->getQueueAttributes($args); | |
- | |
- $attributes = $response->get('Attributes'); | |
- if (!empty($attributes['ApproximateNumberOfMessages'])) { | |
- $return = $attributes['ApproximateNumberOfMessages']; | |
- } | |
- else { | |
- $return = FALSE; | |
- } | |
- | |
- return $return; | |
- } | |
- | |
- /** | |
- * Fetch a single item from the AWS SQS queue. | |
- * | |
- * Invokes SqsClient::receiveMessage(). | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/guide/latest/service-sqs.html#receiving-messages | |
- * | |
- * @param int $lease_time | |
- * Drupal's "lease time" is the same as AWS's "Visibility Timeout". It's the | |
- * amount of time for which an item is being claimed. If a user passes in a | |
- * value for $lease_time here, override the default claimTimeout. | |
- * | |
- * @return | |
- * On success we return an item object. If the queue is unable to claim an | |
- * item it returns false. This implies a best effort to retrieve an item | |
- * and either the queue is empty or there is some other non-recoverable | |
- * problem. | |
- */ | |
- public function claimItem($lease_time = 0) { | |
- // This is important to support blocking calls to the queue system | |
- $waitTimeSeconds = $this->getWaitTimeSeconds(); | |
- $claimTimeout = ($lease_time) ? $lease_time : $this->getClaimTimeout(); | |
- // if our given claimTimeout is smaller than the allowed waiting seconds | |
- // set the waitTimeSeconds to this value. This is to avoid a long call when | |
- // the worker that called claimItem only has a finite amount of time to wait | |
- // for an item | |
- // if $waitTimeSeconds is set to 0, it will never use the blocking | |
- // logic (which is intended) | |
- if ($claimTimeout < $waitTimeSeconds) { | |
- $waitTimeSeconds = $claimTimeout; | |
- } | |
- | |
- // Fetch the queue item. | |
- // @todo See usage of $lease_time. Should we use lease_time or other timeout below? | |
- // $message = $this->manager->receiveMessage($this->queue, $lease_time, true); | |
- | |
- // Retrieve item from AWS. See documentation about method and response here: | |
- $response = $this->client->receiveMessage(array( | |
- 'QueueUrl' => $this->queueUrl, | |
- 'MaxNumberOfMessages' => 1, | |
- 'VisibilityTimeout' => $claimTimeout, | |
- 'WaitTimeSeconds' => $waitTimeSeconds, | |
- )); | |
- | |
- // @todo Add error handling, in case service becomes unavailable. | |
- | |
- $item = new \stdClass(); | |
- $message = $response->getPath('Messages/*'); | |
- $item->data = $this->unserialize($message['Body']); | |
- $item->item_id = $message['ReceiptHandle']; | |
- | |
- if (!empty($item->item_id)) { | |
- return $item; | |
- } | |
- | |
- return FALSE; | |
- } | |
- | |
- /** | |
- * Release claim on item in the queue. | |
- * | |
- * In AWS lingo, you release a claim on an item in the queue by "terminating | |
- * its visibility timeout". (Similarly, you can extend the amount of time for | |
- * which an item is claimed by extending its visibility timeout. The maximum | |
- * visibility timeout for any item in any queue is 12 hours, including all | |
- * extensions.) | |
- * | |
- * Invokes SqsClient::ChangeMessageVisibility(). | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_changeMessageVisibility | |
- * http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html | |
- * | |
- * @param object $item | |
- * Item retrieved from queue. This property is required: $item->item_id. | |
- * | |
- * @return bool | |
- * TRUE for success. | |
- */ | |
- public function releaseItem($item) { | |
- $result = $this->client->changeMessageVisibility(array( | |
- 'QueueUrl' => $this->queueUrl, | |
- 'ReceiptHandle' => $item->item_id, | |
- 'VisibilityTimeout' => 0, | |
- )); | |
- | |
- // If $result is the type of object we expect, everything went okay. | |
- // (Typically SqsClient would have thrown an error before here if anything | |
- // went wrong. This check is really just for good measure.) | |
- return self::isGuzzleServiceResourceModel($result); | |
- } | |
- | |
- /** | |
- * Deletes an item from the queue with deleteMessage method. | |
- * | |
- * Invokes SqsClient::deleteMessage(). | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_deleteMessage | |
- * | |
- * @param Message $item | |
- * The item to be deleted. | |
- * | |
- * @return | |
- * DrupalQueueInterface::deleteItem() returns nothing. Don't return anything here. | |
- */ | |
- public function deleteItem($item) { | |
- if (!isset($item->item_id)) { | |
- throw new \Exception("An item that needs to be deleted requires a handle ID"); | |
- } | |
- | |
- $result = $this->client->deleteMessage(array( | |
- 'QueueUrl' => $this->queueUrl, | |
- 'ReceiptHandle' => $item->item_id, | |
- )); | |
- } | |
- | |
- /** | |
- * Create the Amazon Queue. | |
- * | |
- * Store queueUrl when queue is created. This is the queue's unique | |
- * identifier. | |
- * | |
- * Invokes SqsClient::createQueue(). | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_createQueue | |
- * | |
- * @return | |
- * DrupalQueueInterface::createQueue() returns nothing. Don't return anything here. | |
- */ | |
- public function createQueue() { | |
- $result = $this->client->createQueue(array('QueueName' => $this->name)); | |
- $queueUrl = $result->get('QueueUrl'); | |
- $this->setQueueUrl($queueUrl); | |
- } | |
- | |
- /** | |
- * Deletes an SQS queue. | |
- * | |
- * Invokes SqsClient::deleteQueue(). | |
- * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_deleteQueue | |
- * | |
- * @return | |
- * DrupalQueueInterface::deleteQueue() returns nothing. Don't return anything here. | |
- */ | |
- public function deleteQueue() { | |
- $result = $this->client->deleteQueue(array('QueueUrl' => $this->queueUrl)); | |
- } | |
- | |
- /** | |
- * Determine whether an object is an instance of | |
- * Guzzle\Service\Resource\Model. | |
- * | |
- * @param obj $object | |
- * | |
- * @return bool | |
- */ | |
- static private function isGuzzleServiceResourceModel($object) { | |
- return (is_object($object) && get_class($object) == 'Guzzle\Service\Resource\Model') ? TRUE : FALSE; | |
- } | |
- | |
- /** | |
- * PHP's native serialize() isn't very portable. This method enables people to | |
- * extend this class and support other serialization formats (so that | |
- * something other than PHP can potentially process the data in the queue, as | |
- * per discussion here: https://drupal.org/node/1956190). | |
- */ | |
- protected static function serialize($data) { | |
- return serialize($data); | |
- } | |
- | |
- /** | |
- * PHP's native serialize() isn't very portable. This method enables people to | |
- * extend this class and support other serialization formats (so that | |
- * something other than PHP can potentially process the data in the queue, as | |
- * per discussion here: https://drupal.org/node/1956190). | |
- */ | |
- protected static function unserialize($data) { | |
- return unserialize($data); | |
- } | |
- | |
- /******************************************************* | |
- * Getters and setters | |
- *******************************************************/ | |
- | |
- private function getAwsKey() { | |
- if (!isset($this->awsKey)) $this->setAwsKey(); | |
- return $this->awsKey; | |
- } | |
- | |
- private function setAwsKey() { | |
- | |
- $this->awsKey = $this->config->get('aws_sqs_aws_key'); | |
- } | |
- | |
- private function getAwsSecret() { | |
- if (!isset($this->awsSecret)) $this->setAwsSecret(); | |
- return $this->awsSecret; | |
- } | |
- | |
- private function setAwsSecret() { | |
- $this->awsSecret = $this->config->get('aws_sqs_aws_secret'); | |
- } | |
- | |
- private function getAwsRegion() { | |
- if (!isset($this->awsRegion)) $this->setAwsRegion(); | |
- return $this->awsRegion; | |
- } | |
- | |
- private function setAwsRegion() { | |
- $this->awsRegion = $this->config->get('aws_sqs_region', self::REGION_EU_WEST_1); | |
- } | |
- | |
- private function getClaimTimeout() { | |
- if (!isset($this->claimTimeout)) $this->setClaimTimeout(); | |
- return $this->claimTimeout; | |
- } | |
- | |
- private function setClaimTimeout() { | |
- $this->claimTimeout = $this->config->get('aws_sqs_claimtimeout'); | |
- } | |
- | |
- private function getClient() { | |
- if (!isset($this->client)) $this->setClient(); | |
- return $this->client; | |
- } | |
- | |
- private function setClient() { | |
- $client = SqsClient::factory(array( | |
- 'key' => $this->getAwsKey(), | |
- 'secret' => $this->getAwsSecret(), | |
- 'region' => $this->getAwsRegion(), | |
- )); | |
- $this->client = $client; | |
- } | |
- | |
- /** | |
- * $name is a required, user-defined param in __construct. It is set there. | |
- */ | |
- private function getName() { | |
- return $this->name; | |
- } | |
- | |
- private function setName($name) { | |
- $this->name = $name; | |
- } | |
- | |
- private function getQueueUrl() { | |
- if (!isset($this->queueUrl)) { | |
- $text = t("You have to create a queue before you can get its URL. Use createQueue()."); | |
- watchdog('aws_sqs', $text, array(), WATCHDOG_WARNING); | |
- return FALSE; | |
- } | |
- else { | |
- return $this->queueUrl; | |
- } | |
- } | |
- | |
- /** | |
- * @see createQueue(). | |
- */ | |
- private function setQueueUrl($queueUrl) { | |
- $this->queueUrl = $queueUrl; | |
- } | |
- | |
- private function getWaitTimeSeconds() { | |
- if (!isset($this->waitTimeSeconds)) $this->setWaitTimeSeconds(); | |
- return $this->waitTimeSeconds; | |
- } | |
- | |
- private function setWaitTimeSeconds() { | |
- $this->waitTimeSeconds = $this->config->get('aws_sqs_waittimeseconds'); | |
- } | |
-} | |
diff --git a/lib/Drupal/aws_sqs/Queue/QueueAwsSqsFactory.php b/lib/Drupal/aws_sqs/Queue/QueueAwsSqsFactory.php | |
deleted file mode 100644 | |
index c5a3c14..0000000 | |
--- a/lib/Drupal/aws_sqs/Queue/QueueAwsSqsFactory.php | |
+++ /dev/null | |
@@ -1,29 +0,0 @@ | |
-<?php | |
- | |
-/** | |
- * @file | |
- * Contains \Drupal\Core\Queue\QueueDatabaseFactory. | |
- */ | |
- | |
-namespace Drupal\aws_sqs\Queue; | |
- | |
-/** | |
- * Defines the key/value store factory for the database backend. | |
- */ | |
-class QueueAwsSqsFactory { | |
- | |
- /** | |
- * Constructs a new queue object for a given name. | |
- * | |
- * @param string $name | |
- * The name of the collection holding key and value pairs. | |
- * @param \Drupal\Core\Database\Connection $connection | |
- * The connection to run against. | |
- * | |
- * @return \Drupal\Core\Queue\DatabaseQueue | |
- * A key/value store implementation for the given $collection. | |
- */ | |
- public function get($name) { | |
- return new AwsSqsQueue($name); | |
- } | |
-} | |
diff --git a/src/Queue/AwsSqsQueue.php b/src/Queue/AwsSqsQueue.php | |
new file mode 100644 | |
index 0000000..194e7ac | |
--- /dev/null | |
+++ b/src/Queue/AwsSqsQueue.php | |
@@ -0,0 +1,360 @@ | |
+<?php | |
+ | |
+/** | |
+ * @file | |
+ * Definition of AwsSqsQueue. | |
+ * Contains \Drupal\aws_sqs\Queue\AwsSqsQueue. | |
+ */ | |
+ | |
+/** | |
+ * Use SQS Client provided by AWS SDK PHP version 2. | |
+ * | |
+ * More info: | |
+ * | |
+ * http://aws.amazon.com/php | |
+ * https://github.com/aws/aws-sdk-php | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/ | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/guide/latest/service-sqs.html | |
+ * | |
+ * Responses to HTTP requests made through SqsClient are returned as Guzzle | |
+ * objects. More info about Guzzle here: | |
+ * | |
+ * http://guzzlephp.org/ | |
+ */ | |
+ | |
+namespace Drupal\aws_sqs\Queue; | |
+ | |
+use Aws\Sqs\SqsClient; | |
+use Aws\AwsClientInterface; | |
+use Drupal\Core\Logger\LoggerChannelInterface; | |
+use Drupal\Core\Queue\ReliableQueueInterface; | |
+ | |
+ | |
+/** | |
+ * Amazon queue. | |
+ */ | |
+class AwsSqsQueue implements ReliableQueueInterface { | |
+ | |
+ /** | |
+ * The name of the queue this instance is working with. | |
+ * | |
+ * @var string | |
+ */ | |
+ protected $claimTimeout; | |
+ protected $client; // SqsClient provided by AWS as interface to SQS. | |
+ protected $name; // Queue name. | |
+ protected $queueUrl; // Uniqueue identifier for queue. | |
+ protected $waitTimeSeconds; | |
+ | |
+ /** | |
+ * @var Drupal\Core\Logger\LoggerChannelInterface | |
+ */ | |
+ protected $logger; | |
+ | |
+ /** | |
+ * AwsSqsQueue constructor. | |
+ * @param $name | |
+ * @param $client AwsClientInterface | |
+ * @param $logger LoggerChannelInterface | |
+ */ | |
+ public function __construct($name, AwsClientInterface $client, LoggerChannelInterface $logger) { | |
+ $this->name = $name; | |
+ $this->client = $client; | |
+ $this->logger = $logger; | |
+ | |
+ // Ensure the the queue exists and that we have a queue URL so that we | |
+ // aren't checking for the queueUrl everywhere. | |
+ $this->createQueue(); | |
+ } | |
+ | |
+ | |
+ /** | |
+ * Send an item to the AWS Queue. | |
+ * | |
+ * Careful, you can only store data up to 64kb. | |
+ * @todo Add link to documentation here. I think this info is out of date. | |
+ * I believe now you can store more. But you get charged as if it's an additional | |
+ * request. | |
+ * | |
+ * Invokes SqsClient::sendMessage(). | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_sendMessage | |
+ * | |
+ * @param $data | |
+ * Caller should be sending serialized data. If an item retreived from the queue is | |
+ * being re-submitted to the queue (if is_object($item) && $item->data && | |
+ * item->item_id), only $item->data will be stored. | |
+ * | |
+ * @return bool | |
+ */ | |
+ public function createItem($data) { | |
+ | |
+ // Check to see if someone is trying to save an item originally retrieved | |
+ // from the queue. If so, this really should have been submitted as | |
+ // $item->data, not $item. Reformat this so we don't save metadata or | |
+ // confuse item_ids downstream. | |
+ if (is_object($data) && property_exists($data, 'data') && property_exists($data, 'item_id')) { | |
+ $text = t('Do not re-queue whole items retrieved from the SQS queue. This included metadata, like the item_id. Pass $item->data to createItem() as a parameter, rather than passing the entire $item. $item->data is being saved. The rest is being ignored.'); | |
+ $data = $data->data; | |
+ $this->logger->error($text); | |
+ } | |
+ | |
+ // @todo Add a check here for message size? Log it? | |
+ | |
+ // Create a new message object | |
+ $result = $this->client->sendMessage(array( | |
+ 'QueueUrl' => $this->queueUrl, | |
+ 'MessageBody' => $data, | |
+ )); | |
+ | |
+ return (bool) $result; | |
+ } | |
+ | |
+ /** | |
+ * Return the amount of items in the queue | |
+ * | |
+ * Invokes SqsClient::getQueueAttributes(). | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_getQueueAttributes | |
+ * | |
+ * @return integer | |
+ * Approximate Number of messages in the aws queue. Returns FALSE if SQS is | |
+ * not available. | |
+ */ | |
+ public function numberOfItems() { | |
+ // Request attributes of queue from AWS. The response is returned as a Guzzle | |
+ // resource model object: | |
+ // http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Guzzle.Service.Resource.Model.html | |
+ $args = array( | |
+ 'QueueUrl' => $this->queueUrl, | |
+ 'AttributeNames' => array('ApproximateNumberOfMessages'), | |
+ ); | |
+ $response = $this->client->getQueueAttributes($args); | |
+ | |
+ $attributes = $response->get('Attributes'); | |
+ if (!empty($attributes['ApproximateNumberOfMessages'])) { | |
+ $return = $attributes['ApproximateNumberOfMessages']; | |
+ } | |
+ else { | |
+ $return = FALSE; | |
+ } | |
+ | |
+ return $return; | |
+ } | |
+ | |
+ /** | |
+ * Fetch a single item from the AWS SQS queue. | |
+ * | |
+ * Invokes SqsClient::receiveMessage(). | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/guide/latest/service-sqs.html#receiving-messages | |
+ * | |
+ * @param int $lease_time | |
+ * Drupal's "lease time" is the same as AWS's "Visibility Timeout". It's the | |
+ * amount of time for which an item is being claimed. If a user passes in a | |
+ * value for $lease_time here, override the default claimTimeout. | |
+ * | |
+ * @return | |
+ * On success we return an item object. If the queue is unable to claim an | |
+ * item it returns false. This implies a best effort to retrieve an item | |
+ * and either the queue is empty or there is some other non-recoverable | |
+ * problem. | |
+ */ | |
+ public function claimItem($lease_time = 0) { | |
+ // This is important to support blocking calls to the queue system | |
+ $waitTimeSeconds = $this->getWaitTimeSeconds(); | |
+ $claimTimeout = ($lease_time) ? $lease_time : $this->getClaimTimeout(); | |
+ // if our given claimTimeout is smaller than the allowed waiting seconds | |
+ // set the waitTimeSeconds to this value. This is to avoid a long call when | |
+ // the worker that called claimItem only has a finite amount of time to wait | |
+ // for an item | |
+ // if $waitTimeSeconds is set to 0, it will never use the blocking | |
+ // logic (which is intended) | |
+ if ($claimTimeout < $waitTimeSeconds) { | |
+ $waitTimeSeconds = $claimTimeout; | |
+ } | |
+ | |
+ // Fetch the queue item. | |
+ // @todo See usage of $lease_time. Should we use lease_time or other timeout below? | |
+ // $message = $this->manager->receiveMessage($this->queue, $lease_time, true); | |
+ | |
+ // Retrieve item from AWS. See documentation about method and response here: | |
+ $response = $this->client->receiveMessage(array( | |
+ 'QueueUrl' => $this->queueUrl, | |
+ 'MaxNumberOfMessages' => 1, | |
+ 'VisibilityTimeout' => $claimTimeout, | |
+ 'WaitTimeSeconds' => $waitTimeSeconds, | |
+ )); | |
+ | |
+ // @todo Add error handling, in case service becomes unavailable. | |
+ | |
+ $item = new \stdClass(); | |
+ $messageBody = $response->toArray()['Messages']['0']; | |
+ $item->data = $messageBody['Body']; | |
+ $item->item_id = $messageBody['ReceiptHandle']; | |
+ if(!empty($item->item_id)) { | |
+ return $item; | |
+ } | |
+ return FALSE; | |
+ } | |
+ | |
+ /** | |
+ * Release claim on item in the queue. | |
+ * | |
+ * In AWS lingo, you release a claim on an item in the queue by "terminating | |
+ * its visibility timeout". (Similarly, you can extend the amount of time for | |
+ * which an item is claimed by extending its visibility timeout. The maximum | |
+ * visibility timeout for any item in any queue is 12 hours, including all | |
+ * extensions.) | |
+ * | |
+ * Invokes SqsClient::ChangeMessageVisibility(). | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_changeMessageVisibility | |
+ * http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html | |
+ * | |
+ * @param object $item | |
+ * Item retrieved from queue. This property is required: $item->item_id. | |
+ * | |
+ * @return bool | |
+ * TRUE for success. | |
+ */ | |
+ public function releaseItem($item) { | |
+ $result = $this->client->changeMessageVisibility(array( | |
+ 'QueueUrl' => $this->queueUrl, | |
+ 'ReceiptHandle' => $item->item_id, | |
+ 'VisibilityTimeout' => 0, | |
+ )); | |
+ | |
+ // If $result is the type of object we expect, everything went okay. | |
+ // (Typically SqsClient would have thrown an error before here if anything | |
+ // went wrong. This check is really just for good measure.) | |
+ return self::isGuzzleServiceResourceModel($result); | |
+ } | |
+ | |
+ /** | |
+ * Deletes an item from the queue with deleteMessage method. | |
+ * | |
+ * Invokes SqsClient::deleteMessage(). | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_deleteMessage | |
+ * | |
+ * @param Message $item | |
+ * The item to be deleted. | |
+ * | |
+ * @return | |
+ * DrupalQueueInterface::deleteItem() returns nothing. Don't return anything here. | |
+ */ | |
+ public function deleteItem($item) { | |
+ if (!isset($item->item_id)) { | |
+ throw new \Exception("An item that needs to be deleted requires a handle ID"); | |
+ } | |
+ | |
+ $result = $this->client->deleteMessage(array( | |
+ 'QueueUrl' => $this->queueUrl, | |
+ 'ReceiptHandle' => $item->item_id, | |
+ )); | |
+ } | |
+ | |
+ /** | |
+ * Create the Amazon Queue. | |
+ * | |
+ * Store queueUrl when queue is created. This is the queue's unique | |
+ * identifier. | |
+ * | |
+ * Invokes SqsClient::createQueue(). | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_createQueue | |
+ * | |
+ * @return | |
+ * DrupalQueueInterface::createQueue() returns nothing. Don't return anything here. | |
+ */ | |
+ public function createQueue() { | |
+ $result = $this->client->createQueue(array('QueueName' => $this->name)); | |
+ $queueUrl = $result->get('QueueUrl'); | |
+ $this->setQueueUrl($queueUrl); | |
+ } | |
+ | |
+ /** | |
+ * Deletes an SQS queue. | |
+ * | |
+ * Invokes SqsClient::deleteQueue(). | |
+ * http://docs.aws.amazon.com/aws-sdk-php-2/latest/class-Aws.Sqs.SqsClient.html#_deleteQueue | |
+ * | |
+ * @return | |
+ * DrupalQueueInterface::deleteQueue() returns nothing. Don't return anything here. | |
+ */ | |
+ public function deleteQueue() { | |
+ $result = $this->client->deleteQueue(array('QueueUrl' => $this->queueUrl)); | |
+ } | |
+ | |
+ /** | |
+ * Determine whether an object is an instance of | |
+ * Guzzle\Service\Resource\Model. | |
+ * | |
+ * @param obj $object | |
+ * | |
+ * @return bool | |
+ */ | |
+ static protected function isGuzzleServiceResourceModel($object) { | |
+ return (is_object($object) && get_class($object) == 'Guzzle\Service\Resource\Model') ? TRUE : FALSE; | |
+ } | |
+ | |
+ /** | |
+ * PHP's native serialize() isn't very portable. This method enables people to | |
+ * extend this class and support other serialization formats (so that | |
+ * something other than PHP can potentially process the data in the queue, as | |
+ * per discussion here: https://drupal.org/node/1956190). | |
+ * | |
+ * @todo: Depend on the Drupal serialization module for this. | |
+ */ | |
+ protected static function serialize($data) { | |
+ return serialize($data); | |
+ } | |
+ | |
+ /** | |
+ * PHP's native serialize() isn't very portable. This method enables people to | |
+ * extend this class and support other serialization formats (so that | |
+ * something other than PHP can potentially process the data in the queue, as | |
+ * per discussion here: https://drupal.org/node/1956190). | |
+ * | |
+ * @todo: Depend on the Drupal serialization module for this. | |
+ */ | |
+ protected static function unserialize($data) { | |
+ return unserialize($data); | |
+ } | |
+ | |
+ /******************************************************* | |
+ * Getters and setters | |
+ *******************************************************/ | |
+ | |
+ public function getClaimTimeout() { | |
+ return $this->claimTimeout; | |
+ } | |
+ | |
+ public function setClaimTimeout($timeout) { | |
+ $this->claimTimeout = $timeout; | |
+ } | |
+ | |
+ public function getClient() { | |
+ return $this->client; | |
+ } | |
+ | |
+ public function setClient($client) { | |
+ $this->client = $client; | |
+ } | |
+ | |
+ public function getName() { | |
+ return $this->name; | |
+ } | |
+ | |
+ protected function getQueueUrl() { | |
+ return $this->queueUrl; | |
+ } | |
+ | |
+ protected function setQueueUrl($queueUrl) { | |
+ $this->queueUrl = $queueUrl; | |
+ } | |
+ | |
+ public function getWaitTimeSeconds() { | |
+ return $this->waitTimeSeconds; | |
+ } | |
+ | |
+ public function setWaitTimeSeconds($seconds) { | |
+ $this->waitTimeSeconds = $seconds; | |
+ } | |
+} | |
diff --git a/src/Queue/AwsSqsQueueFactory.php b/src/Queue/AwsSqsQueueFactory.php | |
new file mode 100644 | |
index 0000000..e2f83dd | |
--- /dev/null | |
+++ b/src/Queue/AwsSqsQueueFactory.php | |
@@ -0,0 +1,62 @@ | |
+<?php | |
+ | |
+/** | |
+ * @file | |
+ * Contains \Drupal\aws_sqs\Queue\AwsSqsQueueFactory. | |
+ */ | |
+ | |
+namespace Drupal\aws_sqs\Queue; | |
+ | |
+use Drupal\Core\Config\ConfigFactoryInterface; | |
+use Drupal\Core\Logger\LoggerChannelFactoryInterface; | |
+use Aws\Sqs\SqsClient; | |
+ | |
+class AwsSqsQueueFactory { | |
+ | |
+ /** | |
+ * @var Drupal\Core\Config\ConfigFactoryInterface | |
+ */ | |
+ protected $config; | |
+ | |
+ /** | |
+ * @var Drupal\Core\Logger\LoggerChannelInterface | |
+ */ | |
+ protected $logger; | |
+ | |
+ /** | |
+ * Constructs a AwsSqsQueue object. | |
+ * | |
+ * @param Drupal\Core\Config\ConfigFactoryInterface | |
+ * @param Drupal\Core\Logger\LoggerChannelFactoryInterface | |
+ */ | |
+ public function __construct(ConfigFactoryInterface $config_factory, LoggerChannelFactoryInterface $logger_factory) { | |
+ $this->config = $config_factory->get('aws_sqs.settings'); | |
+ $this->logger = $logger_factory->get('aws_sqs'); | |
+ } | |
+ | |
+ /** | |
+ * Constructs a new queue object for a given name. | |
+ * | |
+ * @param string $name | |
+ * The name of the SQS queue to use. | |
+ * | |
+ * @return \Drupal\aws_sqs\Queue\AwsSqsQueue | |
+ */ | |
+ public function get($name) { | |
+ $client = new SqsClient(array( | |
+ 'credentials' => array( | |
+ 'key' => $this->config->get('aws_sqs_aws_key'), | |
+ 'secret' => $this->config->get('aws_sqs_aws_secret'), | |
+ ), | |
+ 'region' => $this->config->get('aws_sqs_region', 'us-east-1'), | |
+ 'version' => $this->config->get('aws_sqs_version', 'latest') | |
+ )); | |
+ | |
+ $queue = new AwsSqsQueue($name, $client, $this->logger); | |
+ $queue->setClaimTimeout($this->config->get('aws_sqs_claimtimeout')); | |
+ $queue->setWaitTimeSeconds($this->config->get('aws_sqs_waittimeseconds')); | |
+ | |
+ return $queue; | |
+ } | |
+ | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment