Last active
December 23, 2021 10:38
-
-
Save phrocker/dbe741082d8d2731032a5c6879e3d066 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package datawave.query.tables; | |
import com.esotericsoftware.kryo.Kryo; | |
import com.esotericsoftware.kryo.io.Input; | |
import com.google.gson.JsonObject; | |
import com.google.gson.JsonParser; | |
import datawave.data.type.NoOpType; | |
import datawave.query.DocumentSerialization; | |
import datawave.query.attributes.Document; | |
import datawave.query.attributes.TypeAttribute; | |
import datawave.query.function.json.deser.JsonDeser; | |
import org.apache.accumulo.core.client.*; | |
import org.apache.accumulo.core.clientImpl.*; | |
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; | |
import org.apache.accumulo.core.data.*; | |
import org.apache.accumulo.core.dataImpl.KeyExtent; | |
import org.apache.accumulo.core.dataImpl.thrift.*; | |
import org.apache.accumulo.core.master.state.tables.TableState; | |
import org.apache.accumulo.core.rpc.ThriftUtil; | |
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; | |
import org.apache.accumulo.core.security.Authorizations; | |
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; | |
import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException; | |
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; | |
import org.apache.accumulo.core.trace.TraceUtil; | |
import org.apache.accumulo.core.util.ByteBufferUtil; | |
import org.apache.accumulo.core.util.HostAndPort; | |
import org.apache.accumulo.core.util.OpTimer; | |
import org.apache.thrift.TApplicationException; | |
import org.apache.thrift.TException; | |
import org.apache.thrift.transport.TTransport; | |
import org.apache.thrift.transport.TTransportException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.*; | |
import java.nio.ByteBuffer; | |
import java.util.AbstractMap.SimpleImmutableEntry; | |
import java.util.*; | |
import java.util.Map.Entry; | |
import java.util.concurrent.*; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.stream.Collectors; | |
public class DocumentScan implements Iterator<Document> { | |
private static final Logger log = LoggerFactory.getLogger(DocumentScan.class); | |
private final ClientContext context; | |
private final TableId tableId; | |
private final DocumentSerialization.ReturnType returnType; | |
private final boolean docRawFields; | |
private int maxTabletsPerThread=0; | |
private Authorizations authorizations = Authorizations.EMPTY; | |
private final int numThreads; | |
private final ExecutorService queryThreadPool; | |
private final ScannerOptions options; | |
private ArrayBlockingQueue<List<Document>> resultsQueue; | |
private Iterator<Document> batchIterator; | |
private List<Document> batch; | |
private static final List<Document> LAST_BATCH = new ArrayList<>(); | |
private final Object nextLock = new Object(); | |
static final transient Kryo kryo = new Kryo(); | |
static final JsonDeser jsonDeser = new JsonDeser(); | |
static final JsonParser jsonParser = new JsonParser(); | |
private long failSleepTime = 100; | |
private volatile Throwable fatalException = null; | |
//private Map<String, TimeoutTracker> timeoutTrackers; | |
//private Set<String> timedoutServers; | |
private final long timeout; | |
private TabletLocator locator; | |
public interface ResultReceiver { | |
void receive(List<Document> entries); | |
} | |
public DocumentScan(ClientContext context, TableId tableId, | |
Authorizations authorizations, ArrayList<Range> ranges, int numThreads, | |
ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout, boolean printOutput, DocumentSerialization.ReturnType returnType, boolean docRawFields, int queueCapacity, int maxTabletsPerThread) { | |
this.context = context; | |
this.tableId = tableId; | |
this.authorizations = authorizations; | |
this.numThreads = numThreads; | |
this.queryThreadPool = queryThreadPool; | |
this.options = new ScannerOptions(scannerOptions); | |
resultsQueue = new ArrayBlockingQueue<>(queueCapacity); | |
this.returnType = returnType; | |
this.docRawFields=docRawFields; | |
this.maxTabletsPerThread = maxTabletsPerThread; | |
this.locator = new TimeoutTabletLocator(timeout, context, tableId); | |
this.timeout = timeout; | |
if (options.getFetchedColumns().size() > 0) { | |
ArrayList<Range> ranges2 = new ArrayList<>(ranges.size()); | |
for (Range range : ranges) { | |
ranges2.add(range.bound(options.getFetchedColumns().first(), options.getFetchedColumns().last())); | |
} | |
ranges = ranges2; | |
} | |
ResultReceiver rr = printOutput ? entries -> { | |
try { | |
System.out.println("Received " + entries.size() + " " + resultsQueue.size()); | |
resultsQueue.put(entries); | |
} catch (InterruptedException e) { | |
if (this.queryThreadPool.isShutdown()) | |
log.debug("Failed to add Batch Scan result", e); | |
else | |
log.warn("Failed to add Batch Scan result", e); | |
fatalException = e; | |
throw new RuntimeException(e); | |
} | |
} : entries -> { | |
try { | |
resultsQueue.put(entries); | |
} catch (InterruptedException e) { | |
if (this.queryThreadPool.isShutdown()) | |
log.debug("Failed to add Batch Scan result", e); | |
else | |
log.warn("Failed to add Batch Scan result", e); | |
fatalException = e; | |
throw new RuntimeException(e); | |
} | |
}; | |
try { | |
lookup(ranges, rr); | |
} catch (RuntimeException re) { | |
throw re; | |
} catch (Exception e) { | |
throw new RuntimeException("Failed to create iterator", e); | |
} | |
} | |
@Override | |
public boolean hasNext() { | |
synchronized (nextLock) { | |
if (batch == LAST_BATCH) | |
return false; | |
if (batch != null && batchIterator.hasNext()) | |
return true; | |
// don't have one cached, try to cache one and return success | |
try { | |
batch = null; | |
while (batch == null && fatalException == null && !queryThreadPool.isShutdown()) | |
batch = resultsQueue.poll(5, TimeUnit.MILLISECONDS); | |
if (fatalException != null) | |
if (fatalException instanceof RuntimeException) | |
throw (RuntimeException) fatalException; | |
else | |
throw new RuntimeException(fatalException); | |
if (queryThreadPool.isShutdown()) { | |
String shortMsg = | |
"The BatchScanner was unexpectedly closed while" + " this Iterator was still in use."; | |
log.error("{} Ensure that a reference to the BatchScanner is retained" | |
+ " so that it can be closed when this Iterator is exhausted. Not" | |
+ " retaining a reference to the BatchScanner guarantees that you are" | |
+ " leaking threads in your client JVM.", shortMsg); | |
throw new RuntimeException(shortMsg + " Ensure proper handling of the BatchScanner."); | |
} | |
batchIterator = batch.iterator(); | |
return batch != LAST_BATCH; | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
@Override | |
public Document next() { | |
// if there's one waiting, or hasNext() can get one, return it | |
synchronized (nextLock) { | |
if (hasNext()) | |
return batchIterator.next(); | |
else | |
throw new NoSuchElementException(); | |
} | |
} | |
@Override | |
public void remove() { | |
throw new UnsupportedOperationException(); | |
} | |
private synchronized void lookup(List<Range> ranges, ResultReceiver receiver) | |
throws AccumuloException, AccumuloSecurityException, TableNotFoundException { | |
List<Column> columns = new ArrayList<>(options.getFetchedColumns()); | |
ranges = Range.mergeOverlapping(ranges); | |
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); | |
binRanges(locator, ranges, binnedRanges); | |
doLookups(binnedRanges, receiver, columns); | |
} | |
private void binRanges(TabletLocator tabletLocator, List<Range> ranges, | |
Map<String,Map<KeyExtent,List<Range>>> binnedRanges) | |
throws AccumuloException, AccumuloSecurityException, TableNotFoundException { | |
int lastFailureSize = Integer.MAX_VALUE; | |
while (true) { | |
binnedRanges.clear(); | |
List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges); | |
if (failures.size() > 0) { | |
// tried to only do table state checks when failures.size() == ranges.size(), however this | |
// did | |
// not work because nothing ever invalidated entries in the tabletLocator cache... so even | |
// though | |
// the table was deleted the tablet locator entries for the deleted table were not | |
// cleared... so | |
// need to always do the check when failures occur | |
if (failures.size() >= lastFailureSize) | |
if (!Tables.exists(context, tableId)) | |
throw new TableDeletedException(tableId.canonical()); | |
else if (Tables.getTableState(context, tableId) == TableState.OFFLINE) | |
throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); | |
lastFailureSize = failures.size(); | |
if (log.isTraceEnabled()) | |
log.trace("Failed to bin {} ranges, tablet locations were null, retrying in 100ms", | |
failures.size()); | |
try { | |
Thread.sleep(100); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} else { | |
break; | |
} | |
} | |
// truncate the ranges to within the tablets... this makes it easier to know what work | |
// needs to be redone when failures occurs and tablets have merged or split | |
Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>(); | |
for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) { | |
Map<KeyExtent,List<Range>> tabletMap = new HashMap<>(); | |
binnedRanges2.put(entry.getKey(), tabletMap); | |
for (Entry<KeyExtent,List<Range>> tabletRanges : entry.getValue().entrySet()) { | |
Range tabletRange = tabletRanges.getKey().toDataRange(); | |
List<Range> clippedRanges = new ArrayList<>(); | |
tabletMap.put(tabletRanges.getKey(), clippedRanges); | |
for (Range range : tabletRanges.getValue()) | |
clippedRanges.add(tabletRange.clip(range)); | |
} | |
} | |
binnedRanges.clear(); | |
binnedRanges.putAll(binnedRanges2); | |
} | |
private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, | |
List<Column> columns) | |
throws AccumuloException, AccumuloSecurityException, TableNotFoundException { | |
if (log.isTraceEnabled()) | |
log.trace("Failed to execute multiscans against {} tablets, retrying...", failures.size()); | |
try { | |
Thread.sleep(failSleepTime); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
// We were interrupted (close called on batchscanner) just exit | |
log.debug("Exiting failure processing on interrupt"); | |
return; | |
} | |
failSleepTime = Math.min(5000, failSleepTime * 2); | |
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); | |
List<Range> allRanges = new ArrayList<>(); | |
for (List<Range> ranges : failures.values()) | |
allRanges.addAll(ranges); | |
// since the first call to binRanges clipped the ranges to within a tablet, we should not get | |
// only | |
// bin to the set of failed tablets | |
binRanges(locator, allRanges, binnedRanges); | |
doLookups(binnedRanges, receiver, columns); | |
} | |
private String getTableInfo() { | |
return Tables.getPrintableTableInfoFromId(context, tableId); | |
} | |
private class QueryTask implements Runnable { | |
private String tsLocation; | |
private Map<KeyExtent,List<Range>> tabletsRanges; | |
private ResultReceiver receiver; | |
private Semaphore semaphore = null; | |
private final Map<KeyExtent,List<Range>> failures; | |
private List<Column> columns; | |
private int semaphoreSize; | |
QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, | |
Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns, DocumentSerialization.ReturnType returnType) { | |
this.tsLocation = tsLocation; | |
this.tabletsRanges = tabletsRanges; | |
this.receiver = receiver; | |
this.columns = columns; | |
this.failures = failures; | |
} | |
void setSemaphore(Semaphore semaphore, int semaphoreSize) { | |
this.semaphore = semaphore; | |
this.semaphoreSize = semaphoreSize; | |
} | |
@Override | |
public void run() { | |
String threadName = Thread.currentThread().getName(); | |
Thread.currentThread() | |
.setName(threadName + " looking up " + tabletsRanges.size() + " ranges at " + tsLocation); | |
Map<KeyExtent,List<Range>> unscanned = new HashMap<>(); | |
Map<KeyExtent,List<Range>> tsFailures = new HashMap<>(); | |
try { | |
/* | |
TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation); | |
if (timeoutTracker == null) { | |
timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout); | |
timeoutTrackers.put(tsLocation, timeoutTracker); | |
} | |
*/ | |
doLookup(context, tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, | |
options, authorizations, returnType,docRawFields); | |
if (tsFailures.size() > 0) { | |
locator.invalidateCache(tsFailures.keySet()); | |
synchronized (failures) { | |
failures.putAll(tsFailures); | |
} | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
if (!DocumentScan.this.queryThreadPool.isShutdown()) { | |
synchronized (failures) { | |
failures.putAll(tsFailures); | |
failures.putAll(unscanned); | |
} | |
locator.invalidateCache(context, tsLocation); | |
} | |
log.debug("IOException thrown", e); | |
} catch (AccumuloSecurityException e) { | |
e.printStackTrace(); | |
e.setTableInfo(getTableInfo()); | |
log.debug("AccumuloSecurityException thrown", e); | |
Tables.clearCache(context); | |
if (!Tables.exists(context, tableId)) | |
fatalException = new TableDeletedException(tableId.canonical()); | |
else | |
fatalException = e; | |
} catch (SampleNotPresentException e) { | |
fatalException = e; | |
} catch (Throwable t) { | |
t.printStackTrace(); | |
if (queryThreadPool.isShutdown()) | |
log.debug("Caught exception, but queryThreadPool is shutdown", t); | |
else | |
log.warn("Caught exception, but queryThreadPool is not shutdown", t); | |
fatalException = t; | |
} finally { | |
semaphore.release(); | |
Thread.currentThread().setName(threadName); | |
if (semaphore.tryAcquire(semaphoreSize)) { | |
// finished processing all queries | |
if (fatalException == null && failures.size() > 0) { | |
// there were some failures | |
try { | |
processFailures(failures, receiver, columns); | |
} catch (TableNotFoundException | AccumuloException e) { | |
log.debug("{}", e.getMessage(), e); | |
fatalException = e; | |
} catch (AccumuloSecurityException e) { | |
e.setTableInfo(getTableInfo()); | |
log.debug("{}", e.getMessage(), e); | |
fatalException = e; | |
} catch (Throwable t) { | |
log.debug("{}", t.getMessage(), t); | |
fatalException = t; | |
} | |
if (fatalException != null) { | |
// we are finished with this batch query | |
if (!resultsQueue.offer(LAST_BATCH)) { | |
log.debug( | |
"Could not add to result queue after seeing fatalException in processFailures", | |
fatalException); | |
} | |
} | |
} else { | |
// we are finished with this batch query | |
if (fatalException != null) { | |
if (!resultsQueue.offer(LAST_BATCH)) { | |
log.debug("Could not add to result queue after seeing fatalException", | |
fatalException); | |
} | |
} else { | |
try { | |
resultsQueue.put(LAST_BATCH); | |
} catch (InterruptedException e) { | |
fatalException = e; | |
if (!resultsQueue.offer(LAST_BATCH)) { | |
log.debug("Could not add to result queue after seeing fatalException", | |
fatalException); | |
} | |
} | |
} | |
} | |
} | |
} | |
} | |
} | |
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, | |
final ResultReceiver receiver, List<Column> columns) { | |
/* | |
if (timedoutServers.containsAll(binnedRanges.keySet())) { | |
// all servers have timed out | |
throw new TimedOutException(timedoutServers); | |
}*/ | |
// when there are lots of threads and a few tablet servers | |
// it is good to break request to tablet servers up, the | |
// following code determines if this is the case | |
int maxTabletsPerRequest = Integer.MAX_VALUE; | |
System.out.println("maxTabletsPerRequest " + numThreads + " " + binnedRanges.size()); | |
if (numThreads / binnedRanges.size() > 1) { | |
int totalNumberOfTablets = 0; | |
for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) { | |
totalNumberOfTablets += entry.getValue().size(); | |
} | |
maxTabletsPerRequest = totalNumberOfTablets / (numThreads); | |
System.out.println("maxTabletsPerRequest is now " + maxTabletsPerRequest + " " + totalNumberOfTablets); | |
if (maxTabletsPerRequest == 0) { | |
maxTabletsPerRequest = 1; | |
} | |
} | |
if (maxTabletsPerThread > 0 ){ | |
maxTabletsPerRequest = maxTabletsPerThread; | |
} | |
Map<KeyExtent,List<Range>> failures = new HashMap<>(); | |
/* | |
if (timedoutServers.size() > 0) { | |
// go ahead and fail any timed out servers | |
for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator = | |
binnedRanges.entrySet().iterator(); iterator.hasNext();) { | |
Entry<String,Map<KeyExtent,List<Range>>> entry = iterator.next(); | |
if (timedoutServers.contains(entry.getKey())) { | |
failures.putAll(entry.getValue()); | |
iterator.remove(); | |
} | |
} | |
}*/ | |
// randomize tabletserver order... this will help when there are multiple | |
// batch readers and writers running against accumulo | |
List<String> locations = new ArrayList<>(binnedRanges.keySet()); | |
Collections.shuffle(locations); | |
List<QueryTask> queryTasks = new ArrayList<>(); | |
for (final String tsLocation : locations) { | |
final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation); | |
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) { | |
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns, this.returnType); | |
queryTasks.add(queryTask); | |
} else { | |
HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>(); | |
for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) { | |
tabletSubset.put(entry.getKey(), entry.getValue()); | |
if (tabletSubset.size() >= maxTabletsPerRequest) { | |
QueryTask queryTask = | |
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, this.returnType); | |
queryTasks.add(queryTask); | |
tabletSubset = new HashMap<>(); | |
} | |
} | |
if (tabletSubset.size() > 0) { | |
QueryTask queryTask = | |
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, this.returnType); | |
queryTasks.add(queryTask); | |
} | |
} | |
} | |
final Semaphore semaphore = new Semaphore(queryTasks.size()); | |
semaphore.acquireUninterruptibly(queryTasks.size()); | |
for (QueryTask queryTask : queryTasks) { | |
queryTask.setSemaphore(semaphore, queryTasks.size()); | |
queryThreadPool.execute(queryTask); | |
} | |
} | |
static void trackScanning(Map<KeyExtent,List<Range>> failures, | |
Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) { | |
// translate returned failures, remove them from unscanned, and add them to failures | |
Map<KeyExtent,List<Range>> retFailures = Translator.translate(scanResult.failures, | |
Translators.TKET, new Translator.ListTranslator<>(Translators.TRT)); | |
unscanned.keySet().removeAll(retFailures.keySet()); | |
failures.putAll(retFailures); | |
// translate full scans and remove them from unscanned | |
HashSet<KeyExtent> fullScans = | |
new HashSet<>(Translator.translate(scanResult.fullScans, Translators.TKET)); | |
unscanned.keySet().removeAll(fullScans); | |
// remove partial scan from unscanned | |
if (scanResult.partScan != null) { | |
KeyExtent ke = new KeyExtent(scanResult.partScan); | |
Key nextKey = new Key(scanResult.partNextKey); | |
ListIterator<Range> iterator = unscanned.get(ke).listIterator(); | |
while (iterator.hasNext()) { | |
Range range = iterator.next(); | |
if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey()) | |
&& scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) { | |
iterator.remove(); | |
} else if (range.contains(nextKey)) { | |
iterator.remove(); | |
Range partRange = new Range(nextKey, scanResult.partNextKeyInclusive, range.getEndKey(), | |
range.isEndKeyInclusive()); | |
iterator.add(partRange); | |
} | |
} | |
} | |
} | |
static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, | |
Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, | |
ResultReceiver receiver, List<Column> columns, ScannerOptions options, | |
Authorizations authorizations /* , TimeoutTracker timeoutTracker */, DocumentSerialization.ReturnType returnType, boolean docRawFields) | |
throws IOException, AccumuloSecurityException, AccumuloServerException { | |
if (requested.size() == 0) { | |
return; | |
} | |
// copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning() | |
for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) { | |
ArrayList<Range> ranges = new ArrayList<>(); | |
for (Range range : entry.getValue()) { | |
ranges.add(new Range(range)); | |
} | |
unscanned.put(new KeyExtent(entry.getKey()), ranges); | |
} | |
// timeoutTracker.startingScan(); | |
TTransport transport = null; | |
try { | |
final HostAndPort parsedServer = HostAndPort.fromString(server); | |
final TabletClientService.Client client; | |
// if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) | |
// client = ThriftUtil.getTServerClient(parsedServer, context, timeoutTracker.getTimeOut()); | |
// else | |
client = ThriftUtil.getTServerClient(parsedServer, context); | |
MyScannerOptions opts = new MyScannerOptions(options); | |
try { | |
OpTimer timer = null; | |
TabletType ttype = TabletType.type(requested.keySet()); | |
boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server); | |
Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested, | |
Translators.KET, new Translator.ListTranslator<>(Translators.RT)); | |
Map<String,String> execHints = null; | |
InitialMultiScan imsr = client.startMultiScan(TraceUtil.traceInfo(), context.rpcCreds(), | |
thriftTabletRanges, Translator.translate(columns, Translators.CT), | |
opts.getServerSideIteratorList(), opts.getServerSideIteratorOptions(), | |
ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites, | |
SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()), | |
Long.MAX_VALUE, options.getClassLoaderContext(), execHints); | |
if (waitForWrites) | |
ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString()); | |
MultiScanResult scanResult = imsr.result; | |
if (timer != null) { | |
timer.stop(); | |
log.trace("tid={} Got 1st multi scan results, #results={} {} in {}", | |
Thread.currentThread().getId(), scanResult.results.size(), | |
(scanResult.more ? "scanID=" + imsr.scanID : ""), | |
String.format("%.3f secs", timer.scale(TimeUnit.SECONDS))); | |
} | |
/* | |
ArrayList<Document> entries = new ArrayList<>(scanResult.results.size()); | |
for (TKeyValue kv : scanResult.results) { | |
entries.add( getDocument(returnType,docRawFields,kv)); | |
//new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value.array(),false))); | |
}*/ | |
if (scanResult.results.size() > 0) | |
receiver.receive(scanResult.results.parallelStream().map( x -> { | |
return getDocument(returnType,docRawFields,x); | |
}).collect(Collectors.toList())); | |
// if (entries.size() > 0 || scanResult.fullScans.size() > 0) | |
// timeoutTracker.madeProgress(); | |
trackScanning(failures, unscanned, scanResult); | |
AtomicLong nextOpid = new AtomicLong(); | |
while (scanResult.more) { | |
//timeoutTracker.check(); | |
if (timer != null) { | |
log.trace("tid={} oid={} Continuing multi scan, scanid={}", | |
Thread.currentThread().getId(), nextOpid.get(), imsr.scanID); | |
timer.reset().start(); | |
} | |
scanResult = client.continueMultiScan(TraceUtil.traceInfo(), imsr.scanID); | |
if (timer != null) { | |
timer.stop(); | |
log.trace("tid={} oid={} Got more multi scan results, #results={} {} in {}", | |
Thread.currentThread().getId(), nextOpid.getAndIncrement(), | |
scanResult.results.size(), (scanResult.more ? " scanID=" + imsr.scanID : ""), | |
String.format("%.3f secs", timer.scale(TimeUnit.SECONDS))); | |
} | |
/* | |
entries = new ArrayList<>(scanResult.results.size()); | |
for (TKeyValue kv : scanResult.results) { | |
entries.add( getDocument(returnType,docRawFields,kv)); | |
//entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value.array(),false))); | |
} | |
if (entries.size() > 0) | |
receiver.receive(entries); | |
*/ | |
if (scanResult.results.size() > 0) | |
receiver.receive(scanResult.results.parallelStream().map( x -> { | |
return getDocument(returnType,docRawFields,x); | |
}).collect(Collectors.toList())); | |
trackScanning(failures, unscanned, scanResult); | |
} | |
client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID); | |
} finally { | |
ThriftUtil.returnClient(client); | |
} | |
} catch (TTransportException e) { | |
log.debug("Server : {} msg : {}", server, e.getMessage()); | |
// timeoutTracker.errorOccured(); | |
throw new IOException(e); | |
} catch (ThriftSecurityException e) { | |
log.debug("Server : {} msg : {}", server, e.getMessage(), e); | |
throw new AccumuloSecurityException(e.user, e.code, e); | |
} catch (TApplicationException e) { | |
log.debug("Server : {} msg : {}", server, e.getMessage(), e); | |
throw new AccumuloServerException(server, e); | |
} catch (NoSuchScanIDException e) { | |
log.debug("Server : {} msg : {}", server, e.getMessage(), e); | |
throw new IOException(e); | |
} catch (TSampleNotPresentException e) { | |
log.debug("Server : " + server + " msg : " + e.getMessage(), e); | |
String tableInfo = "?"; | |
if (e.getExtent() != null) { | |
TableId tableId = new KeyExtent(e.getExtent()).getTableId(); | |
tableInfo = Tables.getPrintableTableInfoFromId(context, tableId); | |
} | |
String message = "Table " + tableInfo + " does not have sampling configured or built"; | |
throw new SampleNotPresentException(message, e); | |
} catch (TException e) { | |
log.debug("Server : {} msg : {}", server, e.getMessage(), e); | |
// timeoutTracker.errorOccured(); | |
throw new IOException(e); | |
} finally { | |
ThriftTransportPool.getInstance().returnTransport(transport); | |
} | |
} | |
static Document getDocument(DocumentSerialization.ReturnType returnType,boolean docRawFields, TKeyValue kv){ | |
Document document = null; | |
byte [] array = kv.value.array(); | |
int offset = 0; | |
int size = array.length; | |
if ( !org.apache.thrift.TBaseHelper.wrapsFullArray(kv.value)){ | |
size = kv.value.remaining(); | |
offset = kv.value.arrayOffset() + kv.value.position(); | |
} | |
if (DocumentSerialization.ReturnType.kryo == returnType) { | |
Input input = new Input(array,offset,size); | |
document = kryo.readObject(input, Document.class); | |
if (null == document) { | |
throw new RuntimeException("Deserialized null Document"); | |
} | |
input.close(); | |
} | |
else if (DocumentSerialization.ReturnType.json == returnType) { | |
InputStream jsonStream = new ByteArrayInputStream(array, offset+3, size - 3); | |
if (!docRawFields) { | |
Reader rdr = new InputStreamReader(jsonStream); | |
//Reader rdr = new InputStreamReader(DocumentSerialization.consumeHeader(kv.value.array())); | |
JsonObject jsonObject = jsonParser.parse(rdr).getAsJsonObject(); | |
document = jsonDeser.deserialize(jsonObject, null, null); | |
} | |
else{ | |
document = new Document(); | |
TypeAttribute<?> attr = new TypeAttribute(new NoOpType(jsonStream.toString()),null,true); | |
document.put("rawjson",attr); | |
} | |
} | |
else{ | |
throw new UnsupportedOperationException("only kryo and json are supported"); | |
} | |
return document; | |
} | |
static int sumSizes(Collection<List<Range>> values) { | |
int sum = 0; | |
for (List<Range> list : values) { | |
sum += list.size(); | |
} | |
return sum; | |
} | |
public static class MyScannerOptions extends ScannerOptions{ | |
public MyScannerOptions(ScannerOptions opts){ | |
super(opts); | |
} | |
public List<IterInfo> getServerSideIteratorList(){ | |
return serverSideIteratorList; | |
} | |
public Map<String,Map<String,String>> getServerSideIteratorOptions(){ | |
return serverSideIteratorOptions; | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment