Skip to content

Instantly share code, notes, and snippets.

@Deviad
Created May 20, 2021 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Deviad/aca50d874d9e1095c7b60156f4b27903 to your computer and use it in GitHub Desktop.
Save Deviad/aca50d874d9e1095c7b60156f4b27903 to your computer and use it in GitHub Desktop.
package con.worldpay.syscon;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import io.micronaut.context.annotation.Value;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Consumes;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.multipart.FileUpload;
import io.micronaut.http.multipart.PartData;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.reactivex.subjects.BehaviorSubject;
import io.vavr.CheckedConsumer;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.inject.Inject;
import javax.validation.constraints.NotNull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import static com.amazonaws.regions.Regions.EU_WEST_2;
import static io.vavr.API.unchecked;
/**
 * HTTP endpoint that receives a multipart file upload, stores the file in S3 and
 * registers it with the metadata service.
 *
 * <p>The uploaded chunks are assembled in memory and sent to S3 as a single object.
 * NOTE(review): this bounds the supported file size by the available heap; very large
 * files would need a real S3 multipart upload (one {@code UploadPartRequest} per
 * buffered part) instead.
 */
@Controller("/api/upload")
@Slf4j
public class UploadController {

    /** Timestamp layout used for the entry log line. */
    private static final DateTimeFormatter LOG_TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Threshold handed to the transfer manager; the value is kept from the original configuration. */
    private static final long MULTIPART_UPLOAD_THRESHOLD = 0L;

    /** Size of the scratch buffer used when draining a chunk's input stream. */
    private static final int COPY_BUFFER_SIZE = 8192;

    private final String bucketName;
    private final MetadataClient metadataClient;
    private final AmazonS3 s3Client;
    private final TransferManager transferManager;

    /**
     * Creates the controller together with the S3 client and transfer manager it uses.
     *
     * <p>The client, the transfer manager and its thread pool are built once here and
     * shared by all requests; building them per request leaked one fixed thread pool
     * per upload because nothing ever shut them down.
     *
     * @param awsCredentials static credentials used to authenticate against S3
     * @param bucketName     name of the bucket that receives the uploaded files
     * @param client         client for the metadata service
     */
    @Inject
    public UploadController(BasicAWSCredentials awsCredentials, @Value("${aws.bucket-name}") String bucketName, MetadataClient client) {
        this.bucketName = bucketName;
        this.metadataClient = client;
        this.s3Client = AmazonS3ClientBuilder.standard()
                .withRegion(EU_WEST_2)
                .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
                .build();
        final int threadCount = Runtime.getRuntime().availableProcessors() + 1;
        this.transferManager = TransferManagerBuilder.standard()
                .withS3Client(s3Client)
                .withExecutorFactory(() -> Executors.newFixedThreadPool(threadCount))
                .withMultipartUploadThreshold(MULTIPART_UPLOAD_THRESHOLD)
                .build();
    }

    /**
     * Stores the uploaded file in S3 and records it in the metadata service.
     *
     * <p>All chunks of the upload are collected first and then written to S3 as one
     * object. Any failure while reading, uploading or saving the metadata is emitted
     * as an error of the returned {@link Mono} instead of leaving the request pending.
     *
     * <p>NOTE(review): the S3 transfer and the metadata call block the thread that
     * delivers the final chunk, as the original implementation did; consider moving
     * them to a dedicated scheduler.
     *
     * @param file the streamed multipart file
     * @return the metadata id and public URL of the stored object, emitted once both
     *         the S3 upload and the metadata registration have finished
     */
    @Consumes(MediaType.MULTIPART_FORM_DATA)
    @Produces(MediaType.APPLICATION_JSON)
    @Post
    public Mono<Response<SaveCommandResponseData>> uploadFile(@NotNull StreamingFileUpload file) {
        log.info("got into the upload function at: {}",
                Instant.now().atZone(ZoneId.systemDefault()).format(LOG_TIMESTAMP_FORMAT));

        // collect() emits the (possibly empty) buffer even when the upload has no chunks,
        // so an empty file completes instead of hanging.
        Mono<ByteArrayOutputStream> assembled =
                Flux.from(file).collect(ByteArrayOutputStream::new, this::appendChunk);

        return assembled
                .map(ByteArrayOutputStream::toByteArray)
                .map(content -> storeAndRegister(file, content));
    }

