Skip to content

Instantly share code, notes, and snippets.

@javrasya
Last active January 8, 2024 12:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save javrasya/513f838a8af355b51506ca2a2dc1e3d8 to your computer and use it in GitHub Desktop.
Save javrasya/513f838a8af355b51506ca2a2dc1e3d8 to your computer and use it in GitHub Desktop.
Custom S3FileIO which keeps re-opens a new S3 client and use it if the original one is closed. The bug it covers happens due to messed up lifecycle issue. This works with Iceberg version 1.4 not lower.
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.util.SerializableSupplier;
import software.amazon.awssdk.services.s3.S3Client;
import java.util.concurrent.atomic.AtomicBoolean;
public class CustomS3FileIO extends S3FileIO {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final SerializableSupplier<S3Client> s3;
private transient volatile S3Client backupClient;
public CustomS3FileIO() {
this.s3 = AwsClientFactories.defaultFactory()::s3;
}
public CustomS3FileIO(SerializableSupplier<S3Client> s3) {
super(s3);
this.s3 = s3;
}
public CustomS3FileIO(SerializableSupplier<S3Client> s3, S3FileIOProperties s3FileIOProperties) {
super(s3, s3FileIOProperties);
this.s3 = s3;
}
@Override
public S3Client client() {
if (closed.compareAndSet(true, false)) {
synchronized (this) {
if (backupClient == null) {
this.backupClient = this.s3.get();
}
}
}
if (null != backupClient) {
return backupClient;
} else {
return super.client();
}
}
@Override
public void close() {
synchronized (closed) {
super.close();
if (closed.compareAndSet(false, true)) {
if (backupClient != null) {
backupClient.close();
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment