Skip to content

Instantly share code, notes, and snippets.

@sappenin
Last active August 29, 2015 14:18
Show Gist options
  • Save sappenin/bfb1ca18a3d70bc9ad51 to your computer and use it in GitHub Desktop.
Save sappenin/bfb1ca18a3d70bc9ad51 to your computer and use it in GitHub Desktop.
A example of a generic mechanism to transactionally commit a unit Work using Objectify with a "once-only" guarantee
package com.googlecode.objectify.idempotence.examples.counters;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
/**
* An Objectify entity for storing a simple count.
*/
@Entity
public class Counter
{
@Id
private String counterName;
private long count;
/**
* No-args Constructor.
*
* @deprecated Exists only for Objectify. Prefer the required-args Constructor instead.
*/
@Deprecated
public Counter()
{
}
/**
* Required-args Constructor.
*
* @param counterName
*/
public Counter(final String counterName)
{
this.counterName = counterName;
}
public String getCounterName()
{
return this.counterName;
}
public long getCount()
{
return count;
}
public void setCount(long count)
{
this.count = count;
}
}
package com.googlecode.objectify.idempotence.examples.counters;
import com.google.appengine.repackaged.com.google.common.base.Preconditions;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.ObjectifyService;
import com.googlecode.objectify.idempotence.OnceOnlyToken;
import com.googlecode.objectify.idempotence.OnceOnlyWork;
import com.googlecode.objectify.idempotence.OnceOnlyWorkAlreadyAppliedException;
/**
* An examples service that can be used to atomically increment an instance of {@link Counter} in an idempotent manner
* to ensure that a given "increment" only occurs once, independent of the current "count" of a counter.
*
* This examples differs slightly from the bank withdrawal examples that may depend on the current balance to succeed,
* but should also not be applied twice.
*/
public class OnceOnlyCounterIncrementExampleService
{
/**
* Increment a counter using an instance of {@link OnceOnlyToken} to ensure that the work is only ever applied once.
*
* @param onceOnlyToken An instance of {@link OnceOnlyToken} that must be supplied by callers of this method so that
* in the event of a system failure, timeout or other type of non-response, the increment can be retried
* again without a double-application of the work.
* @param counterName A {@link String} that uniquely identifies the counter.
*/
public void incrementCounter(final OnceOnlyToken onceOnlyToken, final String counterName)
{
Preconditions.checkNotNull(onceOnlyToken);
Preconditions.checkNotNull(counterName);
// Per the AppEngine docs, assemble everything you can before entering the transaation.
final Key<Counter> counterKey = Key.create(Counter.class, counterName);
final int numRetries = 10;
try
{
// Increment a counter by 1 using typical Objectify semantics. The OnceOnlyWork will perform the
// operations in #doWork below in a once-only fashion.
ObjectifyService.ofy().transact(new OnceOnlyWork<Void>(onceOnlyToken, numRetries)
{
@Override
public Void doWork()
{
final Counter counter = ObjectifyService.ofy().load().key(counterKey).now();
counter.setCount(counter.getCount() + 1L);
ObjectifyService.ofy().save().entity(counter);
return null;
}
});
}
catch (OnceOnlyWorkAlreadyAppliedException e)
{
// eat this - if the work already applied, we don't care.
}
}
}
package com.googlecode.objectify.idempotence;
import com.google.appengine.repackaged.com.google.common.base.Preconditions;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.ObjectifyService;
import com.googlecode.objectify.Work;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import com.googlecode.objectify.annotation.Parent;
/**
* An Objectify entity that is tied to a parent entity so that it can be loaded and saved in the same entity group as
* the parent. Acts as a guard against double-applying a certain unit of {@link Work} by utilizing the transactional
* semantics of the Appengine Datastore that says, "In a transaction, all reads reflect the current, consistent state of
* the Datastore at the time the transaction started...Queries and gets inside a transaction are guaranteed to see a
* single, consistent snapshot of the Datastore as of the beginning of the transaction". Loading this entity at the
* beginning of a transaction will indicate if a previous operation already stored it. Thus, if it exists in the
* Datastore at the beginning of a Transation, then the {@link Work} should be aborted.
*/
@Entity
public class OnceOnlyToken
{
// Think more about the identifier of this class. Currently, we prefer a long number here instead of a String
// derived from a UUID (or something similar) so that outside callers don't have to worry about creating something
// unique (the @Parent key and this id are unique) and we can leverage the Datastore to guarantee the unique identifier
// via line ~54.
@Id
private long id;
@Parent
private Key<?> parentEntityKey;
/**
* No-args Constructor.
*
* @deprecated Exists only for Objectify. Prefer the required-args constructor instead.
*/
@Deprecated
public OnceOnlyToken()
{
}
/**
* Default Constructor.
*
* @param parentEntityKey Should be the same key as the entity being mutated in a corresponding instance of
* {@link OnceOnlyWork}.
*/
public OnceOnlyToken(final Key<?> parentEntityKey)
{
Preconditions.checkNotNull(parentEntityKey);
this.parentEntityKey = parentEntityKey;
this.id = ObjectifyService.factory().allocateId(this.getClass()).getId();
}
public long getId()
{
return id;
}
public Key<?> getParentCounterKey()
{
return parentEntityKey;
}
/**
* Helper method to generate an instance of {@link Key} for this entity.
*
* @return
*/
public Key<OnceOnlyToken> getTypedKey()
{
return Key.create(this.getParentCounterKey(), OnceOnlyToken.class, this.getId());
}
@Override
public boolean equals(Object o)
{
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
OnceOnlyToken that = (OnceOnlyToken) o;
if (id != that.id)
return false;
return parentEntityKey.equals(that.parentEntityKey);
}
@Override
public int hashCode()
{
int result = (int) (id ^ (id >>> 32));
result = 31 * result + parentEntityKey.hashCode();
return result;
}
}
package com.googlecode.objectify.idempotence;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.appengine.api.datastore.DatastoreFailureException;
import com.google.appengine.api.datastore.DatastoreTimeoutException;
import com.google.appengine.repackaged.com.google.common.base.Preconditions;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.ObjectifyService;
import com.googlecode.objectify.Work;
/**
* An extension of {@link Work} that guarantees the operations executed in {@link #doWork()} are executed exactly zero
* or one time, but not more.
*
* @param <R> The type that this {@link Work} will return after completing.
*/
public abstract class OnceOnlyWork<R> implements Work<R>
{
private static final Logger log = Logger.getLogger(OnceOnlyWork.class.getName());
private final OnceOnlyToken onceOnlyToken;
private final int numRetries;
// TODO: Consider another constructor that sets the default numRetries to a meaningful default.
// TODO: Consider different retry characteristicis if an instance of this class is running in a task queue vs
// a frontend/backend instance.
/**
* Required-args Constructor.
*
* @param onceOnlyToken An instance of {@link OnceOnlyToken}.
* @param numRetries The number of times to retry after encountering a {@link DatastoreFailureException} or a
* {@link DatastoreTimeoutException}.
*/
public OnceOnlyWork(final OnceOnlyToken onceOnlyToken, final int numRetries)
{
Preconditions.checkNotNull(onceOnlyToken, "The OnceOnlyToken must not be null!");
this.onceOnlyToken = onceOnlyToken;
Preconditions.checkArgument(numRetries >= 0);
this.numRetries = numRetries;
}
@Override
public R run()
{
// Per the AppEngine docs, assemble everything you can before entering the transaction.
final Key<OnceOnlyToken> onceOnlyTokenKey = onceOnlyToken.getTypedKey();
int numRetries = OnceOnlyWork.this.numRetries;
while (true)
{
try
{
// This call to ofy().transact inherits the transactional context, if any, and
// also retries up to Integer.MAX_VALUE times in response to any
// ConcurrentModificationException.
return ObjectifyService.ofy().transact(new Work<R>()
{
@Override
public R run()
{
// This will load the OnceOnlyToken from the Datastore as it existed at the
// beginning of the TX.
final OnceOnlyToken preExistingOnceOnlyToken = ObjectifyService.ofy().load()
.key(onceOnlyTokenKey).now();
if (preExistingOnceOnlyToken != null)
{
// As of the start of this transaction, the preExistingOnceOnlyToken already exists in the
// Datastore, which means this transaction has already committed in a previous try, so
// abort!
// TODO: Is there a better way than an exception to indicate this?
throw new OnceOnlyWorkAlreadyAppliedException("Transaction has already been applied!",
onceOnlyToken);
}
else
{
ObjectifyService.ofy().save().entity(onceOnlyToken);
return doWork();
}
}
});
}
catch (OnceOnlyWorkAlreadyAppliedException ex)
{
return null;
}
catch (DatastoreFailureException | DatastoreTimeoutException ex)
{
// Catch this exception, and consider repeating some amount of times until the Datastore recovers. If
// this operation is being performed in a taskqueue, then this exception can merely be thrown and the
// task will be retried.
//
// Otherwise, be aware that the datastore may not recover before this operation's AppEngine request
// times out, so it may be necessary to call #incrementCounter again with the same onceOnly token.
if (numRetries-- > 0)
{
if (log.isLoggable(Level.WARNING))
log.warning("Unknown Datastore failure (retrying): " + ex);
if (log.isLoggable(Level.FINEST))
log.log(Level.FINEST, "Details of unknown Datastore failure", ex);
}
else
{
throw ex;
}
}
}
}
/**
* Perform the actual work to be done.
*/
public abstract R doWork();
}
package com.googlecode.objectify.idempotence;
import com.google.appengine.repackaged.com.google.common.base.Preconditions;
/**
* An extension of {@link RuntimeException} that indicates a transaction was already applied by the Datastore and should
* not be retried.
*/
public class OnceOnlyWorkAlreadyAppliedException extends RuntimeException
{
private final OnceOnlyToken onceOnlyToken;
/**
*
* @param message
* @param onceOnlyToken
*/
public OnceOnlyWorkAlreadyAppliedException(final String message, final OnceOnlyToken onceOnlyToken)
{
super(message);
Preconditions.checkNotNull(onceOnlyToken);
this.onceOnlyToken = onceOnlyToken;
}
/**
* @param message
* @param cause
* @param onceOnlyToken
*/
public OnceOnlyWorkAlreadyAppliedException(final String message, Throwable cause,
final OnceOnlyToken onceOnlyToken)
{
super(message, cause);
Preconditions.checkNotNull(onceOnlyToken);
this.onceOnlyToken = onceOnlyToken;
}
/**
*
* @return
*/
public OnceOnlyToken getOnceOnlyToken()
{
return onceOnlyToken;
}
}
package com.googlecode.objectify.idempotence.examples.counters;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.Test;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.ObjectifyService;
import com.googlecode.objectify.idempotence.OnceOnlyToken;
import com.googlecode.objectify.test.util.TestBase;
/**
* Unit tests to test a once-only Work mechansim.
*/
public class OnceOnlyWorkTest extends TestBase
{
private OnceOnlyCounterIncrementExampleService example = new OnceOnlyCounterIncrementExampleService();
@Test
public void runTest()
{
ObjectifyService.factory().register(Counter.class);
ObjectifyService.factory().register(OnceOnlyToken.class);
Counter counter = new Counter("test-counter");
assertTrue(counter.getCount() == 0);
ObjectifyService.ofy().save().entity(counter);
final Key<?> counterKey = Key.create(Counter.class, counter.getCounterName());
final OnceOnlyToken onceOnlyToken = new OnceOnlyToken(counterKey);
this.example.incrementCounter(onceOnlyToken, counterKey.getName());
counter = ObjectifyService.ofy().load().entity(counter).now();
assertEquals(1, counter.getCount());
this.example.incrementCounter(onceOnlyToken, counterKey.getName());
counter = ObjectifyService.ofy().load().entity(counter).now();
assertEquals(1, counter.getCount());
this.example.incrementCounter(onceOnlyToken, counterKey.getName());
counter = ObjectifyService.ofy().load().entity(counter).now();
assertEquals(1, counter.getCount());
final OnceOnlyToken newOnceOnlyToken = new OnceOnlyToken(counterKey);
this.example.incrementCounter(newOnceOnlyToken, counterKey.getName());
counter = ObjectifyService.ofy().load().entity(counter).now();
assertEquals(2, counter.getCount());
this.example.incrementCounter(onceOnlyToken, counterKey.getName());
counter = ObjectifyService.ofy().load().entity(counter).now();
assertEquals(2, counter.getCount());
this.example.incrementCounter(newOnceOnlyToken, counterKey.getName());
counter = ObjectifyService.ofy().load().entity(counter).now();
assertEquals(2, counter.getCount());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment