Created
April 22, 2019 17:52
-
-
Save arhimondr/e6cea0f19075fba5572fa575720a6f75 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static HiveSplitSource bucketedRewindable( | |
ConnectorSession session, | |
String databaseName, | |
String tableName, | |
TupleDomain<? extends ColumnHandle> compactEffectivePredicate, | |
int maxInitialSplits, | |
DataSize maxOutstandingSplitsSize, | |
HiveSplitLoader splitLoader, | |
Executor executor, | |
CounterStat highMemorySplitSourceCounter) | |
{ | |
AtomicReference<State> stateReference = new AtomicReference<>(State.initial()); | |
return new HiveSplitSource( | |
session, | |
databaseName, | |
tableName, | |
compactEffectivePredicate, | |
new PerBucket() | |
{ | |
private final Map<Integer, List<InternalHiveSplit>> splits = new ConcurrentHashMap<>(); | |
private final SettableFuture<?> finished = SettableFuture.create(); | |
@Override | |
public ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit connectorSplit) | |
{ | |
checkArgument(bucketNumber.isPresent(), "bucketNumber must be present"); | |
splits.computeIfAbsent(bucketNumber.getAsInt(), (ignored) -> new ArrayList<>()).add(connectorSplit); | |
// Do not block "offer" when running split discovery in bucketed mode. | |
// A limit is enforced on estimatedSplitSizeInBytes. | |
return immediateFuture(null); | |
} | |
@Override | |
public ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function) | |
{ | |
checkArgument(bucketNumber.isPresent(), "bucketNumber must be present"); | |
if (!finished.isDone()) { | |
return finished.transform((ignored) -> ImmutableList.of(), executor); | |
} | |
return immediateFuture(function.apply(getSplits(bucketNumber.getAsInt(), maxSize)).getResult()); | |
} | |
private synchronized List<InternalHiveSplit> getSplits(int bucket, int batchSize) | |
{ | |
return splits.getOrDefault(bucket, ImmutableList.of()).stream() | |
.filter(split -> !split.isDone()) | |
.limit(batchSize) | |
.collect(toImmutableList()); | |
} | |
@Override | |
public void finish() | |
{ | |
finished.set(null); | |
} | |
@Override | |
public boolean isFinished(OptionalInt bucketNumber) | |
{ | |
return finished.isDone(); | |
} | |
@Override | |
public synchronized int rewind(OptionalInt bucketNumber) | |
{ | |
checkArgument(bucketNumber.isPresent(), "bucketNumber must be present"); | |
int reviwedSplitCount = 0; | |
for (InternalHiveSplit split : splits.getOrDefault(bucketNumber.getAsInt(), ImmutableList.of())) { | |
if (split.isDone()) { | |
split.resetStart(); | |
reviwedSplitCount++; | |
} | |
} | |
return reviwedSplitCount; | |
} | |
}, | |
maxInitialSplits, | |
maxOutstandingSplitsSize, | |
splitLoader, | |
stateReference, | |
highMemorySplitSourceCounter); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment