Skip to content

Instantly share code, notes, and snippets.

@shrijeet
Created November 9, 2012 19:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shrijeet/4047546 to your computer and use it in GitHub Desktop.
Save shrijeet/4047546 to your computer and use it in GitHub Desktop.
[asynchbase] Too many pending RPC
/**
* The errback associated with the row. If any of the put (one each column)
* fails, this errback will be invoked with a DeferredGroupException in
* argument.
*/
final class WriteErrback implements Callback<Object, Exception> {
public Exception call(final Exception arg) {
metrics.getWriteFailed().inc();
metrics.getWritePending().dec();
return arg;
}
public String toString() {
return "hbase write error!";
}
}
/**
* The callback success associated with the row. If a put on a row succeeds,
* this callback will be invoked.
*/
final class WriteOkback implements Callback<Object, ArrayList<Object>> {
public Object call(final ArrayList<Object> arg) {
metrics.getWriteCompleted().inc();
metrics.getWritePending().dec();
return arg;
}
public String toString() {
return "hbase write success!";
}
}
// Reusable errback for a write attempt made for a row.
final WriteErrback writeErrback = new WriteErrback();
// Reusable callback for a write attempt made for a row.
final WriteOkback writeOk = new WriteOkback();
/**
* Performs one put per column in an non blocking manner. With each put an
* errback is attached.
* <p>
* Returns a new {@link Deferred}, a demultiplexed version of multiple Deferreds (one for each put) into a single one.
*
* @see com.stumbleupon.async.Deferred#group(java.util.Collection)
*/
public Deferred<Object> nonBlockingWrite(MappableModel model,
Filter filter) {
byte[] key = toBytes(model.getKey());
HashMap<String, String> keyColumnValues = model
.getKeyColumnValues();
ArrayList<Deferred<Object>> workers = new ArrayList<Deferred<Object>>();
int rowSize = 0;
metrics.getWriteRequest().inc();
for (String modelColumn : keyColumnValues.keySet()) {
if (!model.isDirty(modelColumn)) {
continue;
}
String columnValueStr = keyColumnValues.get(modelColumn);
if (columnValueStr == null) {
continue;
}
byte[] column = toBytes(modelColumn);
byte[] columnFamily = colToFamilyMap.get(column);
byte[] columnValue = toBytes(columnValueStr);
if (columnFamily == null) {
continue;
}
String columnFamilyStr = fromBytes(columnFamily);
if (!FilterChecker.isAcceptable(filter, columnFamilyStr,
modelColumn)) {
continue;
}
final PutRequest onecolumn = (model.getTimestamp() == 0) ?
new PutRequest(tableName, key,
columnFamily, column,
columnValue) :
new PutRequest(tableName, key,
columnFamily, column,
columnValue,
model.getTimestamp());
// estimate size of row, this does not include Java object overhead.
rowSize += (key.length + columnFamily.length + column.length + columnValue.length
+ 8); // long time stamp = 8 bytes
onecolumn.setDurable(false); // skip WAL.
final class ColumnErrback implements Callback<Object, Exception> {
public Exception call(final Exception arg) {
client.pseudoPut(onecolumn);
return arg;
}
}
workers.add(client.put(onecolumn).addErrback(new ColumnErrback()));
}
metrics.getWritePending().inc();
return Deferred.group(workers).addCallbacks(writeOk, writeErrback);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment