Skip to content

Instantly share code, notes, and snippets.

@jprante
Last active January 31, 2024 09:47
Show Gist options
  • Save jprante/b53e71aa55935ca35ba2dff5667f0821 to your computer and use it in GitHub Desktop.
Save jprante/b53e71aa55935ca35ba2dff5667f0821 to your computer and use it in GitHub Desktop.
Download all works from OpenAlex S3 with AWS Java SDK
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
/**
 * Lists the {@code openalex} S3 bucket with anonymous credentials using the AWS SDK v2 async
 * client and logs the gzip-compressed works data files found under {@code data/works/}.
 *
 * <p>{@link #download} shows how one such file can be fetched to a temporary file and its
 * decompressed lines logged; it is not invoked by {@link #openAlexFiles()}.
 */
public class OpenAlexAwsClientTest {

    private static final Logger logger = Logger.getLogger(OpenAlexAwsClientTest.class.getName());

    /**
     * Keys of works data files: anything under {@code data/works/} ending in {@code .gz}.
     * The dot is escaped so that only a literal ".gz" suffix matches (an unescaped "." would
     * accept any character before "gz"). Compiled once instead of once per listed object.
     */
    private static final Pattern WORKS_DATA_KEY = Pattern.compile("data/works/.*?\\.gz");

    /** Number of works data files seen by {@link #log}. */
    private final AtomicLong counter = new AtomicLong();

    /**
     * Lists every object in the bucket, then logs each works data file among them and
     * finally how many there were.
     */
    @Test
    public void openAlexFiles() throws ExecutionException, InterruptedException, TimeoutException {
        AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create();
        String bucket = "openalex";
        List<S3Object> s3Objects = new CopyOnWriteArrayList<>();
        try (S3AsyncClient s3Client = S3AsyncClient.builder()
                .region(Region.US_EAST_1)
                .credentialsProvider(anonymousCredentialsProvider)
                .build()) {
            ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder()
                    .bucket(bucket)
                    .build();
            ListObjectsV2Publisher listObjectsV2Publisher =
                    s3Client.listObjectsV2Paginator(listObjectsV2Request);
            // subscribe() returns a future that completes once every listed object has been
            // handed to s3Objects::add; wait at most one minute for the full listing
            listObjectsV2Publisher.contents().subscribe(s3Objects::add).get(1L, TimeUnit.MINUTES);
            logger.log(Level.INFO, "found " + s3Objects.size() + " S3 objects in bucket " + bucket);
            for (S3Object s3Object : s3Objects) {
                log(s3Client, s3Object, bucket);
            }
            logger.log(Level.INFO, "found " + counter.get() + " works data files in bucket " + bucket);
        }
    }

    /**
     * Returns whether {@code key} names a gzip works data file.
     *
     * @param key S3 object key
     * @return true if the whole key matches {@link #WORKS_DATA_KEY}
     */
    private static boolean isWorksDataFile(String key) {
        return WORKS_DATA_KEY.matcher(key).matches();
    }

    /**
     * Logs key, size and last-modified time of {@code s3Object} and counts it, if it is a
     * works data file; other objects are ignored.
     *
     * @param s3Client not used by this method
     * @param s3Object the listed object
     * @param bucket not used by this method
     */
    private void log(S3AsyncClient s3Client, S3Object s3Object, String bucket) {
        if (isWorksDataFile(s3Object.key())) {
            logger.log(Level.INFO, "key = " + s3Object.key() +
                    " size = " + s3Object.size() +
                    " last modified = " + s3Object.lastModified());
            counter.incrementAndGet();
        }
    }

    /**
     * Downloads {@code s3Object} to a temporary file, logs its decompressed lines and deletes
     * the file again. Objects that are not works data files are skipped. Failures are logged,
     * not thrown.
     *
     * @param s3Client client used for the download
     * @param s3Object object to download
     * @param bucket bucket holding the object; also used as temp-file name prefix
     */
    private void download(S3AsyncClient s3Client, S3Object s3Object, String bucket) {
        // in this example, we download only files from data/works with gz suffix;
        // checking first avoids creating a temp file for every skipped object
        if (!isWorksDataFile(s3Object.key())) {
            return;
        }
        Path path = null;
        try {
            path = Files.createTempFile(bucket + "_", "");
            // S3AsyncClient.getObject(request, Path) throws if the destination already exists,
            // so keep only the unique name and let the SDK create the file itself
            Files.delete(path);
            GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                    .bucket(bucket)
                    .key(s3Object.key())
                    .build();
            logger.log(Level.INFO, "downloading " + s3Object.key());
            s3Client.getObject(getObjectRequest, path).get(1L, TimeUnit.HOURS);
            logger.log(Level.INFO, "completed " + s3Object.key());
            processPath(path);
        } catch (InterruptedException e) {
            // restore the interrupt flag so the caller can still observe the interruption
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, e.getMessage(), e);
        } catch (IOException | ExecutionException | TimeoutException e) {
            logger.log(Level.SEVERE, e.getMessage(), e);
        } finally {
            deletePath(path);
        }
    }

    /**
     * Decompresses the gzip file at {@code path} and logs each line.
     *
     * @param path downloaded gzip file
     * @throws IOException if the file cannot be read or is not valid gzip
     */
    private void processPath(Path path) throws IOException {
        // decode explicitly instead of relying on the platform default charset;
        // NOTE(review): assumes the data files are UTF-8 encoded
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(path)), StandardCharsets.UTF_8))) {
            reader.lines().forEach(line -> logger.log(Level.INFO, "line = " + line));
        }
    }

    /**
     * Best-effort removal of a temporary file; a failure is logged as a warning.
     *
     * @param path file to delete, or null if none was created
     */
    private void deletePath(Path path) {
        if (path == null) {
            return;
        }
        try {
            // the file may legitimately be missing: download() removes the placeholder before
            // fetching, and the SDK may delete a partially written file when a download fails
            Files.deleteIfExists(path);
        } catch (IOException e) {
            logger.log(Level.WARNING, "unable to delete " + path, e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment