Skip to content

Instantly share code, notes, and snippets.

@pawelpabich
Created August 19, 2015 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pawelpabich/53d5cfc3ef51a083a042 to your computer and use it in GitHub Desktop.
Save pawelpabich/53d5cfc3ef51a083a042 to your computer and use it in GitHub Desktop.
How to combine raw data from HBase
/// <summary>
/// Reads batches from an HBase scanner and passes each one through a
/// <see cref="CompleteRowFilter"/> before handing rows to the caller.
/// </summary>
public class CompleteRowsScanner
{
    private readonly HBaseClient _client;
    private readonly ScannerInformation _scannerInformation;
    private readonly string _restServerName;

    // One filter per scanner: it carries the withheld row from one batch to the next.
    private readonly CompleteRowFilter _filter = new CompleteRowFilter();

    /// <summary>
    /// Creates a scanner wrapper bound to an already created scanner.
    /// </summary>
    /// <param name="client">Client used to fetch the next batch.</param>
    /// <param name="scannerInformation">Identifies the scanner to read from.</param>
    /// <param name="restServerName">REST server name passed along with every fetch.</param>
    public CompleteRowsScanner(HBaseClient client, ScannerInformation scannerInformation, string restServerName)
    {
        _restServerName = restServerName;
        _scannerInformation = scannerInformation;
        _client = client;
    }

    /// <summary>
    /// Fetches the next batch and returns whatever the filter releases for it.
    /// </summary>
    /// <returns>
    /// The rows released by the filter; may be <c>null</c> when the filter has
    /// nothing to release for this batch.
    /// </returns>
    public async Task<ICollection<CellSet.Row>> ScannerGetNextAsync()
    {
        var rawCells = await _client.ScannerGetNextAsyncInternal(_scannerInformation, _restServerName);
        var releasedRows = _filter.Filter(rawCells);
        return releasedRows;
    }

    /// <summary>
    /// Tells whether the filter is still holding a row back.
    /// </summary>
    /// <returns><c>true</c> while a withheld row is pending; otherwise <c>false</c>.</returns>
    public bool HasIncompleteData()
    {
        var pending = _filter.HasIncompleteData();
        return pending;
    }
}
/// <summary>
/// Stateful filter that withholds the last row of every non-empty batch, because
/// the following batch may start with more cells for the same row key.
/// </summary>
public class CompleteRowFilter
{
    // Last row of the most recent non-empty batch; null when nothing is withheld.
    private CellSet.Row _possiblyIncompleteRow;

    /// <summary>
    /// Tells whether a row is currently being withheld.
    /// </summary>
    /// <returns><c>true</c> if a withheld row is pending; otherwise <c>false</c>.</returns>
    public bool HasIncompleteData()
    {
        return _possiblyIncompleteRow != null;
    }

    /// <summary>
    /// Processes one batch: folds the previously withheld row into it, returns every
    /// row except the last one, and withholds that last row for the next call.
    /// </summary>
    /// <param name="cellSet">
    /// The batch to process. A <c>null</c> batch, or one without rows, releases the
    /// withheld row (if any).
    /// </param>
    /// <returns>
    /// The released rows, or <c>null</c> when there is nothing to release.
    /// </returns>
    public ICollection<CellSet.Row> Filter(CellSet cellSet)
    {
        if (cellSet == null)
        {
            return TheRest();
        }

        var batchRows = cellSet.rows;
        if (batchRows.IsNullOrEmpty())
        {
            return TheRest();
        }

        if (_possiblyIncompleteRow != null)
        {
            Merge(batchRows, _possiblyIncompleteRow);
        }

        var released = TakeAllExceptLast(batchRows);

        // The final row of this batch may continue in the next one, so hold it back.
        _possiblyIncompleteRow = batchRows.Last();

        return released.Any() ? released : null;
    }

    // Hands out the withheld row (if any) as a single-element array and clears it.
    private ICollection<CellSet.Row> TheRest()
    {
        var withheld = _possiblyIncompleteRow;
        _possiblyIncompleteRow = null;

        if (withheld == null)
        {
            return null;
        }

        return new[] { withheld };
    }

    // Returns a copy of the collection without its last element; an empty
    // collection is returned as-is.
    private static ICollection<T> TakeAllExceptLast<T>(ICollection<T> source)
    {
        var total = source.Count;
        if (total == 0)
        {
            return source;
        }

        return source.Take(total - 1).ToArray();
    }

    // Folds the withheld row into the batch: when the first row of the batch has the
    // same key, the withheld cells are placed in front of its cells; otherwise the
    // withheld row is inserted as the first row of the batch.
    private void Merge(IList<CellSet.Row> rows, CellSet.Row possiblyIncompleteRow)
    {
        var firstRow = rows.First();
        var sameKey = possiblyIncompleteRow.key.EqualsTo(firstRow.key);

        if (!sameKey)
        {
            rows.Insert(0, possiblyIncompleteRow);
            return;
        }

        firstRow.values.InsertRange(0, possiblyIncompleteRow.values);
    }
}
// Example usage: drain a scanner batch by batch, processing only the rows the
// scanner wrapper releases, and always delete the scanner afterwards.
var scannerInfo = await CreateScanner(restServerName, tableName, to, startKey);
var completeRowScanner = new CompleteRowsScanner(_client, scannerInfo, restServerName);
try
{
    while (true)
    {
        ICollection<CellSet.Row> completeRows = await completeRowScanner.ScannerGetNextAsync();
        var gotRows = !completeRows.IsNullOrEmpty();

        // Stop only when nothing was released AND no row is still being withheld.
        if (!gotRows && !completeRowScanner.HasIncompleteData())
        {
            break;
        }

        // A batch can release nothing while a row is still pending; just fetch again.
        if (gotRows)
        {
            await processBatch(completeRows);
        }
    }
}
finally
{
    //ToDo: Use await once we are on C# 6.0
    var deleteTask = _client.DeleteScannerAsync(scannerInfo.TableName, scannerInfo.ScannerId);
    deleteTask.Wait();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment