@baughmann
Last active December 8, 2022 21:36
Vert.X Web Streaming Multipart File Data to MinIO / S3
// Basically, this allows you to easily stream a multipart file upload request to MinIO, rather than saving the file
// locally on the server or reading the full file into memory.
//
// While it was created with MinIO in mind, you'll notice that `createMultipartUpload()`, `uploadPart()` and `completeMultipartUpload()`
// are all available in Amazon's S3 Java SDK. That means you can easily swap this over to use the S3 API instead.
// (Usage and S3-mapping sketches follow the class below.)
package server

import com.google.common.collect.HashMultimap
import io.minio.MinioClient
import io.minio.messages.Part
import io.vertx.core.buffer.Buffer
import io.vertx.core.streams.ReadStream

import scala.collection.mutable.ListBuffer

class CustomMinioClient(client: MinioClient) extends MinioClient(client) {

  def putReadStream(bucket: String,
                    objectName: String,
                    region: String = "us-east-1",
                    data: ReadStream[Buffer],
                    objectSize: Long,
                    contentType: String = "application/octet-stream"
                   ): Unit = {
    val headers: HashMultimap[String, String] = HashMultimap.create()
    headers.put("Content-Type", contentType)
    var uploadId: String = null

    try {
      val parts = new ListBuffer[Part]()
      val createResponse = createMultipartUpload(bucket, region, objectName, headers, null)
      uploadId = createResponse.result.uploadId()

      var partNumber = 1
      var uploadedSize = 0L
      // a buffer used to accumulate bytes until we reach the S3 minimum part size of 5 MiB
      var partBuffer = Buffer.buffer()
      // should probably be set dynamically based on the total file size in order to avoid
      // unnecessary network calls to the MinIO instance for small files
      val minPartSize = 5 * 1024 * 1024

      // called every time we receive a chunk of data from the client
      data.handler { buffer =>
        // add the bytes we just received to our running buffer
        partBuffer.appendBuffer(buffer)

        val isMinPartSize = partBuffer.length >= minPartSize
        val isLastPart = uploadedSize + partBuffer.length == objectSize

        // if we've reached the threshold for uploading a part,
        // or if this is the last part of the file
        if (isMinPartSize || isLastPart) {
          // upload this part to the storage server
          val partResponse = uploadPart(
            bucket,
            region,
            objectName,
            partBuffer.getBytes,
            partBuffer.length,
            uploadId,
            partNumber,
            null,
            null
          )
          // record the part so we can pass it to `completeMultipartUpload()` later
          parts.addOne(new Part(partNumber, partResponse.etag))
          uploadedSize += partBuffer.length
          partNumber += 1
          // reset the buffer
          partBuffer = Buffer.buffer()
        }
      }

      // called once we have read all data from the client
      data.endHandler { _ =>
        completeMultipartUpload(bucket, region, objectName, uploadId, parts.toArray, null, null)
      }

      // abort the upload if the stream fails partway through
      data.exceptionHandler { exception =>
        println("Handler caught exception in custom putObject: " + exception)
        if (uploadId != null) {
          abortMultipartUpload(bucket, region, objectName, uploadId, null, null)
        }
      }
    } catch {
      case e: Exception =>
        println("Exception thrown in custom `putObject`: " + e)
        if (uploadId != null) {
          abortMultipartUpload(
            bucket,
            region,
            objectName,
            uploadId,
            null,
            null
          )
        }
    }
  }
}
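
// Usage sketch (not part of the original gist): one way to wire `CustomMinioClient` into a Vert.x Web route.
// This is a minimal example under a few assumptions: the MinIO endpoint and credentials below are placeholders,
// and the total upload size is assumed to arrive in a hypothetical `X-File-Size` header, since `putReadStream()`
// needs `objectSize` up front.
import io.minio.MinioClient
import io.vertx.core.Vertx
import io.vertx.ext.web.Router

import server.CustomMinioClient

object UploadServer {
  def main(args: Array[String]): Unit = {
    val vertx = Vertx.vertx()
    val router = Router.router(vertx)

    val client = new CustomMinioClient(
      MinioClient.builder()
        .endpoint("http://localhost:9000")       // assumed local MinIO instance
        .credentials("minioadmin", "minioadmin") // assumed credentials
        .build()
    )

    router.post("/upload").handler { ctx =>
      val request = ctx.request()
      request.setExpectMultipart(true)
      // hypothetical header the client sends with the total file size in bytes
      val objectSize = request.getHeader("X-File-Size").toLong

      // HttpServerFileUpload is a ReadStream[Buffer], so it can be handed straight to putReadStream()
      request.uploadHandler { upload =>
        client.putReadStream(
          bucket = "uploads",
          objectName = upload.filename(),
          data = upload,
          objectSize = objectSize,
          contentType = upload.contentType()
        )
      }

      request.endHandler(_ => ctx.response().setStatusCode(201).end())
    }

    vertx.createHttpServer().requestHandler(router).listen(8080)
  }
}

// Mapping sketch (also not from the gist): the same three calls expressed against the AWS SDK for Java v2,
// in case you want to target S3 directly instead of MinIO. Names like `s3`, `bucket`, `key` and the
// pre-split part bodies are illustrative only; the streaming/buffering logic stays as in the class above.
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model._

import scala.jdk.CollectionConverters._

object S3MultipartSketch {
  def putBytesMultipart(s3: S3Client, bucket: String, key: String, partBodies: Seq[Array[Byte]]): Unit = {
    // 1. start the multipart upload (analogous to createMultipartUpload() above)
    val uploadId = s3.createMultipartUpload(
      CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build()
    ).uploadId()

    // 2. upload each part (analogous to uploadPart() above)
    val completed = partBodies.zipWithIndex.map { case (bytes, idx) =>
      val partNumber = idx + 1
      val resp = s3.uploadPart(
        UploadPartRequest.builder().bucket(bucket).key(key).uploadId(uploadId).partNumber(partNumber).build(),
        RequestBody.fromBytes(bytes)
      )
      CompletedPart.builder().partNumber(partNumber).eTag(resp.eTag()).build()
    }

    // 3. finish the upload (analogous to completeMultipartUpload() above)
    s3.completeMultipartUpload(
      CompleteMultipartUploadRequest.builder()
        .bucket(bucket).key(key).uploadId(uploadId)
        .multipartUpload(CompletedMultipartUpload.builder().parts(completed.asJava).build())
        .build()
    )
  }
}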