Last active
December 8, 2022 21:36
-
-
Save baughmann/6b8bfb59fa4364d0138070d2b8a309d7 to your computer and use it in GitHub Desktop.
Vert.X Web Streaming Multipart File Data to MinIO / S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This allows you to stream a multipart file upload request straight to MinIO, rather than saving the file locally on the server
// or reading the full file into memory.
//
// While it was created with MinIO in mind, note that `createMultipartUpload()`, `uploadPart()` and `completeMultipartUpload()`
// are all available in Amazon's S3 Java SDK as well, which means you can easily swap this over to use the S3 API instead.
package server | |
import com.google.common.collect.HashMultimap | |
import io.minio.MinioClient | |
import io.minio.messages.Part | |
import io.vertx.core.buffer.Buffer | |
import io.vertx.core.streams.ReadStream | |
import scala.collection.mutable.ListBuffer | |
/**
 * A [[MinioClient]] that adds [[putReadStream]], which streams a Vert.x [[ReadStream]] straight into an
 * object via the S3 multipart-upload API instead of staging it on disk or holding it fully in memory.
 *
 * @param client client passed to the `MinioClient` constructor this class extends
 */
class CustomMinioClient(client: MinioClient) extends MinioClient(client) {
  import scala.util.control.NonFatal

  /**
   * Streams `data` into `bucket`/`objectName` as a multipart upload.
   *
   * Incoming chunks are accumulated. The buffer is sent with `uploadPart` once it reaches the part size
   * (at least 5 MiB, the S3 minimum for every part except the last) or once the running total reaches
   * `objectSize`. When `data` ends, any bytes still buffered are sent as the final part and the upload is
   * completed. If `data` reports an error, or any MinIO call throws, the multipart upload is aborted and
   * every later event from `data` is ignored.
   *
   * This method only creates the upload and registers handlers on `data`. It returns right away, and the
   * rest of the work happens as `data` emits events. Failures are reported with `println` only.
   *
   * NOTE(review): `uploadPart` and `completeMultipartUpload` return their responses directly (they block),
   * and they are called from the stream's handlers. Those handlers presumably run on the Vert.x event loop.
   * Consider pausing `data` and moving these calls to a worker (e.g. `executeBlocking`).
   *
   * @param bucket      destination bucket
   * @param objectName  destination object key
   * @param region      region passed to every multipart call
   * @param data        stream of upload body chunks
   * @param objectSize  expected total size in bytes; used to pick the part size and to flush the last part early
   * @param contentType value sent as the `Content-Type` header when the upload is created
   */
  def putReadStream(bucket: String,
                    objectName: String,
                    region: String = "us-east-1",
                    data: ReadStream[Buffer],
                    objectSize: Long,
                    contentType: String = "application/octet-stream"
                   ) = {
    // S3/MinIO limits: non-final parts must be at least 5 MiB, and an upload may have at most 10,000 parts.
    val minPartSize = 5 * 1024 * 1024
    val maxParts = 10000L
    // Grow the part size for very large objects so they still fit in `maxParts` parts
    // (a fixed 5 MiB part size caps uploads at roughly 48.8 GiB). Clamped to Int because Buffer is Int-indexed.
    val partSize: Int =
      math.min(math.max(minPartSize.toLong, (objectSize + maxParts - 1) / maxParts), Int.MaxValue.toLong).toInt

    val headers: HashMultimap[String, String] = HashMultimap.create()
    headers.put("Content-Type", contentType)

    var uploadId: String = null
    // Set once the upload has been completed or aborted, so later stream events are ignored.
    var finished = false

    // Logs `cause` and aborts the multipart upload if one was created. Never throws.
    def failUpload(message: String, cause: Throwable): Unit = {
      println(message + cause)
      finished = true
      if (uploadId != null) {
        try abortMultipartUpload(bucket, region, objectName, uploadId, null, null)
        catch {
          case NonFatal(abortError) =>
            println("Failed to abort multipart upload " + uploadId + ": " + abortError)
        }
      }
    }

    try {
      val parts = new ListBuffer[Part]()
      uploadId = createMultipartUpload(bucket, region, objectName, headers, null).result.uploadId()
      var partNumber = 1
      // Long: an Int running total would overflow for objects larger than 2 GiB.
      var uploadedSize = 0L
      // Bytes received from `data` but not yet sent as a part.
      var partBuffer = Buffer.buffer()

      // Sends the buffered bytes as the next part, records its ETag for completion, and resets the buffer.
      def flushPart(): Unit = {
        val partResponse = uploadPart(
          bucket,
          region,
          objectName,
          partBuffer.getBytes,
          partBuffer.length,
          uploadId,
          partNumber,
          null,
          null
        )
        parts.addOne(new Part(partNumber, partResponse.etag))
        uploadedSize += partBuffer.length
        partNumber += 1
        partBuffer = Buffer.buffer()
      }

      // Register the error and end handlers before the data handler, since installing the data
      // handler may start the flow of events.
      data.exceptionHandler { exception =>
        if (!finished) failUpload("Handler caught exception in custom putObject: ", exception)
      }

      data.endHandler { _ =>
        if (!finished) {
          try {
            // The byte count may never have hit `objectSize` exactly (short stream, last chunk
            // overshooting, or a wrong `objectSize`), so send whatever is still buffered as the last part.
            if (partBuffer.length > 0) flushPart()
            completeMultipartUpload(bucket, region, objectName, uploadId, parts.toArray, null, null)
            finished = true
          } catch {
            case NonFatal(e) => failUpload("Exception thrown in custom `putObject`: ", e)
          }
        }
      }

      data.handler { buffer =>
        if (!finished) {
          // Exceptions thrown here happen asynchronously, outside the enclosing try, so catch them locally.
          try {
            partBuffer.appendBuffer(buffer)
            val isFullPart = partBuffer.length >= partSize
            val isLastPart = uploadedSize + partBuffer.length == objectSize
            if (isFullPart || isLastPart) flushPart()
          } catch {
            case NonFatal(e) => failUpload("Exception thrown in custom `putObject`: ", e)
          }
        }
      }
    } catch {
      case NonFatal(e) => failUpload("Exception thrown in custom `putObject`: ", e)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment