@tsuna
Created May 16, 2013 22:03
Complex async scanning and processing with asynchbase
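// This code is untested, may not even compile, but illustrates the idea.
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;

abstract class ComplexAsyncScanLoopDemo {

  // Not part of the original sketch: the client and the table name are
  // assumed to be supplied by the surrounding application.
  private final HBaseClient client;
  private final String table;

  ComplexAsyncScanLoopDemo(final HBaseClient client, final String table) {
    this.client = client;
    this.table = table;
  }

  // Placeholder for the real work (deletes or whatever you want).  The
  // Deferred it returns must fire once the batch has been processed.
  abstract Deferred<Object> process(ArrayList<ArrayList<KeyValue>> rows);

  public Deferred<Object> scanAndProcess(final String start, final String stop) {
    // This is the Deferred that the caller will wait on until everything
    // we're doing has completed.  If there was an object we wanted to return
    // asynchronously to them as a result of whatever we're doing, we'd hand
    // it to this Deferred once we're done.
    final Deferred<Object> result = new Deferred<Object>();
    final Scanner scanner = client.newScanner(table);
    // Set up the scanner...
    scanner.setStartKey(start);
    scanner.setStopKey(stop);
    final int max_in_flight = 8;
    // We use this counter to keep track of how many more batches we can
    // process concurrently.  When this counter drops down to 0, then we
    // have max_in_flight batches running, so we'll have to pause scanning
    // until this counter returns to 1.
    final AtomicInteger inflight_slots = new AtomicInteger(max_in_flight);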
    // Callback that iterates asynchronously over the rows returned by the
    // scanner.
    class ScanLoop implements Callback<Object, ArrayList<ArrayList<KeyValue>>> {
      boolean done = false;  // Has our scanner reached the end already?
      // The callback to trigger upon completion of a batch of work.
      private final Callback<Object, Object> processingcb;

      ScanLoop(final Callback<Object, Object> processingcb) {
        this.processingcb = processingcb;
      }

      void scan() {
        scanner.nextRows().addCallback(this);
      }

      public Object call(final ArrayList<ArrayList<KeyValue>> rows) {
        if (rows == null) {  // We've reached the end of the scanner.
          done = true;       // So remember that here.  We're not 100% done yet,
          return null;       // some batches may still be in flight.
        }
        // Kick off your deletes or whatever you want.
        process(rows).addCallback(processingcb);
        final int slots_left = inflight_slots.get();
        assert slots_left >= 0;  // This cannot possibly become negative.
        if (slots_left != 0) {   // Can we go fetch more data?
          inflight_slots.decrementAndGet();  // Yes, so decrement and...
          scan();                            // ... kick off another batch.
        }  // else: already as many batches in flight as we allow,
           // so "pause" scanning by not kicking off another scan().
        return null;
      }
    }
    // Whenever a batch of work completes, this callback gets executed.
    class BatchCompletedCB implements Callback<Object, Object> {
      // The scan loop to resume.  The loop and this callback reference each
      // other, so this is set right after both have been constructed.
      ScanLoop loop;

      public Object call(final Object unused) {
        // Start by signaling that a batch completed by incrementing our
        // counter (remember the counter indicates how many more batches
        // can still be scheduled according to our limit).
        final int slots_left = inflight_slots.incrementAndGet();
        if (slots_left == 1 && !loop.done) {
          // If we get here it's because slots_left was zero, and we're the
          // first ones to increment it so we got one.  If slots_left was
          // zero, it means that scanning had ceased as we had too many
          // batches in flight at the same time.  So we should resume scanning
          // at this point:
          loop.scan();
        } else if (slots_left == max_in_flight && loop.done) {
          // If we get here it's because we're the last batch to complete: the
          // scanning loop indicated it was done scanning everything, and
          // we're the last one to increment the counter, thereby returning
          // its value to where it started from, at `max_in_flight'.
          result.callback(null);  // So signal to our caller that we're all done.
        }
        return null;
      }
    }

    final BatchCompletedCB batch_completed = new BatchCompletedCB();
    final ScanLoop loop = new ScanLoop(batch_completed);
    batch_completed.loop = loop;
    loop.scan();  // Kick off the whole dance.
    return result;
  }
}

tsuna commented May 16, 2013

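I think there is a subtle race in that code. The boolean done should be made volatile, and upon setting done = true in ScanLoop we need to check whether inflight_slots is back at max_in_flight, in which case we need to call result.callback(null). Otherwise, if the last batch completes before the scanner reports the end, BatchCompletedCB still sees done == false and nothing ever fires result.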
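
A minimal sketch of that fix follows, under some loudly stated assumptions. It does not use asynchbase at all: java.util.concurrent.CompletableFuture stands in for Deferred so that it runs without HBase, and the names BoundedScanSketch, nextRows, process and maxInFlight are hypothetical. It also swaps the AtomicInteger plus volatile boolean for a single lock around a plain in-flight counter and two flags, which is a substitution made here for simplicity, not what the gist does. The point it illustrates is the one from the comment above: "are we all done?" is checked both when a batch completes and when the scanner reports the end.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// Sketch only.  CompletableFuture stands in for asynchbase's Deferred, and
// nextRows/process are hypothetical stand-ins for Scanner.nextRows() and the
// gist's process(rows).  A null batch means "end of scan".
final class BoundedScanSketch<B> {
  private final Supplier<CompletableFuture<B>> nextRows;
  private final Function<B, CompletableFuture<?>> process;
  private final int maxInFlight;
  private final CompletableFuture<Void> result = new CompletableFuture<>();

  // The three fields below are only touched while holding the lock on `this`.
  private int inFlight = 0;        // Batches currently being processed.
  private boolean done = false;    // Has the scanner reached the end?
  private boolean paused = false;  // Did we stop scanning at the limit?

  private BoundedScanSketch(Supplier<CompletableFuture<B>> nextRows,
                            Function<B, CompletableFuture<?>> process,
                            int maxInFlight) {
    if (maxInFlight < 1) {
      throw new IllegalArgumentException("maxInFlight must be >= 1");
    }
    this.nextRows = nextRows;
    this.process = process;
    this.maxInFlight = maxInFlight;
  }

  /** Completes once the scan has ended and every batch has been processed. */
  public static <B> CompletableFuture<Void> scanAndProcess(
      Supplier<CompletableFuture<B>> nextRows,
      Function<B, CompletableFuture<?>> process,
      int maxInFlight) {
    BoundedScanSketch<B> loop =
        new BoundedScanSketch<>(nextRows, process, maxInFlight);
    loop.scan();
    return loop.result;
  }

  private void scan() {
    nextRows.get().whenComplete((rows, err) -> {
      if (err != null) {
        result.completeExceptionally(err);  // Minimal error handling.
      } else {
        onRows(rows);
      }
    });
  }

  private void onRows(B rows) {
    boolean finished = false;
    boolean scanMore = false;
    synchronized (this) {
      if (rows == null) {            // End of the scanner.
        done = true;
        finished = (inFlight == 0);  // No batch is left to signal completion.
      } else {
        inFlight++;                  // Count the batch before it starts.
        if (inFlight < maxInFlight) {
          scanMore = true;
        } else {
          paused = true;             // A completing batch will resume us.
        }
      }
    }
    if (rows == null) {
      if (finished) {
        result.complete(null);
      }
      return;
    }
    // A failed batch still frees its slot; reporting the failure is omitted.
    process.apply(rows).whenComplete((ignored, err) -> onBatchDone());
    if (scanMore) {
      scan();
    }
  }

  private void onBatchDone() {
    boolean finished = false;
    boolean resume = false;
    synchronized (this) {
      inFlight--;
      if (done) {
        finished = (inFlight == 0);  // We are the last batch to complete.
      } else if (paused) {
        paused = false;              // Only one completion resumes scanning.
        resume = true;
      }
    }
    if (finished) {
      result.complete(null);
    } else if (resume) {
      scan();
    }
  }

  public static void main(String[] args) {
    Iterator<String> rows = List.of("batch-1", "batch-2", "batch-3").iterator();
    CompletableFuture<Void> all = BoundedScanSketch.<String>scanAndProcess(
        () -> CompletableFuture.completedFuture(rows.hasNext() ? rows.next() : null),
        batch -> CompletableFuture.runAsync(() -> System.out.println("processed " + batch)),
        2);
    all.join();  // Blocking is for the demo only; real callers would chain.
    System.out.println("all batches done");
  }
}
```

Because the counter and both flags change under one lock, the decision to scan again and the decision to finish cannot interleave badly, and at most one nextRows call is outstanding at a time. The cost is a short critical section per batch, which is likely negligible next to the work each batch does.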