    /**
     * Uploads the assembled content to S3, waits for the transfer to finish and then
     * saves the file's metadata.
     *
     * @param file    the upload the content belongs to; supplies file name and content type
     * @param content the complete file content
     * @return the success response carrying the metadata id and the object's URL
     */
    private Response<SaveCommandResponseData> storeAndRegister(StreamingFileUpload file, byte[] content) {
        // NOTE(review): the object key is the client-supplied file name, so two uploads
        // with the same name overwrite each other — confirm this is intended.
        String key = file.getFilename();

        // SECURITY NOTE(review): PublicRead makes every uploaded object world-readable.
        // Kept because the returned URL is handed to callers; confirm this is intended.
        PutObjectRequest request = new PutObjectRequest(
                bucketName,
                key,
                new ByteArrayInputStream(content),
                createObjectMetadata(file, content.length))
                .withCannedAcl(CannedAccessControlList.PublicRead);

        awaitCompletion(transferManager.upload(request), key);

        String url = s3Client.getUrl(bucketName, key).toExternalForm();

        // NOTE(review): the content type is hard-coded although the upload carries its
        // own; kept as-is because the metadata service's expectations are not visible here.
        final Response<MetadataServiceResponse> saved = metadataClient.saveMetadata(
                SaveMetadataRequest
                        .builder()
                        .serviceName("syscon")
                        .fileName(key)
                        .contentType("application/pdf")
                        .fileSize((long) content.length)
                        .created(Instant.now().toEpochMilli())
                        .uri(url)
                        .build());

        return Response.of(SaveCommandResponseData
                .builder()
                .metadataId(saved.getData().getId().toString())
                .url(url)
                .build(), Status.SUCCESS);
    }

    /**
     * Blocks until the given transfer has finished, without spinning on {@code isDone()}.
     *
     * @param upload the running transfer
     * @param key    object key, used only for the error message
     * @throws IllegalStateException if the waiting thread is interrupted
     */
    private void awaitCompletion(Upload upload, String key) {
        try {
            upload.waitForCompletion();
        } catch (InterruptedException interrupted) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while uploading '" + key + "' to S3", interrupted);
        }
    }

    /**
     * Reads one chunk, passes it through {@link #encrypt(byte[])} and appends the
     * result to the buffer that accumulates the whole file.
     *
     * @param buffer accumulator for the file content
     * @param part   the chunk to append
     * @throws IllegalStateException if the chunk cannot be read
     */
    private void appendChunk(ByteArrayOutputStream buffer, PartData part) {
        byte[] chunk = encrypt(readChunk(part));
        buffer.write(chunk, 0, chunk.length);
    }

    /**
     * Drains a chunk's input stream into a byte array and closes the stream.
     *
     * @param part the chunk to read
     * @return the chunk's bytes
     * @throws IllegalStateException if reading fails
     */
    private byte[] readChunk(PartData part) {
        // Closing the stream is what was missing before. NOTE(review): for Netty-backed
        // parts this presumably also releases the underlying buffer — confirm.
        try (InputStream in = part.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] scratch = new byte[COPY_BUFFER_SIZE];
            for (int read; (read = in.read(scratch)) != -1; ) {
                out.write(scratch, 0, read);
            }
            return out.toByteArray();
        } catch (IOException ioe) {
            throw new IllegalStateException("Failed to read uploaded chunk", ioe);
        }
    }

    /**
     * Placeholder for content encryption.
     *
     * <p>Despite its name, no encryption has ever been applied here: the bytes are
     * returned unchanged, exactly as the previous implementation passed them through.
     * TODO(review): implement real encryption or rename this hook.
     *
     * @param chunk plain chunk bytes
     * @return the bytes to store; currently the input itself
     */
    private byte[] encrypt(byte[] chunk) {
        return chunk;
    }

    /**
     * Builds the S3 object metadata for an upload.
     *
     * @param file          the upload; its content type is copied when present
     * @param contentLength exact number of bytes that will be sent to S3
     * @return metadata carrying the content type (if known) and the content length
     */
    private ObjectMetadata createObjectMetadata(FileUpload file, long contentLength) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        file.getContentType().ifPresent(contentType -> objectMetadata.setContentType(contentType.getName()));
        // The length must match the bytes actually streamed to S3, so it comes from the
        // assembled content rather than from the size reported by the upload.
        objectMetadata.setContentLength(contentLength);
        return objectMetadata;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment