Skip to content

Instantly share code, notes, and snippets.

@rfecher
Created February 9, 2019 21:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rfecher/cdfd0d1e9304e75fa1787bb0d4a44988 to your computer and use it in GitHub Desktop.
Save rfecher/cdfd0d1e9304e75fa1787bb0d4a44988 to your computer and use it in GitHub Desktop.
Fix for multiple adapters in an index
/**
* Copyright (c) 2013-2019 Contributors to the Eclipse Foundation
*
* <p> See the NOTICE file distributed with this work for additional information regarding copyright
* ownership. All rights reserved. This program and the accompanying materials are made available
* under the terms of the Apache License, Version 2.0 which accompanies this distribution and is
* available at http://www.apache.org/licenses/LICENSE-2.0.txt
*/
package org.locationtech.geowave.core.store.util;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import org.locationtech.geowave.core.index.ByteArray;
import org.locationtech.geowave.core.index.IndexUtils;
import org.locationtech.geowave.core.store.adapter.PersistentAdapterStore;
import org.locationtech.geowave.core.store.adapter.exceptions.AdapterException;
import org.locationtech.geowave.core.store.api.Index;
import org.locationtech.geowave.core.store.base.BaseDataStoreUtils;
import org.locationtech.geowave.core.store.base.dataidx.DataIndexRetrieval;
import org.locationtech.geowave.core.store.callback.ScanCallback;
import org.locationtech.geowave.core.store.entities.GeoWaveRow;
import org.locationtech.geowave.core.store.query.filter.QueryFilter;
/**
 * Iterator that pulls raw {@link GeoWaveRow}s from an underlying scanner iterator and returns the
 * non-null results of decoding them with {@code BaseDataStoreUtils.decodeRow}.
 *
 * <p>A scanned row produces no output when any of the following holds:
 *
 * <ul>
 *   <li>decoding the row returns {@code null};
 *   <li>an earlier row with the same adapter ID caused an {@link AdapterException} during decoding;
 *   <li>a bit position was derived from the subsampling array passed to the constructor and the row
 *       is rejected by the skip filter (see {@code passesSkipFilter}).
 * </ul>
 *
 * @param <T> the type of decoded entry returned by this iterator
 */
public class NativeEntryIteratorWrapper<T> implements Iterator<T> {
  /** Handed unchanged to {@code BaseDataStoreUtils.decodeRow}. */
  private final byte[] fieldSubsetBitmask;

  /** Handed unchanged to {@code BaseDataStoreUtils.decodeRow}. */
  private final boolean decodePersistenceEncoding;

  /** Bit position used to compute skip targets; {@code null} disables skip filtering entirely. */
  private Integer bitPosition = null;

  /**
   * Current skip target: rows whose sort key compares below this value are filtered out. Stays
   * {@code null} until a row has been decoded successfully with skip filtering enabled.
   */
  private ByteArray skipUntilRow;

  /**
   * Set once {@code IndexUtils.getNextRowForSkip} returns {@code null}; from then on the skip
   * filter rejects every row.
   */
  private boolean reachedEnd = false;

  /** Adapter IDs for which decoding has already thrown an {@link AdapterException}. */
  private final Set<Short> invalidAdapters = new HashSet<>();

  protected final DataIndexRetrieval dataIndexRetrieval;
  protected final PersistentAdapterStore adapterStore;
  protected final Index index;
  protected final Iterator<GeoWaveRow> scannerIt;
  protected final QueryFilter[] clientFilters;
  protected final ScanCallback<T, ? extends GeoWaveRow> scanCallback;

  /** The decoded value waiting to be handed out by {@link #next()}, or {@code null} if none. */
  protected T nextValue;

  // NOTE(review): this field is neither read nor written anywhere in this class. It is kept only
  // because it is package-visible and could be referenced from elsewhere; confirm and remove.
  boolean first = false;

  /**
   * Creates a wrapper around the given scanner iterator.
   *
   * @param adapterStore passed to {@code BaseDataStoreUtils.decodeRow} for every row
   * @param index passed to {@code BaseDataStoreUtils.decodeRow}; its index strategy is also used
   *     to derive the skip bit position when subsampling is requested
   * @param scannerIt the source of rows to decode
   * @param clientFilters passed to {@code BaseDataStoreUtils.decodeRow} for every row
   * @param scanCallback passed to {@code BaseDataStoreUtils.decodeRow} for every row
   * @param fieldSubsetBitmask passed to {@code BaseDataStoreUtils.decodeRow} for every row
   * @param maxResolutionSubsamplingPerDimension when non-null and non-empty, used to derive the
   *     bit position that enables skip filtering; otherwise skip filtering is disabled
   * @param decodePersistenceEncoding passed to {@code BaseDataStoreUtils.decodeRow} for every row
   * @param dataIndexRetrieval passed to {@code BaseDataStoreUtils.decodeRow} for every row
   */
  public NativeEntryIteratorWrapper(
      final PersistentAdapterStore adapterStore,
      final Index index,
      final Iterator<GeoWaveRow> scannerIt,
      final QueryFilter[] clientFilters,
      final ScanCallback<T, ? extends GeoWaveRow> scanCallback,
      final byte[] fieldSubsetBitmask,
      final double[] maxResolutionSubsamplingPerDimension,
      final boolean decodePersistenceEncoding,
      final DataIndexRetrieval dataIndexRetrieval) {
    this.adapterStore = adapterStore;
    this.index = index;
    this.scannerIt = scannerIt;
    this.clientFilters = clientFilters;
    this.scanCallback = scanCallback;
    this.fieldSubsetBitmask = fieldSubsetBitmask;
    this.decodePersistenceEncoding = decodePersistenceEncoding;
    this.dataIndexRetrieval = dataIndexRetrieval;
    // Must run after this.index is assigned: the bit position is derived from its index strategy.
    initializeBitPosition(maxResolutionSubsamplingPerDimension);
  }

  /**
   * Advances the scanner until a row decodes to a non-null value or the scanner is exhausted. Does
   * nothing if a value is already buffered in {@link #nextValue}.
   */
  protected void findNext() {
    // The loop condition ends the search as soon as nextValue has been populated.
    while ((nextValue == null) && hasNextScannedResult()) {
      final GeoWaveRow row = getNextEncodedResult();
      final T decodedValue = decodeRow(row, clientFilters, index);
      if (decodedValue != null) {
        nextValue = decodedValue;
      }
    }
  }

  /** @return whether the underlying scanner iterator has another row */
  protected boolean hasNextScannedResult() {
    return scannerIt.hasNext();
  }

  /** @return the next row from the underlying scanner iterator */
  protected GeoWaveRow getNextEncodedResult() {
    return scannerIt.next();
  }

  /**
   * Looks ahead for the next decodable row, buffering it if found.
   *
   * @return {@code true} if a decoded value is available for {@link #next()}
   */
  @Override
  public boolean hasNext() {
    findNext();
    return nextValue != null;
  }

  /**
   * Returns the buffered value, searching for one first if nothing is buffered.
   *
   * @return the next decoded value
   * @throws NoSuchElementException if no further row decodes to a non-null value
   */
  @Override
  public T next() throws NoSuchElementException {
    if (nextValue == null) {
      findNext();
    }
    if (nextValue == null) {
      throw new NoSuchElementException();
    }
    final T result = nextValue;
    nextValue = null;
    return result;
  }

  /**
   * Delegates to the underlying scanner iterator's {@code remove()}.
   *
   * <p>NOTE(review): {@link #hasNext()} and {@link #findNext()} may advance the scanner past
   * several rows, so this acts on whichever row the scanner returned last - confirm callers expect
   * that.
   */
  @Override
  public void remove() {
    scannerIt.remove();
  }

  /**
   * Decodes a single row, applying the invalid-adapter check and (when enabled) the skip filter
   * before delegating to {@code BaseDataStoreUtils.decodeRow}.
   *
   * @param row the row to decode
   * @param clientFilters filters forwarded to the decoder
   * @param index index forwarded to the decoder
   * @return the decoded value, or {@code null} if the row was filtered out, failed with an {@link
   *     AdapterException}, or the decoder itself returned {@code null}
   */
  @SuppressWarnings("unchecked")
  protected T decodeRow(
      final GeoWaveRow row,
      final QueryFilter[] clientFilters,
      final Index index) {
    Object decodedRow = null;
    if (!invalidAdapters.contains(row.getAdapterId())
        && ((bitPosition == null) || passesSkipFilter(row))) {
      try {
        decodedRow =
            BaseDataStoreUtils.decodeRow(
                row,
                clientFilters,
                null,
                adapterStore,
                index,
                scanCallback,
                fieldSubsetBitmask,
                decodePersistenceEncoding,
                dataIndexRetrieval);
        if (decodedRow != null) {
          // Only a successfully decoded row moves the skip target forward.
          incrementSkipRow(row);
        }
      } catch (final AdapterException e) {
        // Deliberately not rethrown: remember the adapter so that later rows carrying the same
        // adapter ID are skipped, since attempting to decode them would be pointless.
        invalidAdapters.add(row.getAdapterId());
      }
    }
    return (T) decodedRow;
  }

  /**
   * @return {@code false} once the end has been reached, or when a skip target exists and compares
   *     greater than the row's sort key; {@code true} otherwise
   */
  private boolean passesSkipFilter(final GeoWaveRow row) {
    if (reachedEnd) {
      return false;
    }
    return (skipUntilRow == null)
        || (skipUntilRow.compareTo(new ByteArray(row.getSortKey())) <= 0);
  }

  /**
   * Recomputes the skip target from the given row's sort key, or marks the end as reached when
   * {@code IndexUtils.getNextRowForSkip} yields {@code null}. No-op when skip filtering is
   * disabled.
   */
  private void incrementSkipRow(final GeoWaveRow row) {
    if (bitPosition == null) {
      return;
    }
    final byte[] nextRow = IndexUtils.getNextRowForSkip(row.getSortKey(), bitPosition);
    if (nextRow == null) {
      reachedEnd = true;
    } else {
      skipUntilRow = new ByteArray(nextRow);
    }
  }

  /**
   * Derives {@link #bitPosition} from the subsampling array and the index strategy; leaves it
   * {@code null} when the array is {@code null} or empty.
   */
  private void initializeBitPosition(final double[] maxResolutionSubsamplingPerDimension) {
    if ((maxResolutionSubsamplingPerDimension != null)
        && (maxResolutionSubsamplingPerDimension.length > 0)) {
      bitPosition =
          IndexUtils.getBitPositionOnSortKeyFromSubsamplingArray(
              index.getIndexStrategy(),
              maxResolutionSubsamplingPerDimension);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment