Created
March 19, 2012 19:20
-
-
Save dacc/2124991 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorRef; | |
import akka.camel.*; | |
import com.amazonaws.auth.AWSCredentials; | |
import com.amazonaws.auth.PropertiesCredentials; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.UnsupportedEncodingException; | |
import java.net.URLEncoder; | |
import static akka.actor.Actors.actorOf; | |
import static akka.camel.CamelServiceFactory.createCamelService; | |
import static java.lang.String.format; | |
import static java.lang.System.out; | |
/** | |
* Demonstration of processing messages from an SQS queue using akka-camel and camel-aws. | |
* | |
* Dependencies: | |
* se.scalablesolutions.akka:akka-actor:jar:1.3 | |
* se.scalablesolutions.akka:akka-camel:jar:1.3 | |
* org.apache.camel:camel-core:jar:2.8.4 | |
* org.apache.camel:camel-aws:jar:2.8.4 | |
* com.amazonaws:aws-java-sdk:jar:1.3.3 | |
*/ | |
public class SqsFaultHandling { | |
/** | |
* How long you want SQS to give you to process a message before giving it to someone else. Should be large enough | |
* to ensure adequate time to complete, and short enough that retries will be timely. Currently governs retry on | |
* both explicit failure via a Failure message and simply disappearing and never responding, as the former doesn't | |
* appear to reset the visibility timeout. | |
*/ | |
private static final int VISIBILITY_TIMEOUT_SECONDS = 5; | |
private static AWSCredentials creds; | |
static { | |
try { | |
creds = new PropertiesCredentials(new File("aws.properties")); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private static String endpointUri; | |
static { | |
try { | |
endpointUri = format("aws-sqs://fault-handling-test?deleteAfterRead=true&messageVisibilityTimeout=%d&accessKey=%s&secretKey=%s", | |
VISIBILITY_TIMEOUT_SECONDS, | |
creds.getAWSAccessKeyId(), | |
URLEncoder.encode(creds.getAWSSecretKey(), "ISO-8859-1") | |
); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public static class Producer extends UntypedProducerActor { | |
public String getEndpointUri() { | |
return endpointUri; | |
} | |
} | |
public static class Consumer extends UntypedConsumerActor { | |
public String getEndpointUri() { | |
return endpointUri; | |
} | |
public boolean isAutoack() { | |
// prevents message deletion in SQS until we explicitly acknowledge that processing succeeded. | |
return false; | |
} | |
public void onReceive(Object message) throws Exception { | |
if (message instanceof Message) { | |
Message camelMessage = (Message) message; | |
String body = (String) camelMessage.body(); | |
if ("fail".equals(body)) { | |
Thread.sleep(1000); | |
throw new RuntimeException("blech"); | |
} else { | |
out.println("got: " + body); | |
// comment out to simulate long running, parallel message processing. currently requires a patch to | |
// SqsConsumer, which will otherwise block on the first message completing. see: | |
// http://camel.465427.n5.nabble.com/Better-Way-to-Achieve-Parallel-Processing-of-SQS-Messages-td5578135.html | |
getContext().reply(Ack.ack()); | |
} | |
} | |
} | |
public void preRestart(Throwable reason) { | |
// explicitly fail to process the message. seems like this should go back to SQS and rewrite the visibility | |
// timeout to zero, but currently it doesn't. | |
getContext().replySafe(new Failure(reason)); | |
} | |
} | |
public static void main(String[] args) throws InterruptedException { | |
createCamelService().start(); | |
ActorRef producer = actorOf(Producer.class); | |
producer.start(); | |
ActorRef consumer = actorOf(Consumer.class); | |
consumer.start(); | |
// all of the oks will be processed and removed from the SQS queue, and the fail will keep getting retried. | |
producer.tell("ok"); | |
producer.tell("fail"); | |
producer.tell("ok3"); | |
producer.tell("ok4"); | |
while (consumer.isRunning()) | |
Thread.sleep(500); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment