Skip to content

Instantly share code, notes, and snippets.

@stickfigure
Created November 10, 2015 22:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stickfigure/1a345f9a8a56897da6a0 to your computer and use it in GitHub Desktop.
Save stickfigure/1a345f9a8a56897da6a0 to your computer and use it in GitHub Desktop.
Scatter mapping using Objectify
package com.foo.housekeeping;
import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.Range;
import com.googlecode.objectify.Key;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static com.googlecode.objectify.ObjectifyService.ofy;
/**
 * Splits the key space of an entity kind into contiguous, non-overlapping key ranges by
 * sampling keys ordered on the datastore's reserved scatter property.
 *
 * See:
 * https://github.com/GoogleCloudPlatform/appengine-mapreduce/wiki/ScatterPropertyImplementation
 * https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/java/src/main/java/com/google/appengine/tools/mapreduce/inputs/DatastoreShardStrategy.java
 */
public class Scatter {
    /** Number of ranges requested by {@link #splits(Class)}. */
    public static final int DEFAULT_COUNT = 500;

    /**
     * Splits the key space of {@code type} into at most {@link #DEFAULT_COUNT} ranges.
     *
     * @param type the entity class whose keys are being partitioned
     * @return a list of key ranges that together cover the whole key space of {@code type}
     */
    public static <T> List<Range<Key<T>>> splits(final Class<T> type) {
        return splits(type, DEFAULT_COUNT);
    }

    /**
     * Splits the key space of {@code type} into at most {@code count} ranges.
     *
     * <p>Up to {@code count - 1} keys are sampled in scatter-property order, sorted into key
     * order, and used as split points. The result is an unbounded-below range, zero or more
     * {@code [a, b)} ranges, and an unbounded-above range. The ranges do not overlap and
     * together cover every possible key. If fewer split points are available than requested,
     * fewer ranges are returned; if none are available (or {@code count} is 1) the result is a
     * single range covering everything.
     *
     * @param type the entity class whose keys are being partitioned
     * @param count the maximum number of ranges to produce; must be at least 1
     * @return a non-empty, mutable list of key ranges in ascending key order
     * @throws IllegalArgumentException if {@code count} is less than 1
     */
    public static <T> List<Range<Key<T>>> splits(final Class<T> type, final int count) {
        if (count < 1) {
            throw new IllegalArgumentException("count must be at least 1, got " + count);
        }

        // Copy into our own list so that sorting never depends on the mutability of the list
        // Objectify hands back.
        final List<Key<T>> scatteredKeys = new ArrayList<>();
        if (count > 1) {
            // A single range needs no split points. Skipping the query also avoids limit(0),
            // which Objectify documents as "no limit" rather than "no results".
            scatteredKeys.addAll(
                    ofy().load().type(type).order(Entity.SCATTER_RESERVED_PROPERTY).limit(count - 1).keys().list());
        }

        final List<Range<Key<T>>> points = new ArrayList<>(scatteredKeys.size() + 1);

        if (scatteredKeys.isEmpty()) {
            // No split points (per the linked wiki only a fraction of entities carry the scatter
            // property, so small kinds may have none). Return one range spanning everything so
            // that callers still visit the entities instead of silently processing nothing.
            points.add(Range.<Key<T>>all());
            return points;
        }

        // The query returns keys in scatter order; ranges need them in key order.
        Collections.sort(scatteredKeys);

        Key<T> last = null;
        for (final Key<T> here : scatteredKeys) {
            // The first range is open at its upper end, like every other range, so the split
            // key belongs only to the range that starts at it.
            final Range<Key<T>> range = last == null ? Range.lessThan(here) : Range.closedOpen(last, here);
            points.add(range);
            last = here;
        }

        points.add(Range.atLeast(last));

        return points;
    }
}
package com.foo.housekeeping;
import com.google.appengine.api.taskqueue.DeferredTask;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.cmd.Query;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static com.googlecode.objectify.ObjectifyService.ofy;
/**
 * Deferred task that iterates the keys of {@code type} falling inside {@code range} and
 * enqueues one task per key, built by {@link #makeTask(Key)}, on the queue returned by
 * {@link #queue()}.
 *
 * Subclasses supply the queue, which keeps it configurable.
 */
@Slf4j
@RequiredArgsConstructor
public abstract class ScatterMapperShardTask<E, T extends DeferredTask> implements DeferredTask {
    private static final long serialVersionUID = 1L;

    /** Entity class whose keys are mapped. */
    protected final Class<E> type;

    /** Key range handled by this shard; either end may be unbounded. */
    protected final Range<Key<E>> range;

    /** Queries the keys in this shard's range and hands one task per key to the queue. */
    @Override
    public final void run() {
        log.info("Mapping {}", range);

        final Iterable<Key<E>> shardKeys = rangeQuery().keys().iterable();

        final Function<Key<E>, T> toTask = new Function<Key<E>, T>() {
            @Override
            public T apply(final Key<E> key) {
                return makeTask(key);
            }
        };

        queue().add(Iterables.transform(shardKeys, toTask));

        log.info("Finished mapping");
    }

    /**
     * Builds the query for this shard. A lower endpoint, when present, is applied as an
     * inclusive bound and an upper endpoint, when present, as an exclusive bound; the range's
     * own bound types are not consulted.
     */
    private Query<E> rangeQuery() {
        Query<E> bounded = ofy().load().type(type);

        if (range.hasLowerBound()) {
            bounded = bounded.filterKey(">=", range.lowerEndpoint());
        }

        if (range.hasUpperBound()) {
            bounded = bounded.filterKey("<", range.upperEndpoint());
        }

        return bounded;
    }

    /**
     * Creates the task to enqueue for a single entity key.
     *
     * @param key a key found inside this shard's range
     * @return the task to add to the queue for that key
     */
    protected abstract T makeTask(Key<E> key);

    /** @return the queue helper that receives the generated tasks */
    protected abstract QueueHelper queue();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment