Skip to content

Instantly share code, notes, and snippets.

@arhimondr
Created April 22, 2019 17:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arhimondr/e6cea0f19075fba5572fa575720a6f75 to your computer and use it in GitHub Desktop.
Save arhimondr/e6cea0f19075fba5572fa575720a6f75 to your computer and use it in GitHub Desktop.
/**
 * Creates a {@link HiveSplitSource} whose per-bucket queue keeps every offered split
 * in memory, so that the splits of a bucket can be handed out again after a rewind.
 *
 * <p>Splits are only handed out once {@code finish()} has been called on the queue;
 * until then {@code borrowBatchAsync} yields an empty batch that completes when
 * loading finishes.
 *
 * <p>All access to the per-bucket lists ({@code offer}, {@code getSplits},
 * {@code rewind}) is serialized on the queue instance because the lists themselves
 * are plain {@link ArrayList}s.
 *
 * @param session passed through to the {@link HiveSplitSource} constructor
 * @param databaseName passed through to the {@link HiveSplitSource} constructor
 * @param tableName passed through to the {@link HiveSplitSource} constructor
 * @param compactEffectivePredicate passed through to the {@link HiveSplitSource} constructor
 * @param maxInitialSplits passed through to the {@link HiveSplitSource} constructor
 * @param maxOutstandingSplitsSize passed through to the {@link HiveSplitSource} constructor
 * @param splitLoader passed through to the {@link HiveSplitSource} constructor
 * @param executor executor on which the "loading finished" continuation of
 *        {@code borrowBatchAsync} runs
 * @param highMemorySplitSourceCounter passed through to the {@link HiveSplitSource} constructor
 * @return a split source that requires a bucket number for every queue operation
 *         except {@code finish()} and {@code isFinished()}
 */
public static HiveSplitSource bucketedRewindable(
        ConnectorSession session,
        String databaseName,
        String tableName,
        TupleDomain<? extends ColumnHandle> compactEffectivePredicate,
        int maxInitialSplits,
        DataSize maxOutstandingSplitsSize,
        HiveSplitLoader splitLoader,
        Executor executor,
        CounterStat highMemorySplitSourceCounter)
{
    AtomicReference<State> stateReference = new AtomicReference<>(State.initial());
    return new HiveSplitSource(
            session,
            databaseName,
            tableName,
            compactEffectivePredicate,
            new PerBucket()
            {
                // Bucket number -> every split ever offered for that bucket. Splits are never
                // removed; they are filtered by isDone() when a batch is borrowed.
                // The lists are not thread-safe: read and write them only while holding "this".
                private final Map<Integer, List<InternalHiveSplit>> splits = new ConcurrentHashMap<>();
                // Completed by finish(); gates borrowBatchAsync and isFinished.
                private final SettableFuture<?> finished = SettableFuture.create();

                /**
                 * Records a split for the given bucket. Never blocks the caller.
                 *
                 * Synchronized because the per-bucket list is an ArrayList that is also
                 * traversed by getSplits() and rewind() under the same monitor.
                 */
                @Override
                public synchronized ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit connectorSplit)
                {
                    checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                    splits.computeIfAbsent(bucketNumber.getAsInt(), (ignored) -> new ArrayList<>()).add(connectorSplit);
                    // Do not block "offer" when running split discovery in bucketed mode.
                    // A limit is enforced on estimatedSplitSizeInBytes.
                    return immediateFuture(null);
                }

                /**
                 * Applies {@code function} to at most {@code maxSize} not-yet-done splits of the
                 * bucket and returns its result. While finish() has not been called, returns a
                 * future that completes with an empty list once finish() is called.
                 */
                @Override
                public ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function)
                {
                    checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                    if (!finished.isDone()) {
                        return finished.transform((ignored) -> ImmutableList.of(), executor);
                    }
                    return immediateFuture(function.apply(getSplits(bucketNumber.getAsInt(), maxSize)).getResult());
                }

                // Returns an immutable snapshot of up to batchSize splits of the bucket for
                // which isDone() is false, in the order they were offered.
                private synchronized List<InternalHiveSplit> getSplits(int bucket, int batchSize)
                {
                    return splits.getOrDefault(bucket, ImmutableList.of()).stream()
                            .filter(split -> !split.isDone())
                            .limit(batchSize)
                            .collect(toImmutableList());
                }

                @Override
                public void finish()
                {
                    finished.set(null);
                }

                // NOTE(review): this ignores bucketNumber and reports true as soon as finish()
                // has been called, even if the bucket still has splits that are not done.
                // Confirm against how HiveSplitSource consumes isFinished().
                @Override
                public boolean isFinished(OptionalInt bucketNumber)
                {
                    return finished.isDone();
                }

                /**
                 * Calls resetStart() on every split of the bucket for which isDone() is true.
                 *
                 * @return the number of splits that were reset
                 */
                @Override
                public synchronized int rewind(OptionalInt bucketNumber)
                {
                    checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                    int revivedSplitCount = 0;
                    for (InternalHiveSplit split : splits.getOrDefault(bucketNumber.getAsInt(), ImmutableList.of())) {
                        if (split.isDone()) {
                            split.resetStart();
                            revivedSplitCount++;
                        }
                    }
                    return revivedSplitCount;
                }
            },
            maxInitialSplits,
            maxOutstandingSplitsSize,
            splitLoader,
            stateReference,
            highMemorySplitSourceCounter);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment