Skip to content

Instantly share code, notes, and snippets.

@dacc
Created March 19, 2012 20:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacc/2126164 to your computer and use it in GitHub Desktop.
Save dacc/2126164 to your computer and use it in GitHub Desktop.
Index: ../../src/apache-camel-2.8.4/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ../../src/apache-camel-2.8.4/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (revision )
+++ ../../src/apache-camel-2.8.4/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (revision )
@@ -16,22 +16,13 @@
*/
package org.apache.camel.component.aws.sqs;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-
-import org.apache.camel.BatchConsumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.NoFactoryAvailableException;
-import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.*;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
@@ -41,7 +32,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
/**
* A Consumer of messages from the Amazon Web Service Simple Queue Service
* <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
@@ -96,7 +91,7 @@
for (int index = 0; index < total && isBatchAllowed(); index++) {
// only loop if we are started (allowed to run)
- Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+ final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
// add current index and total as properties
exchange.setProperty(Exchange.BATCH_INDEX, index);
exchange.setProperty(Exchange.BATCH_SIZE, total);
@@ -123,7 +118,24 @@
LOG.trace("Processing exchange [{}]...", exchange);
- getProcessor().process(exchange);
+ // allows multiple messages to be processed in parallel.
+ final AsyncProcessor processor = (AsyncProcessor) getProcessor();
+
+ processor.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (!doneSync) {
+ LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Done " + processor;
+ }
+ });
+
+ // blocks on processing further messages until this one is complete when autoack is turned off.
+// getProcessor().process(exchange);
}
return total;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment