Created
May 20, 2021 08:12
-
-
Save Deviad/aca50d874d9e1095c7b60156f4b27903 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package con.worldpay.syscon; | |
import com.amazonaws.auth.AWSStaticCredentialsProvider; | |
import com.amazonaws.auth.BasicAWSCredentials; | |
import com.amazonaws.services.s3.AmazonS3; | |
import com.amazonaws.services.s3.AmazonS3ClientBuilder; | |
import com.amazonaws.services.s3.model.CannedAccessControlList; | |
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; | |
import com.amazonaws.services.s3.model.ObjectMetadata; | |
import com.amazonaws.services.s3.model.PutObjectRequest; | |
import com.amazonaws.services.s3.transfer.TransferManager; | |
import com.amazonaws.services.s3.transfer.TransferManagerBuilder; | |
import com.amazonaws.services.s3.transfer.Upload; | |
import io.micronaut.context.annotation.Value; | |
import io.micronaut.http.MediaType; | |
import io.micronaut.http.annotation.Consumes; | |
import io.micronaut.http.annotation.Controller; | |
import io.micronaut.http.annotation.Post; | |
import io.micronaut.http.annotation.Produces; | |
import io.micronaut.http.multipart.FileUpload; | |
import io.micronaut.http.multipart.PartData; | |
import io.micronaut.http.multipart.StreamingFileUpload; | |
import io.reactivex.BackpressureStrategy; | |
import io.reactivex.Flowable; | |
import io.reactivex.functions.Function; | |
import io.reactivex.subjects.BehaviorSubject; | |
import io.vavr.CheckedConsumer; | |
import lombok.extern.slf4j.Slf4j; | |
import lombok.var; | |
import org.reactivestreams.Publisher; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import javax.inject.Inject; | |
import javax.validation.constraints.NotNull; | |
import java.io.ByteArrayInputStream; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.nio.ByteBuffer; | |
import java.time.Instant; | |
import java.time.ZoneId; | |
import java.time.ZonedDateTime; | |
import java.time.format.DateTimeFormatter; | |
import java.util.Optional; | |
import java.util.concurrent.Executors; | |
import java.util.function.Consumer; | |
import static com.amazonaws.regions.Regions.EU_WEST_2; | |
import static io.vavr.API.unchecked; | |
@Controller("/api/upload") | |
@Slf4j | |
public class UploadController { | |
private String bucketName; | |
private BasicAWSCredentials awsCredentials; | |
private MetadataClient metadataClient; | |
@Inject | |
public UploadController(BasicAWSCredentials awsCredentials, @Value("${aws.bucket-name}") String bucketName, MetadataClient client) { | |
this.bucketName = bucketName; | |
this.awsCredentials = awsCredentials; | |
this.metadataClient = client; | |
} | |
@Consumes(MediaType.MULTIPART_FORM_DATA) | |
@Produces(MediaType.APPLICATION_JSON) | |
@Post | |
public Mono<Response<SaveCommandResponseData>> uploadFile(@NotNull StreamingFileUpload file) { | |
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); | |
ZonedDateTime start = Instant.now().atZone(ZoneId.systemDefault()); | |
String startTimestamp = start.format(dateFormatter); | |
log.info("got into the upload function at: {}", startTimestamp); | |
int threadCt = Runtime.getRuntime().availableProcessors() + 1; | |
long multipartUploadThreshold = 0L; | |
AmazonS3 s3Client = AmazonS3ClientBuilder.standard() | |
.withRegion(EU_WEST_2) | |
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) | |
.build(); | |
TransferManager tm = TransferManagerBuilder.standard() | |
.withS3Client(s3Client) | |
.withExecutorFactory(() -> Executors.newFixedThreadPool(threadCt)) | |
.withMultipartUploadThreshold(multipartUploadThreshold) | |
.build(); | |
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, file.getFilename()); | |
s3Client.initiateMultipartUpload(initRequest); | |
BehaviorSubject<Boolean> subject = BehaviorSubject.create(); | |
subject.onNext(false); | |
Flowable.fromPublisher(file) | |
.map(partData -> { | |
var encrypted = encrypt(partData); | |
InputStream inputStream = unchecked(encrypted::getInputStream).get(); | |
PutObjectRequest request = new PutObjectRequest(bucketName, | |
file.getFilename(), | |
inputStream, | |
createObjectMetadata(file)) | |
.withCannedAcl(CannedAccessControlList.PublicRead); | |
Consumer<InputStream> uncheckedIs = CheckedConsumer.of(InputStream::close).unchecked(); | |
uncheckedIs.accept(inputStream); | |
return tm.upload(request); | |
}) | |
.onErrorResumeNext((Function<? super Throwable, ? extends Publisher<? extends Upload>>) Flux::error) | |
.subscribe(upload -> { | |
do { | |
} while (!upload.isDone()); | |
subject.onNext(true); | |
}); | |
return Flux.from(subject.toFlowable(BackpressureStrategy.LATEST)) | |
.takeUntil(x -> x.booleanValue()) | |
.single() | |
.flatMap((x) -> { | |
String url = s3Client.getUrl(bucketName, file.getFilename()).toExternalForm(); | |
var req = SaveMetadataRequest | |
.builder() | |
.serviceName("syscon") | |
.fileName(file.getFilename()) | |
.contentType("application/pdf") | |
.fileSize(file.getSize()) | |
.created(Instant.now().toEpochMilli()) | |
.uri(url) | |
.build(); | |
final Response<MetadataServiceResponse> res = metadataClient.saveMetadata(req); | |
return Mono.just(Response.of(SaveCommandResponseData | |
.builder() | |
.metadataId(res.getData().getId().toString()) | |
.url(url).build(), Status.SUCCESS)); | |
}); | |
// ZonedDateTime end = Instant.now().atZone(ZoneId.systemDefault()); | |
// String endTimeStamp = end.format(dateFormatter); | |
// log.info("getting out of the upload function at: {}", endTimeStamp); | |
// | |
// return | |
} | |
private PartData encrypt(PartData file) { | |
ByteArrayOutputStream out = null; | |
try { | |
out = new ByteArrayOutputStream(); | |
byte[] buffer = new byte[8192]; | |
final InputStream is = file.getInputStream(); | |
for (int length; (length = is.read(buffer)) != -1; ) { | |
out.write(buffer, 0, length); | |
} | |
ByteArrayOutputStream finalOut = out; | |
return new PartData() { | |
@Override | |
public InputStream getInputStream() throws IOException { | |
return new ByteArrayInputStream(finalOut.toByteArray()); | |
} | |
@Override | |
public byte[] getBytes() throws IOException { | |
return file.getBytes(); | |
} | |
@Override | |
public ByteBuffer getByteBuffer() throws IOException { | |
return file.getByteBuffer(); | |
} | |
@Override | |
public Optional<MediaType> getContentType() { | |
return file.getContentType(); | |
} | |
}; | |
} catch (IOException ioe) { | |
throw new RuntimeException(ioe); | |
} finally { | |
try { | |
if (out != null) | |
out.close(); | |
} catch (Exception ex) { | |
throw new RuntimeException(ex); | |
} | |
} | |
} | |
private ObjectMetadata createObjectMetadata(FileUpload file) { | |
ObjectMetadata objectMetadata = new ObjectMetadata(); | |
file.getContentType().ifPresent(contentType -> objectMetadata.setContentType(contentType.getName())); | |
if (file.getSize() != 0) { | |
objectMetadata.setContentLength(file.getSize()); | |
} | |
return objectMetadata; | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment