Skip to content

Instantly share code, notes, and snippets.

@morj
Created May 15, 2018 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save morj/cb4b713e05200d5114b62c539e3d7ad7 to your computer and use it in GitHub Desktop.
Save morj/cb4b713e05200d5114b62c539e3d7ad7 to your computer and use it in GitHub Desktop.
package sandbox
import jetbrains.exodus.io.Block
import jetbrains.exodus.io.DataReader
import jetbrains.exodus.io.FileDataReader
import jetbrains.exodus.io.RemoveBlockType
import jetbrains.exodus.log.Log
import jetbrains.exodus.log.LogUtil
import mu.KLogging
import software.amazon.awssdk.core.AwsRequestOverrideConfig
import software.amazon.awssdk.core.async.AsyncResponseHandler
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.GetObjectRequest
import software.amazon.awssdk.services.s3.model.GetObjectResponse
import software.amazon.awssdk.services.s3.model.ListObjectsRequest
import software.amazon.awssdk.services.s3.model.S3Object
import java.util.concurrent.atomic.AtomicLong
class S3DataReader(val s3: S3AsyncClient, val bucketName: String, val requestOverrideConfig: AwsRequestOverrideConfig? = null) : DataReader {
companion object : KLogging()
val bytesRead = AtomicLong()
private val blocks: Map<Long, S3Block> = s3.listObjects(ListObjectsRequest.builder()
.requestOverrideConfig(requestOverrideConfig)
.bucket(bucketName)
.delimiter("/")
//.prefix("My")
.build()).get().contents().asSequence().filter {
it.key().endsWith(".xd")
}.map {
S3Block(LogUtil.getAddress(it.key()), it)
}.associateBy {
it._address
}
@Suppress("UNCHECKED_CAST")
override fun getBlocks(): Array<Block> {
val files = blocks
val size = files.size
val result = arrayOfNulls<Block>(size)
var i = 0
blocks.forEach {
result[i++] = it.value
}
FileDataReader.sortBlocks(result)
return result as Array<Block>
}
override fun setLog(log: Log) {
}
override fun getLocation(): String = "s3:$bucketName"
override fun removeBlock(blockAddress: Long, rbt: RemoveBlockType) {
// TODO
}
override fun truncateBlock(blockAddress: Long, length: Long) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun close() {
// TODO
}
override fun getBlock(address: Long): Block {
logger.info { "Get block at ${LogUtil.getLogFilename(address)}" }
return blocks[address]!!
}
override fun clear() {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
private inner class S3Block(val _address: Long, val s3Object: S3Object) : Block {
override fun getAddress() = _address
override fun setReadOnly(): Boolean {
return true
}
override fun length(): Long = s3Object.size()
override fun read(output: ByteArray, position: Long, count: Int): Int {
if (count <= 0) {
return 0
}
val range = "bytes=$position-${position + count - 1}"
logger.debug { "Request range: $range" }
val written = s3.getObject(
GetObjectRequest.builder().range(range)
.requestOverrideConfig(requestOverrideConfig).bucket(bucketName).key(s3Object.key()).build(),
ByteArrayAsyncResponseHandler(output)
).get()
if (written < count) {
logger.debug { "Read underflow: expected $count, got $written" }
}
bytesRead.addAndGet(written.toLong())
return written
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment