package datawave.query.tables;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import datawave.data.type.NoOpType;
import datawave.query.DocumentSerialization;
import datawave.query.attributes.Document;
import datawave.query.attributes.TypeAttribute;
import datawave.query.function.json.deser.JsonDeser;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.clientImpl.*;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.*;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.*;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
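/**
 * A parallel batch scanner, modeled on Accumulo's TabletServerBatchReaderIterator, that bins
 * ranges to tablet servers, runs multi-scans against them on a thread pool, and streams the
 * deserialized {@link Document} results through a bounded queue to this iterator.
 *
 * <p>A minimal usage sketch; the context, table id, auths, ranges, and pool shown are
 * hypothetical placeholders supplied by the caller:
 *
 * <pre>
 * ExecutorService pool = Executors.newFixedThreadPool(8);
 * DocumentScan scan = new DocumentScan(clientContext, tableId, auths, ranges, 8, pool,
 *     scannerOptions, 30000L, false, DocumentSerialization.ReturnType.kryo, false, 100, 0);
 * while (scan.hasNext()) {
 *     Document doc = scan.next();
 *     // process doc
 * }
 * pool.shutdown();
 * </pre>
 */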
public class DocumentScan implements Iterator<Document> {
private static final Logger log = LoggerFactory.getLogger(DocumentScan.class);
private final ClientContext context;
private final TableId tableId;
private final DocumentSerialization.ReturnType returnType;
private final boolean docRawFields;
private int maxTabletsPerThread=0;
private Authorizations authorizations = Authorizations.EMPTY;
private final int numThreads;
private final ExecutorService queryThreadPool;
private final ScannerOptions options;
private ArrayBlockingQueue<List<Document>> resultsQueue;
private Iterator<Document> batchIterator;
private List<Document> batch;
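// sentinel batch enqueued once all query tasks have completed; consuming it ends iteration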
private static final List<Document> LAST_BATCH = new ArrayList<>();
private final Object nextLock = new Object();
// Kryo instances are not thread-safe, and getDocument() is invoked from parallel streams,
// so each thread gets its own instance
static final ThreadLocal<Kryo> kryo = ThreadLocal.withInitial(Kryo::new);
static final JsonDeser jsonDeser = new JsonDeser();
static final JsonParser jsonParser = new JsonParser();
private long failSleepTime = 100;
private volatile Throwable fatalException = null;
//private Map<String, TimeoutTracker> timeoutTrackers;
//private Set<String> timedoutServers;
private final long timeout;
private TabletLocator locator;
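/** Callback that scan tasks invoke with each batch of deserialized documents. */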
public interface ResultReceiver {
void receive(List<Document> entries);
}
public DocumentScan(ClientContext context, TableId tableId,
Authorizations authorizations, ArrayList<Range> ranges, int numThreads,
ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout, boolean printOutput, DocumentSerialization.ReturnType returnType, boolean docRawFields, int queueCapacity, int maxTabletsPerThread) {
this.context = context;
this.tableId = tableId;
this.authorizations = authorizations;
this.numThreads = numThreads;
this.queryThreadPool = queryThreadPool;
this.options = new ScannerOptions(scannerOptions);
resultsQueue = new ArrayBlockingQueue<>(queueCapacity);
this.returnType = returnType;
this.docRawFields=docRawFields;
this.maxTabletsPerThread = maxTabletsPerThread;
this.locator = new TimeoutTabletLocator(timeout, context, tableId);
this.timeout = timeout;
if (options.getFetchedColumns().size() > 0) {
ArrayList<Range> ranges2 = new ArrayList<>(ranges.size());
for (Range range : ranges) {
ranges2.add(range.bound(options.getFetchedColumns().first(), options.getFetchedColumns().last()));
}
ranges = ranges2;
}
ResultReceiver rr = printOutput ? entries -> {
try {
System.out.println("Received " + entries.size() + " " + resultsQueue.size());
resultsQueue.put(entries);
} catch (InterruptedException e) {
if (this.queryThreadPool.isShutdown())
log.debug("Failed to add Batch Scan result", e);
else
log.warn("Failed to add Batch Scan result", e);
fatalException = e;
throw new RuntimeException(e);
}
} : entries -> {
try {
resultsQueue.put(entries);
} catch (InterruptedException e) {
if (this.queryThreadPool.isShutdown())
log.debug("Failed to add Batch Scan result", e);
else
log.warn("Failed to add Batch Scan result", e);
fatalException = e;
throw new RuntimeException(e);
}
};
try {
lookup(ranges, rr);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException("Failed to create iterator", e);
}
}
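/**
 * Blocks until a batch is available, the LAST_BATCH sentinel arrives, or a fatal exception is
 * observed; returns false once the sentinel has been consumed.
 */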
@Override
public boolean hasNext() {
synchronized (nextLock) {
if (batch == LAST_BATCH)
return false;
if (batch != null && batchIterator.hasNext())
return true;
// don't have one cached, try to cache one and return success
try {
batch = null;
while (batch == null && fatalException == null && !queryThreadPool.isShutdown())
batch = resultsQueue.poll(5, TimeUnit.MILLISECONDS);
if (fatalException != null)
if (fatalException instanceof RuntimeException)
throw (RuntimeException) fatalException;
else
throw new RuntimeException(fatalException);
if (queryThreadPool.isShutdown()) {
String shortMsg =
"The BatchScanner was unexpectedly closed while" + " this Iterator was still in use.";
log.error("{} Ensure that a reference to the BatchScanner is retained"
+ " so that it can be closed when this Iterator is exhausted. Not"
+ " retaining a reference to the BatchScanner guarantees that you are"
+ " leaking threads in your client JVM.", shortMsg);
throw new RuntimeException(shortMsg + " Ensure proper handling of the BatchScanner.");
}
batchIterator = batch.iterator();
return batch != LAST_BATCH;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Override
public Document next() {
// if there's one waiting, or hasNext() can get one, return it
synchronized (nextLock) {
if (hasNext())
return batchIterator.next();
else
throw new NoSuchElementException();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
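/** Merges overlapping ranges, bins them to tablet servers, and kicks off the lookup tasks. */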
private synchronized void lookup(List<Range> ranges, ResultReceiver receiver)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
List<Column> columns = new ArrayList<>(options.getFetchedColumns());
ranges = Range.mergeOverlapping(ranges);
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
binRanges(locator, ranges, binnedRanges);
doLookups(binnedRanges, receiver, columns);
}
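/**
 * Bins ranges to tablets via the tablet locator, retrying every 100ms while locations are
 * unavailable, then clips each range to its tablet's extent so that failure handling can
 * re-bin precisely.
 */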
private void binRanges(TabletLocator tabletLocator, List<Range> ranges,
Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
int lastFailureSize = Integer.MAX_VALUE;
while (true) {
binnedRanges.clear();
List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges);
if (failures.size() > 0) {
// tried to only do table state checks when failures.size() == ranges.size(); however, this
// did not work because nothing ever invalidated entries in the tabletLocator cache. Even
// though the table was deleted, the tablet locator entries for the deleted table were not
// cleared, so we need to always do the check when failures occur.
if (failures.size() >= lastFailureSize)
if (!Tables.exists(context, tableId))
throw new TableDeletedException(tableId.canonical());
else if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
lastFailureSize = failures.size();
if (log.isTraceEnabled())
log.trace("Failed to bin {} ranges, tablet locations were null, retrying in 100ms",
failures.size());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
break;
}
}
// truncate the ranges to within the tablets... this makes it easier to know what work
// needs to be redone when failures occur and tablets have merged or split
Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
Map<KeyExtent,List<Range>> tabletMap = new HashMap<>();
binnedRanges2.put(entry.getKey(), tabletMap);
for (Entry<KeyExtent,List<Range>> tabletRanges : entry.getValue().entrySet()) {
Range tabletRange = tabletRanges.getKey().toDataRange();
List<Range> clippedRanges = new ArrayList<>();
tabletMap.put(tabletRanges.getKey(), clippedRanges);
for (Range range : tabletRanges.getValue())
clippedRanges.add(tabletRange.clip(range));
}
}
binnedRanges.clear();
binnedRanges.putAll(binnedRanges2);
}
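/**
 * Re-bins and re-runs the ranges for failed tablets after an exponentially increasing
 * back-off, capped at five seconds.
 */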
private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver,
List<Column> columns)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
if (log.isTraceEnabled())
log.trace("Failed to execute multiscans against {} tablets, retrying...", failures.size());
try {
Thread.sleep(failSleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// We were interrupted (close called on batchscanner) just exit
log.debug("Exiting failure processing on interrupt");
return;
}
failSleepTime = Math.min(5000, failSleepTime * 2);
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
List<Range> allRanges = new ArrayList<>();
for (List<Range> ranges : failures.values())
allRanges.addAll(ranges);
// since the first call to binRanges clipped the ranges to within a tablet, this call only
// needs to re-bin the ranges belonging to the set of failed tablets
binRanges(locator, allRanges, binnedRanges);
doLookups(binnedRanges, receiver, columns);
}
private String getTableInfo() {
return Tables.getPrintableTableInfoFromId(context, tableId);
}
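/**
 * A unit of work against a single tablet server. The last task to finish (detected via the
 * shared semaphore) processes any accumulated failures and enqueues the LAST_BATCH sentinel.
 * Serialization settings are taken from the enclosing DocumentScan.
 */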
private class QueryTask implements Runnable {
private String tsLocation;
private Map<KeyExtent,List<Range>> tabletsRanges;
private ResultReceiver receiver;
private Semaphore semaphore = null;
private final Map<KeyExtent,List<Range>> failures;
private List<Column> columns;
private int semaphoreSize;
QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) {
this.tsLocation = tsLocation;
this.tabletsRanges = tabletsRanges;
this.receiver = receiver;
this.columns = columns;
this.failures = failures;
}
void setSemaphore(Semaphore semaphore, int semaphoreSize) {
this.semaphore = semaphore;
this.semaphoreSize = semaphoreSize;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
Thread.currentThread()
.setName(threadName + " looking up " + tabletsRanges.size() + " ranges at " + tsLocation);
Map<KeyExtent,List<Range>> unscanned = new HashMap<>();
Map<KeyExtent,List<Range>> tsFailures = new HashMap<>();
try {
/*
TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation);
if (timeoutTracker == null) {
timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout);
timeoutTrackers.put(tsLocation, timeoutTracker);
}
*/
doLookup(context, tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns,
options, authorizations, returnType,docRawFields);
if (tsFailures.size() > 0) {
locator.invalidateCache(tsFailures.keySet());
synchronized (failures) {
failures.putAll(tsFailures);
}
}
} catch (IOException e) {
if (!DocumentScan.this.queryThreadPool.isShutdown()) {
synchronized (failures) {
failures.putAll(tsFailures);
failures.putAll(unscanned);
}
locator.invalidateCache(context, tsLocation);
}
log.debug("IOException thrown", e);
} catch (AccumuloSecurityException e) {
e.setTableInfo(getTableInfo());
log.debug("AccumuloSecurityException thrown", e);
Tables.clearCache(context);
if (!Tables.exists(context, tableId))
fatalException = new TableDeletedException(tableId.canonical());
else
fatalException = e;
} catch (SampleNotPresentException e) {
fatalException = e;
} catch (Throwable t) {
if (queryThreadPool.isShutdown())
log.debug("Caught exception, but queryThreadPool is shutdown", t);
else
log.warn("Caught exception, but queryThreadPool is not shutdown", t);
fatalException = t;
} finally {
semaphore.release();
Thread.currentThread().setName(threadName);
if (semaphore.tryAcquire(semaphoreSize)) {
// finished processing all queries
if (fatalException == null && failures.size() > 0) {
// there were some failures
try {
processFailures(failures, receiver, columns);
} catch (TableNotFoundException | AccumuloException e) {
log.debug("{}", e.getMessage(), e);
fatalException = e;
} catch (AccumuloSecurityException e) {
e.setTableInfo(getTableInfo());
log.debug("{}", e.getMessage(), e);
fatalException = e;
} catch (Throwable t) {
log.debug("{}", t.getMessage(), t);
fatalException = t;
}
if (fatalException != null) {
// we are finished with this batch query
if (!resultsQueue.offer(LAST_BATCH)) {
log.debug(
"Could not add to result queue after seeing fatalException in processFailures",
fatalException);
}
}
} else {
// we are finished with this batch query
if (fatalException != null) {
if (!resultsQueue.offer(LAST_BATCH)) {
log.debug("Could not add to result queue after seeing fatalException",
fatalException);
}
} else {
try {
resultsQueue.put(LAST_BATCH);
} catch (InterruptedException e) {
fatalException = e;
if (!resultsQueue.offer(LAST_BATCH)) {
log.debug("Could not add to result queue after seeing fatalException",
fatalException);
}
}
}
}
}
}
}
}
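/**
 * Splits the binned ranges into per-tablet-server QueryTasks, optionally capping tablets per
 * request, shuffles server order to spread load, and submits the tasks to the thread pool.
 */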
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
final ResultReceiver receiver, List<Column> columns) {
/*
if (timedoutServers.containsAll(binnedRanges.keySet())) {
// all servers have timed out
throw new TimedOutException(timedoutServers);
}*/
// when there are lots of threads and a few tablet servers
// it is good to break requests to tablet servers up; the
// following code determines whether this is the case
int maxTabletsPerRequest = Integer.MAX_VALUE;
if (numThreads / binnedRanges.size() > 1) {
int totalNumberOfTablets = 0;
for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
totalNumberOfTablets += entry.getValue().size();
}
maxTabletsPerRequest = totalNumberOfTablets / numThreads;
if (log.isTraceEnabled())
log.trace("maxTabletsPerRequest is now {} ({} tablets across {} servers, {} threads)",
maxTabletsPerRequest, totalNumberOfTablets, binnedRanges.size(), numThreads);
if (maxTabletsPerRequest == 0) {
maxTabletsPerRequest = 1;
}
}
if (maxTabletsPerThread > 0) {
maxTabletsPerRequest = maxTabletsPerThread;
}
Map<KeyExtent,List<Range>> failures = new HashMap<>();
/*
if (timedoutServers.size() > 0) {
// go ahead and fail any timed out servers
for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator =
binnedRanges.entrySet().iterator(); iterator.hasNext();) {
Entry<String,Map<KeyExtent,List<Range>>> entry = iterator.next();
if (timedoutServers.contains(entry.getKey())) {
failures.putAll(entry.getValue());
iterator.remove();
}
}
}*/
// randomize tabletserver order... this will help when there are multiple
// batch readers and writers running against accumulo
List<String> locations = new ArrayList<>(binnedRanges.keySet());
Collections.shuffle(locations);
List<QueryTask> queryTasks = new ArrayList<>();
for (final String tsLocation : locations) {
final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
queryTasks.add(queryTask);
} else {
HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) {
tabletSubset.put(entry.getKey(), entry.getValue());
if (tabletSubset.size() >= maxTabletsPerRequest) {
QueryTask queryTask =
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
queryTasks.add(queryTask);
tabletSubset = new HashMap<>();
}
}
if (tabletSubset.size() > 0) {
QueryTask queryTask =
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
queryTasks.add(queryTask);
}
}
}
final Semaphore semaphore = new Semaphore(queryTasks.size());
semaphore.acquireUninterruptibly(queryTasks.size());
for (QueryTask queryTask : queryTasks) {
queryTask.setSemaphore(semaphore, queryTasks.size());
queryThreadPool.execute(queryTask);
}
}
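/**
 * Reconciles a MultiScanResult against the unscanned map: failed extents move to failures,
 * fully scanned extents are dropped, and a partially scanned extent has its remaining work
 * re-added as a range starting at the server-reported next key.
 */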
static void trackScanning(Map<KeyExtent,List<Range>> failures,
Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) {
// translate returned failures, remove them from unscanned, and add them to failures
Map<KeyExtent,List<Range>> retFailures = Translator.translate(scanResult.failures,
Translators.TKET, new Translator.ListTranslator<>(Translators.TRT));
unscanned.keySet().removeAll(retFailures.keySet());
failures.putAll(retFailures);
// translate full scans and remove them from unscanned
HashSet<KeyExtent> fullScans =
new HashSet<>(Translator.translate(scanResult.fullScans, Translators.TKET));
unscanned.keySet().removeAll(fullScans);
// remove partial scan from unscanned
if (scanResult.partScan != null) {
KeyExtent ke = new KeyExtent(scanResult.partScan);
Key nextKey = new Key(scanResult.partNextKey);
ListIterator<Range> iterator = unscanned.get(ke).listIterator();
while (iterator.hasNext()) {
Range range = iterator.next();
if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey())
&& scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) {
iterator.remove();
} else if (range.contains(nextKey)) {
iterator.remove();
Range partRange = new Range(nextKey, scanResult.partNextKeyInclusive, range.getEndKey(),
range.isEndKeyInclusive());
iterator.add(partRange);
}
}
}
}
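/**
 * Runs a thrift multi-scan against a single tablet server, feeding each batch of results to
 * the receiver and updating the failed/unscanned maps until the server reports no more data.
 */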
static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested,
Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
ResultReceiver receiver, List<Column> columns, ScannerOptions options,
Authorizations authorizations /* , TimeoutTracker timeoutTracker */, DocumentSerialization.ReturnType returnType, boolean docRawFields)
throws IOException, AccumuloSecurityException, AccumuloServerException {
if (requested.size() == 0) {
return;
}
// copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
ArrayList<Range> ranges = new ArrayList<>();
for (Range range : entry.getValue()) {
ranges.add(new Range(range));
}
unscanned.put(new KeyExtent(entry.getKey()), ranges);
}
// timeoutTracker.startingScan();
TTransport transport = null;
try {
final HostAndPort parsedServer = HostAndPort.fromString(server);
final TabletClientService.Client client;
// if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis())
// client = ThriftUtil.getTServerClient(parsedServer, context, timeoutTracker.getTimeOut());
// else
client = ThriftUtil.getTServerClient(parsedServer, context);
MyScannerOptions opts = new MyScannerOptions(options);
try {
OpTimer timer = null;
TabletType ttype = TabletType.type(requested.keySet());
boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested,
Translators.KET, new Translator.ListTranslator<>(Translators.RT));
Map<String,String> execHints = null;
InitialMultiScan imsr = client.startMultiScan(TraceUtil.traceInfo(), context.rpcCreds(),
thriftTabletRanges, Translator.translate(columns, Translators.CT),
opts.getServerSideIteratorList(), opts.getServerSideIteratorOptions(),
ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites,
SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()),
Long.MAX_VALUE, options.getClassLoaderContext(), execHints);
if (waitForWrites)
ThriftScanner.serversWaitedForWrites.get(ttype).add(server);
MultiScanResult scanResult = imsr.result;
if (timer != null) {
timer.stop();
log.trace("tid={} Got 1st multi scan results, #results={} {} in {}",
Thread.currentThread().getId(), scanResult.results.size(),
(scanResult.more ? "scanID=" + imsr.scanID : ""),
String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
}
/*
ArrayList<Document> entries = new ArrayList<>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add( getDocument(returnType,docRawFields,kv));
//new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value.array(),false)));
}*/
if (scanResult.results.size() > 0)
receiver.receive(scanResult.results.parallelStream()
.map(kv -> getDocument(returnType, docRawFields, kv))
.collect(Collectors.toList()));
// if (entries.size() > 0 || scanResult.fullScans.size() > 0)
// timeoutTracker.madeProgress();
trackScanning(failures, unscanned, scanResult);
AtomicLong nextOpid = new AtomicLong();
while (scanResult.more) {
//timeoutTracker.check();
if (timer != null) {
log.trace("tid={} oid={} Continuing multi scan, scanid={}",
Thread.currentThread().getId(), nextOpid.get(), imsr.scanID);
timer.reset().start();
}
scanResult = client.continueMultiScan(TraceUtil.traceInfo(), imsr.scanID);
if (timer != null) {
timer.stop();
log.trace("tid={} oid={} Got more multi scan results, #results={} {} in {}",
Thread.currentThread().getId(), nextOpid.getAndIncrement(),
scanResult.results.size(), (scanResult.more ? " scanID=" + imsr.scanID : ""),
String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
}
/*
entries = new ArrayList<>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add( getDocument(returnType,docRawFields,kv));
//entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value.array(),false)));
}
if (entries.size() > 0)
receiver.receive(entries);
*/
if (scanResult.results.size() > 0)
receiver.receive(scanResult.results.parallelStream()
.map(kv -> getDocument(returnType, docRawFields, kv))
.collect(Collectors.toList()));
trackScanning(failures, unscanned, scanResult);
}
client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID);
} finally {
ThriftUtil.returnClient(client);
}
} catch (TTransportException e) {
log.debug("Server : {} msg : {}", server, e.getMessage());
// timeoutTracker.errorOccured();
throw new IOException(e);
} catch (ThriftSecurityException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (TApplicationException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new AccumuloServerException(server, e);
} catch (NoSuchScanIDException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new IOException(e);
} catch (TSampleNotPresentException e) {
log.debug("Server : " + server + " msg : " + e.getMessage(), e);
String tableInfo = "?";
if (e.getExtent() != null) {
TableId tableId = new KeyExtent(e.getExtent()).getTableId();
tableInfo = Tables.getPrintableTableInfoFromId(context, tableId);
}
String message = "Table " + tableInfo + " does not have sampling configured or built";
throw new SampleNotPresentException(message, e);
} catch (TException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
// timeoutTracker.errorOccured();
throw new IOException(e);
} finally {
ThriftTransportPool.getInstance().returnTransport(transport);
}
}
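/**
 * Deserializes a single key/value pair into a Document according to the configured return
 * type (kryo or json). For json, the first three bytes of the value are a serialization
 * header and are skipped.
 */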
static Document getDocument(DocumentSerialization.ReturnType returnType,boolean docRawFields, TKeyValue kv){
Document document = null;
byte [] array = kv.value.array();
int offset = 0;
int size = array.length;
if ( !org.apache.thrift.TBaseHelper.wrapsFullArray(kv.value)){
size = kv.value.remaining();
offset = kv.value.arrayOffset() + kv.value.position();
}
if (DocumentSerialization.ReturnType.kryo == returnType) {
Input input = new Input(array, offset, size);
document = kryo.get().readObject(input, Document.class);
if (null == document) {
throw new RuntimeException("Deserialized null Document");
}
input.close();
}
else if (DocumentSerialization.ReturnType.json == returnType) {
// skip the three-byte serialization header (see DocumentSerialization.consumeHeader)
InputStream jsonStream = new ByteArrayInputStream(array, offset + 3, size - 3);
if (!docRawFields) {
Reader rdr = new InputStreamReader(jsonStream, StandardCharsets.UTF_8);
JsonObject jsonObject = jsonParser.parse(rdr).getAsJsonObject();
document = jsonDeser.deserialize(jsonObject, null, null);
}
else {
// decode the value bytes directly; calling toString() on the stream would yield the
// object's identity string rather than the JSON contents
document = new Document();
String rawJson = new String(array, offset + 3, size - 3, StandardCharsets.UTF_8);
TypeAttribute<?> attr = new TypeAttribute<>(new NoOpType(rawJson), null, true);
document.put("rawjson", attr);
}
}
else{
throw new UnsupportedOperationException("only kryo and json are supported");
}
return document;
}
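/** Returns the total number of ranges across all of the given lists. */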
static int sumSizes(Collection<List<Range>> values) {
int sum = 0;
for (List<Range> list : values) {
sum += list.size();
}
return sum;
}
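/**
 * Exposes the protected server-side iterator configuration of ScannerOptions so it can be
 * passed to startMultiScan.
 */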
public static class MyScannerOptions extends ScannerOptions {
public MyScannerOptions(ScannerOptions opts) {
super(opts);
}
public List<IterInfo> getServerSideIteratorList() {
return serverSideIteratorList;
}
public Map<String,Map<String,String>> getServerSideIteratorOptions() {
return serverSideIteratorOptions;
}
}
}