Skip to content

Instantly share code, notes, and snippets.

@stickfigure
Last active August 29, 2015 13:57
Show Gist options
  • Save stickfigure/9895662 to your computer and use it in GitHub Desktop.
Save stickfigure/9895662 to your computer and use it in GitHub Desktop.
App Engine tasks which rewrite data for a single entity type
package blah;
import com.google.appengine.api.taskqueue.DeferredTask;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.googlecode.objectify.Key;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import st.voo.tick.Queues;
import static st.voo.tick.OfyService.ofy;
/**
 * Task which kicks off rewriting every entity of a single kind.
 *
 * <p>When run, it loads the keys of all entities of {@link #type} and enqueues one
 * {@link RewriteSingleTask} per key on the default queue. Each spawned task hands its entity to
 * {@link #process(Object)} and then saves it back. Subclasses override
 * {@link #process(Object)} to change entities; the default does nothing, so entities are simply
 * re-saved unchanged.
 *
 * @param <T> the entity type to rewrite
 */
@Slf4j
@RequiredArgsConstructor
public class RewriteDataTask<T> implements DeferredTask {
    private static final long serialVersionUID = 1L;

    /** The entity class whose instances are rewritten. */
    final Class<T> type;

    /**
     * Enqueues one {@link RewriteSingleTask} for every key of {@link #type}.
     *
     * <p>NOTE(review): all keys are enumerated and handed to the queue in a single call from one
     * request. For very large kinds this may run into the request deadline or a per-call batch
     * limit, depending on how {@code Queues.deflt().add(...)} consumes the iterable — confirm.
     */
    @Override
    public void run() {
        log.warn("Starting rewrite data");
        log.info("############# SPAWNING rewrite for {}", type);
        Iterable<Key<T>> keys = ofy().load().type(type).keys().iterable();
        Queues.deflt().add(Iterables.transform(keys, new Function<Key<T>, RewriteSingleTask<T>>() {
            @Override
            public RewriteSingleTask<T> apply(Key<T> key) {
                return makeTask(key);
            }
        }));
        log.warn("Finished spawning rewrite data");
    }

    /**
     * Builds the per-entity task for {@code key}, routing its processing hook back to this
     * task's {@link #process(Object)}.
     *
     * <p>The returned anonymous subclass is an inner class, so it holds a reference to this
     * {@code RewriteDataTask}. When the spawned task is serialized, this instance is serialized
     * with it, which is why this class (and any subclass state) must stay serializable.
     *
     * @param key key of the entity the spawned task will rewrite
     * @return a task that rewrites the entity at {@code key}
     */
    private RewriteSingleTask<T> makeTask(Key<T> key) {
        return new RewriteSingleTask<T>(key) {
            private static final long serialVersionUID = 1L;

            @Override
            protected void process(T thing) {
                RewriteDataTask.this.process(thing);
            }
        };
    }

    /**
     * Hook for subclasses to modify each entity before it is re-saved. Does nothing by default.
     *
     * @param thing the entity being rewritten
     */
    protected void process(T thing) {
        // Do nothing by default
    }
}
package blah;
import com.google.appengine.api.taskqueue.DeferredTask;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.VoidWork;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static st.voo.tick.OfyService.ofy;
/**
 * Task which rewrites one single entity.
 *
 * <p>Inside a transaction it loads the entity for {@link #key}, passes it to
 * {@link #process(Object)}, and saves it back. Subclasses override {@link #process(Object)} to
 * mutate the entity; the default does nothing, so the entity is re-saved as-is.
 *
 * <p>The transaction keeps a concurrent write that lands between the load and the save from
 * being silently overwritten.
 *
 * @param <T> the entity type
 */
@Slf4j
@RequiredArgsConstructor
public class RewriteSingleTask<T> implements DeferredTask {
    private static final long serialVersionUID = 1L;

    /** Key of the entity to rewrite. */
    final Key<T> key;

    /**
     * Loads, processes and re-saves the entity in a single transaction. If the entity no longer
     * exists (for example it was deleted after this task was enqueued), it is skipped instead of
     * failing the task.
     */
    @Override
    public void run() {
        ofy().transact(new VoidWork() {
            @Override
            public void vrun() {
                // now() yields null for a missing entity; safe() would throw and fail the task
                // over an entity that simply no longer exists.
                T thing = ofy().load().key(key).now();
                if (thing == null) {
                    log.warn("Skipping rewrite; entity no longer exists: {}", key);
                    return;
                }
                log.info("Processing: {}", thing);
                process(thing);
                ofy().save().entity(thing);
            }
        });
    }

    /**
     * Hook for subclasses to modify {@code thing} before it is re-saved. Does nothing by default.
     *
     * <p>NOTE(review): this runs inside {@code ofy().transact(...)}, which may re-run the work on
     * contention depending on the Objectify version. Implementations should tolerate being
     * called more than once for the same entity — confirm.
     *
     * @param thing the freshly loaded entity; never null
     */
    protected void process(T thing) {
        // Do nothing by default
    }
}
@husayt
Copy link

husayt commented Apr 1, 2014

Jeff, why would you spawn so many tasks just for individual saves?

Wouldn't it be better to do them in batches?

Here is what I have; I might need to add a transaction, though:

    /**
     * Cursor-based batch processor for every entity of one kind, run as a self-re-enqueuing
     * deferred task.
     *
     * <p>Entities are loaded (with caching disabled) in batches of {@link #limit} and passed one
     * by one through {@link #processor}; every non-null result is saved. Batches continue until
     * the query is exhausted, an unexpected exception occurs, or about {@value #MAX_RUN_MINUTES}
     * minutes have elapsed. In the last case the task re-enqueues itself with its updated cursor
     * and count, so a fresh task continues where this one stopped.
     *
     * <p>An {@code ImagesServiceFailureException} from the processor is logged and processing
     * carries on. Any other exception stops the run: it is logged and the task is not
     * re-enqueued.
     *
     * <p>NOTE(review): entities are loaded and saved outside a transaction, so a concurrent write
     * to the same entity between the load and the save can be lost. The task is presumably
     * serialized when re-enqueued via {@code withPayload(this)}, so {@code processor} must itself
     * be {@code Serializable} — confirm for the functions passed in.
     *
     * @param <T> the entity type that is loaded
     * @param <R> the type the processor produces, which is what gets saved
     */
    public class ModelProcessor<T, R> implements DeferredTask {

        private static final long serialVersionUID = 1L;

        /** Wall-clock budget for one run before handing off to a re-enqueued task. */
        private static final int MAX_RUN_MINUTES = 8;

        /** Maximum number of entities loaded per query batch. */
        @Setter
        int limit = 50;

        /** Entity class to iterate over. */
        private final Class<T> modelClass;

        /** Transforms each loaded entity; a non-null result is saved, null means "skip". */
        private final Function<T, R> processor;

        /** Entities to skip on the first query, used only while no cursor is set yet. */
        @Setter
        private int offset = 0;

        /** Running count of processed entities, carried across re-enqueued runs. */
        private int countSoFar = 0;

        /** Where the next query resumes; null until the first batch has been read. */
        private Cursor startCursor;

        /**
         * @param clz the entity class to process
         * @param processor function applied to each entity; non-null results are saved
         */
        public ModelProcessor(Class<T> clz, Function<T, R> processor) {
            this.processor = processor;
            this.modelClass = clz;
        }

        /**
         * Processes batches until the kind is exhausted, an unexpected error occurs, or the time
         * budget runs out. In the last case, re-enqueues this task to continue.
         *
         * @throws NullPointerException if the processor or model class is null
         */
        @Override
        public void run() {
            Preconditions.checkNotNull(this.processor, "Can't have null processor");
            Preconditions.checkNotNull(this.modelClass, "Can't have null modelClass");

            Stopwatch sw = Stopwatch.createStarted();

            boolean stillNotFinished = true;
            boolean isException = false;
            ModelProcessor.log.info("Starting processing. Done so far {} items of kind {}. ", this.countSoFar, this.modelClass.getSimpleName());
            try {
                do {
                    Query<T> query = ofy().cache(false).load()
                            .type(this.modelClass)
                            .limit(this.limit)
                            .chunkAll();

                    if (this.startCursor != null) {
                        query = query.startAt(this.startCursor);
                    } else if (this.offset > 0) {
                        query = query.offset(this.offset);
                        this.countSoFar = this.offset;
                        ModelProcessor.log.info("Continuing processing. Done so far {} items of kind {}. ", this.countSoFar, this.modelClass.getSimpleName());
                    }

                    QueryResultIterator<T> iterator = null;
                    List<R> newItems = Lists.newArrayList();
                    try {
                        iterator = query.iterator();
                        if (!iterator.hasNext()) {
                            stillNotFinished = false;
                        }
                        while (iterator.hasNext()) {
                            R newItem = this.processor.apply(iterator.next());
                            if (newItem != null) {
                                newItems.add(newItem);
                            }
                            this.countSoFar++;
                        }
                    } catch (ImagesServiceFailureException dxe) {
                        // Recoverable: presumably the cursor taken below points past the failing
                        // item, so the next batch resumes after it.
                        ModelProcessor.log.error("Exception error should continue from {}", this.countSoFar, dxe);
                    } catch (Exception dxe) {
                        ModelProcessor.log.error("Exception error should stop from {}", this.countSoFar, dxe);
                        isException = true;
                    } finally {
                        // Persist whatever this batch produced, even if it was cut short, and
                        // remember where to resume.
                        ModelProcessor.log.info("migrated {} files.", newItems.size());
                        if (!newItems.isEmpty()) {
                            // Objectify's save() takes no arguments; entities go to the Saver.
                            // now() blocks until the write completes rather than leaving it async.
                            ofy().cache(false).save().entities(newItems).now();
                        }
                        if (iterator != null) {
                            this.startCursor = iterator.getCursor();
                        }
                    }
                    ModelProcessor.log.info("Processed so far {} items. ", this.countSoFar);

                    // An unexpected exception ends the run immediately instead of continuing
                    // with later batches until the time budget expires.
                } while (stillNotFinished && !isException
                        && sw.elapsed(TimeUnit.MINUTES) < MAX_RUN_MINUTES);
            } finally {
                sw.stop();
                ModelProcessor.log.info("Finished reading {} items in total for {}", this.countSoFar, sw.toString());
            }

            // Reached only when the loop exits normally. If an exception escaped above (e.g. the
            // save failed), no continuation is enqueued: the failed task is presumably retried by
            // the queue, and enqueuing here as well would start a second, duplicate chain.
            if (stillNotFinished) {
                if (isException) {
                    ModelProcessor.log.warn("Has Stopped on item {}", this.countSoFar);
                } else {
                    ModelProcessor.log.info("Rerunning from {}", this.countSoFar);
                    TaskController.addToTaskQueue(withPayload(this));
                }
            }
        }
    }

@stickfigure
Copy link
Author

The transaction is necessary; otherwise you risk losing changes when a re-save conflicts with normal business activity.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment