Skip to content

Instantly share code, notes, and snippets.

@dacc
Created March 19, 2012 19:20
Show Gist options
  • Save dacc/2124991 to your computer and use it in GitHub Desktop.
Save dacc/2124991 to your computer and use it in GitHub Desktop.
import akka.actor.ActorRef;
import akka.camel.*;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import static akka.actor.Actors.actorOf;
import static akka.camel.CamelServiceFactory.createCamelService;
import static java.lang.String.format;
import static java.lang.System.out;
/**
* Demonstration of processing messages from an SQS queue using akka-camel and camel-aws.
*
* Dependencies:
* se.scalablesolutions.akka:akka-actor:jar:1.3
* se.scalablesolutions.akka:akka-camel:jar:1.3
* org.apache.camel:camel-core:jar:2.8.4
* org.apache.camel:camel-aws:jar:2.8.4
* com.amazonaws:aws-java-sdk:jar:1.3.3
*/
public class SqsFaultHandling {

    /**
     * How long you want SQS to give you to process a message before giving it to someone else. Should be large enough
     * to ensure adequate time to complete, and short enough that retries will be timely. Currently governs retry on
     * both explicit failure via a Failure message and simply disappearing and never responding, as the former doesn't
     * appear to reset the visibility timeout.
     */
    private static final int VISIBILITY_TIMEOUT_SECONDS = 5;

    /** Relative path of the properties file the AWS credentials are read from. */
    private static final String CREDENTIALS_FILE = "aws.properties";

    /** Charset name used when URL-encoding values placed in the endpoint URI query string. */
    private static final String URI_PARAM_ENCODING = "ISO-8859-1";

    /**
     * Camel endpoint URI returned by both {@link Producer} and {@link Consumer}. Built once, when this class is
     * initialized; a failure to read the credentials file therefore surfaces as a class-initialization error.
     */
    private static final String ENDPOINT_URI = buildEndpointUri(loadCredentials());

    /**
     * Reads the AWS credentials from {@link #CREDENTIALS_FILE}.
     *
     * @return the credentials parsed from the properties file
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
     */
    private static AWSCredentials loadCredentials() {
        try {
            return new PropertiesCredentials(new File(CREDENTIALS_FILE));
        } catch (IOException e) {
            throw new RuntimeException("unable to read AWS credentials from " + CREDENTIALS_FILE, e);
        }
    }

    /**
     * Assembles the aws-sqs endpoint URI for the {@code fault-handling-test} queue.
     *
     * @param credentials supplies the access key and secret key embedded in the URI
     * @return the endpoint URI, with both credential values URL-encoded
     */
    private static String buildEndpointUri(AWSCredentials credentials) {
        return format("aws-sqs://fault-handling-test?deleteAfterRead=true&messageVisibilityTimeout=%d&accessKey=%s&secretKey=%s",
                VISIBILITY_TIMEOUT_SECONDS,
                // encoded for consistency with the secret key; purely alphanumeric values pass through unchanged
                encodeQueryValue(credentials.getAWSAccessKeyId()),
                encodeQueryValue(credentials.getAWSSecretKey()));
    }

    /**
     * URL-encodes a single query-string value using {@link #URI_PARAM_ENCODING}.
     *
     * @param value the raw value
     * @return the encoded value
     * @throws RuntimeException wrapping the {@link UnsupportedEncodingException} if the charset name is not supported
     */
    private static String encodeQueryValue(String value) {
        try {
            return URLEncoder.encode(value, URI_PARAM_ENCODING);
        } catch (UnsupportedEncodingException e) {
            // only the checked exception is wrapped; anything else is a programming error and propagates as-is
            throw new RuntimeException("unsupported URI parameter encoding: " + URI_PARAM_ENCODING, e);
        }
    }

    /** Akka-camel producer actor bound to {@link #ENDPOINT_URI}. */
    public static class Producer extends UntypedProducerActor {

        /** @return the shared SQS endpoint URI */
        public String getEndpointUri() {
            return ENDPOINT_URI;
        }
    }

    /**
     * Akka-camel consumer actor bound to {@link #ENDPOINT_URI}. Acknowledges every message except those whose body is
     * {@code "fail"}, for which it throws after a one second pause.
     */
    public static class Consumer extends UntypedConsumerActor {

        /** @return the shared SQS endpoint URI */
        public String getEndpointUri() {
            return ENDPOINT_URI;
        }

        /** @return {@code false}, so that acknowledgement is explicit */
        public boolean isAutoack() {
            // prevents message deletion in SQS until we explicitly acknowledge that processing succeeded.
            return false;
        }

        /**
         * Handles one incoming message. Anything that is not a camel {@link Message} is ignored.
         *
         * @param message the received message
         * @throws Exception a {@link RuntimeException} when the body equals {@code "fail"}
         */
        public void onReceive(Object message) throws Exception {
            if (!(message instanceof Message)) {
                return;
            }
            Message camelMessage = (Message) message;
            String body = (String) camelMessage.body();
            if ("fail".equals(body)) {
                // simulated processing failure: pause, then blow up without acknowledging
                Thread.sleep(1000);
                throw new RuntimeException("blech");
            }
            out.println("got: " + body);
            // comment out to simulate long running, parallel message processing. currently requires a patch to
            // SqsConsumer, which will otherwise block on the first message completing. see:
            // http://camel.465427.n5.nabble.com/Better-Way-to-Achieve-Parallel-Processing-of-SQS-Messages-td5578135.html
            getContext().reply(Ack.ack());
        }

        /**
         * Replies with a {@link Failure} carrying the restart reason.
         * NOTE(review): presumably invoked by Akka before the actor is restarted after onReceive throws -- confirm
         * against the Akka 1.3 documentation.
         *
         * @param reason the throwable that triggered the restart
         */
        public void preRestart(Throwable reason) {
            // explicitly fail to process the message. seems like this should go back to SQS and rewrite the visibility
            // timeout to zero, but currently it doesn't.
            getContext().replySafe(new Failure(reason));
        }
    }

    /**
     * Starts the camel service plus one producer and one consumer, sends four test messages, then polls until the
     * consumer is no longer running.
     *
     * @param args unused
     * @throws InterruptedException if the polling sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        createCamelService().start();

        ActorRef producer = actorOf(Producer.class);
        producer.start();
        ActorRef consumer = actorOf(Consumer.class);
        consumer.start();

        // all of the oks will be processed and removed from the SQS queue, and the fail will keep getting retried.
        producer.tell("ok");
        producer.tell("fail");
        producer.tell("ok3");
        producer.tell("ok4");

        while (consumer.isRunning()) {
            Thread.sleep(500);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment