Created
November 9, 2012 19:02
-
-
Save shrijeet/4047546 to your computer and use it in GitHub Desktop.
[asynchbase] Too many pending RPC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* The errback associated with the row. If any of the put (one each column) | |
* fails, this errback will be invoked with a DeferredGroupException in | |
* argument. | |
*/ | |
final class WriteErrback implements Callback<Object, Exception> { | |
public Exception call(final Exception arg) { | |
metrics.getWriteFailed().inc(); | |
metrics.getWritePending().dec(); | |
return arg; | |
} | |
public String toString() { | |
return "hbase write error!"; | |
} | |
} | |
/** | |
* The callback success associated with the row. If a put on a row succeeds, | |
* this callback will be invoked. | |
*/ | |
final class WriteOkback implements Callback<Object, ArrayList<Object>> { | |
public Object call(final ArrayList<Object> arg) { | |
metrics.getWriteCompleted().inc(); | |
metrics.getWritePending().dec(); | |
return arg; | |
} | |
public String toString() { | |
return "hbase write success!"; | |
} | |
} | |
// Reusable errback for a write attempt made for a row. | |
final WriteErrback writeErrback = new WriteErrback(); | |
// Reusable callback for a write attempt made for a row. | |
final WriteOkback writeOk = new WriteOkback(); | |
/** | |
* Performs one put per column in an non blocking manner. With each put an | |
* errback is attached. | |
* <p> | |
* Returns a new {@link Deferred}, a demultiplexed version of multiple Deferreds (one for each put) into a single one. | |
* | |
* @see com.stumbleupon.async.Deferred#group(java.util.Collection) | |
*/ | |
public Deferred<Object> nonBlockingWrite(MappableModel model, | |
Filter filter) { | |
byte[] key = toBytes(model.getKey()); | |
HashMap<String, String> keyColumnValues = model | |
.getKeyColumnValues(); | |
ArrayList<Deferred<Object>> workers = new ArrayList<Deferred<Object>>(); | |
int rowSize = 0; | |
metrics.getWriteRequest().inc(); | |
for (String modelColumn : keyColumnValues.keySet()) { | |
if (!model.isDirty(modelColumn)) { | |
continue; | |
} | |
String columnValueStr = keyColumnValues.get(modelColumn); | |
if (columnValueStr == null) { | |
continue; | |
} | |
byte[] column = toBytes(modelColumn); | |
byte[] columnFamily = colToFamilyMap.get(column); | |
byte[] columnValue = toBytes(columnValueStr); | |
if (columnFamily == null) { | |
continue; | |
} | |
String columnFamilyStr = fromBytes(columnFamily); | |
if (!FilterChecker.isAcceptable(filter, columnFamilyStr, | |
modelColumn)) { | |
continue; | |
} | |
final PutRequest onecolumn = (model.getTimestamp() == 0) ? | |
new PutRequest(tableName, key, | |
columnFamily, column, | |
columnValue) : | |
new PutRequest(tableName, key, | |
columnFamily, column, | |
columnValue, | |
model.getTimestamp()); | |
// estimate size of row, this does not include Java object overhead. | |
rowSize += (key.length + columnFamily.length + column.length + columnValue.length | |
+ 8); // long time stamp = 8 bytes | |
onecolumn.setDurable(false); // skip WAL. | |
final class ColumnErrback implements Callback<Object, Exception> { | |
public Exception call(final Exception arg) { | |
client.pseudoPut(onecolumn); | |
return arg; | |
} | |
} | |
workers.add(client.put(onecolumn).addErrback(new ColumnErrback())); | |
} | |
metrics.getWritePending().inc(); | |
return Deferred.group(workers).addCallbacks(writeOk, writeErrback); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment