Skip to content

Instantly share code, notes, and snippets.

@javrasya
Created January 5, 2024 19:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save javrasya/98cfe90bd1a2585c56c4c3346a518477 to your computer and use it in GitHub Desktop.
Save javrasya/98cfe90bd1a2585c56c4c3346a518477 to your computer and use it in GitHub Desktop.
A custom Iceberg split assigner that evenly distributes the workload across the discovered splits
import com.google.common.collect.Lists;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.flink.source.assigner.DefaultSplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
 * A {@link DefaultSplitAssigner} that re-packs newly discovered splits so that each split carries at
 * most {@code taskLimitPerSplit} {@link FileScanTask}s.
 *
 * <p>On discovery, the file scan tasks of all incoming splits are flattened (in encounter order)
 * and partitioned into consecutive chunks of {@code taskLimitPerSplit} tasks. Only the last chunk
 * may be smaller. Each chunk becomes a new {@link IcebergSourceSplit}. When no limit is configured
 * ({@code null}), discovered splits are passed to the parent unchanged.
 *
 * <p>Only {@link #onDiscoveredSplits(Collection)} is overridden. Every other assigner callback
 * keeps the parent's behavior.
 */
public class LimitingTasksWithSplitsAssigner extends DefaultSplitAssigner {

  /** Maximum number of file scan tasks per emitted split, or {@code null} for pass-through. */
  private final Integer taskLimitPerSplit;

  /**
   * Creates an assigner with an empty initial state.
   *
   * @param taskLimitPerSplit maximum tasks per split, or {@code null} to disable re-packing
   * @throws IllegalArgumentException if {@code taskLimitPerSplit} is non-null and not positive
   */
  public LimitingTasksWithSplitsAssigner(Integer taskLimitPerSplit) {
    // NOTE(review): a null comparator is handed to DefaultSplitAssigner; presumably this means
    // plain insertion (FIFO) ordering there. Confirm against the Iceberg version in use.
    super(null);
    this.taskLimitPerSplit = requirePositiveOrNull(taskLimitPerSplit);
  }

  /**
   * Creates an assigner restored from previously checkpointed assigner state.
   *
   * <p>Unlike the other constructor, this one takes a primitive {@code int}, so a restored
   * assigner always has a limit.
   *
   * @param assignerState split states handed to the parent assigner
   * @param taskLimitPerSplit maximum tasks per split
   * @throws IllegalArgumentException if {@code taskLimitPerSplit} is not positive
   */
  public LimitingTasksWithSplitsAssigner(
      Collection<IcebergSourceSplitState> assignerState, int taskLimitPerSplit) {
    super(null, assignerState);
    this.taskLimitPerSplit = requirePositiveOrNull(taskLimitPerSplit);
  }

  /**
   * Re-packs the discovered splits into splits of at most {@code taskLimitPerSplit} tasks and
   * forwards them to the parent assigner. If no limit is configured, forwards {@code splits}
   * untouched.
   *
   * @param splits newly discovered splits
   */
  @Override
  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
    if (taskLimitPerSplit == null) {
      super.onDiscoveredSplits(splits);
      return;
    }

    // Flatten every combined task into its individual file scan tasks. Tasks from different
    // incoming splits may end up grouped together.
    List<FileScanTask> fileScanTasks =
        splits.stream()
            .map(IcebergSourceSplit::task)
            .flatMap(combined -> combined.tasks().stream())
            .collect(Collectors.toList());

    // Consecutive chunks of taskLimitPerSplit tasks; the final chunk holds the remainder.
    List<List<FileScanTask>> chunks = Lists.partition(fileScanTasks, taskLimitPerSplit);

    List<IcebergSourceSplit> repacked =
        chunks.stream()
            .map(BaseCombinedScanTask::new)
            .map(IcebergSourceSplit::fromCombinedScanTask)
            .collect(Collectors.toList());

    super.onDiscoveredSplits(repacked);
  }

  /**
   * Validates the configured limit eagerly. Without this check, a zero or negative value would only
   * surface later as an exception from {@code Lists.partition} during split discovery.
   */
  private static Integer requirePositiveOrNull(Integer limit) {
    if (limit != null && limit <= 0) {
      throw new IllegalArgumentException("taskLimitPerSplit must be positive, got: " + limit);
    }
    return limit;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment