Created
August 19, 2015 22:44
-
-
Save pawelpabich/53d5cfc3ef51a083a042 to your computer and use it in GitHub Desktop.
How to combine raw data from HBase
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Wraps an HBase REST scanner so that each call hands back only rows whose cells
/// have all arrived, delegating the bookkeeping to a <see cref="CompleteRowFilter"/>.
/// </summary>
public class CompleteRowsScanner
{
    // Client used to pull the next raw batch from the REST scanner.
    private readonly HBaseClient _hbaseClient;

    // Identifies the open scanner on the server.
    private readonly ScannerInformation _scanner;

    // REST endpoint the scanner was created on.
    private readonly string _serverName;

    // Holds back the trailing row of each batch until it is known to be complete.
    private readonly CompleteRowFilter _rowFilter = new CompleteRowFilter();

    /// <summary>
    /// Creates a scanner wrapper over an already-opened HBase scanner.
    /// </summary>
    /// <param name="client">Client used to fetch batches.</param>
    /// <param name="scannerInformation">The open scanner to read from.</param>
    /// <param name="restServerName">REST server that owns the scanner.</param>
    public CompleteRowsScanner(HBaseClient client, ScannerInformation scannerInformation, string restServerName)
    {
        _hbaseClient = client;
        _scanner = scannerInformation;
        _serverName = restServerName;
    }

    /// <summary>
    /// Fetches the next raw batch and passes it through the row filter.
    /// </summary>
    /// <returns>
    /// The rows the filter considers complete; the filter returns <c>null</c> when
    /// none are ready yet.
    /// </returns>
    public async Task<ICollection<CellSet.Row>> ScannerGetNextAsync()
    {
        return _rowFilter.Filter(await _hbaseClient.ScannerGetNextAsyncInternal(_scanner, _serverName));
    }

    /// <summary>
    /// Reports whether a row from an earlier batch is still being held back.
    /// </summary>
    public bool HasIncompleteData()
    {
        return _rowFilter.HasIncompleteData();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Reassembles HBase rows whose cells were split across consecutive scanner batches.
/// </summary>
/// <remarks>
/// The last row of every batch is held back, because the next batch may continue it.
/// When the next batch arrives, the held row is merged into that batch's first row if
/// the keys match; otherwise it is inserted in front of it. A null or empty batch
/// releases the held row.
/// NOTE(review): <see cref="Filter"/> mutates the incoming <see cref="CellSet"/> (its row
/// list and the first row's cell list) while merging.
/// </remarks>
public class CompleteRowFilter
{
    // Trailing row of the previous batch; null when nothing is held back.
    private CellSet.Row _possiblyIncompleteRow;

    /// <summary>
    /// Returns <c>true</c> while a row from an earlier batch is still held back.
    /// </summary>
    public bool HasIncompleteData()
    {
        return _possiblyIncompleteRow != null;
    }

    /// <summary>
    /// Feeds the next scanner batch through the filter.
    /// </summary>
    /// <param name="cellSet">The next batch, or <c>null</c>.</param>
    /// <returns>
    /// The rows known to be complete, or <c>null</c> when there are none yet. When
    /// <paramref name="cellSet"/> is null or has no rows, the held-back row (if any) is
    /// returned on its own and the filter is cleared.
    /// </returns>
    public ICollection<CellSet.Row> Filter(CellSet cellSet)
    {
        if (cellSet == null)
        {
            return TheRest();
        }

        IList<CellSet.Row> rows = cellSet.rows;
        if (rows.IsNullOrEmpty())
        {
            return TheRest();
        }

        if (_possiblyIncompleteRow != null)
        {
            Merge(rows, _possiblyIncompleteRow);
        }

        // Everything but the last row is complete; the last row may continue in the next batch.
        var completeRows = TakeAllExceptLast(rows);
        _possiblyIncompleteRow = rows[rows.Count - 1];

        // Existing contract: null (not an empty collection) means "nothing complete yet".
        return completeRows.Length == 0 ? null : completeRows;
    }

    /// <summary>
    /// Releases the held-back row as a one-element collection (or <c>null</c> when
    /// nothing is held) and clears it.
    /// </summary>
    private ICollection<CellSet.Row> TheRest()
    {
        var value = _possiblyIncompleteRow == null ? null : new[] { _possiblyIncompleteRow };
        _possiblyIncompleteRow = null;
        return value;
    }

    /// <summary>
    /// Copies every element except the last into a new array. Returns an empty array
    /// for an empty source, never the source itself, so callers cannot alias it.
    /// </summary>
    private static T[] TakeAllExceptLast<T>(IList<T> source)
    {
        if (source.Count == 0)
        {
            return new T[0];
        }

        return source.Take(source.Count - 1).ToArray();
    }

    /// <summary>
    /// Joins the held-back row onto the start of <paramref name="rows"/>. Its cells are
    /// prepended to the first row when the keys match; otherwise it is inserted as a
    /// separate row at index 0.
    /// </summary>
    private static void Merge(IList<CellSet.Row> rows, CellSet.Row possiblyIncompleteRow)
    {
        var continuationRow = rows[0];
        if (possiblyIncompleteRow.key.EqualsTo(continuationRow.key))
        {
            // Same key: the batch boundary split this row, so restore cell order.
            continuationRow.values.InsertRange(0, possiblyIncompleteRow.values);
        }
        else
        {
            // Different key: the held row was already complete.
            rows.Insert(0, possiblyIncompleteRow);
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Open a scanner and stream only fully assembled rows to processBatch.
var scannerInfo = await CreateScanner(restServerName, tableName, to, startKey);
var completeRowScanner = new CompleteRowsScanner(_client, scannerInfo, restServerName);
try
{
    while (true)
    {
        var batch = await completeRowScanner.ScannerGetNextAsync();
        if (batch.IsNullOrEmpty())
        {
            // Nothing complete this round: keep pulling only while a held-back row is pending.
            if (completeRowScanner.HasIncompleteData())
            {
                continue;
            }

            break;
        }

        await processBatch(batch);
    }
}
finally
{
    //ToDo: Use await once we are on C# 6.0
    // NOTE(review): if Wait() throws here, that exception replaces any exception
    // already propagating out of the try block.
    _client.DeleteScannerAsync(scannerInfo.TableName, scannerInfo.ScannerId).Wait();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment