Created
May 15, 2018 14:19
-
-
Save morj/cb4b713e05200d5114b62c539e3d7ad7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sandbox | |
import jetbrains.exodus.io.Block | |
import jetbrains.exodus.io.DataReader | |
import jetbrains.exodus.io.FileDataReader | |
import jetbrains.exodus.io.RemoveBlockType | |
import jetbrains.exodus.log.Log | |
import jetbrains.exodus.log.LogUtil | |
import mu.KLogging | |
import software.amazon.awssdk.core.AwsRequestOverrideConfig | |
import software.amazon.awssdk.core.async.AsyncResponseHandler | |
import software.amazon.awssdk.services.s3.S3AsyncClient | |
import software.amazon.awssdk.services.s3.model.GetObjectRequest | |
import software.amazon.awssdk.services.s3.model.GetObjectResponse | |
import software.amazon.awssdk.services.s3.model.ListObjectsRequest | |
import software.amazon.awssdk.services.s3.model.S3Object | |
import java.util.concurrent.atomic.AtomicLong | |
/**
 * Read-only [DataReader] that serves log blocks from objects stored in an S3 bucket.
 *
 * The bucket is listed once, while the instance is being constructed; every object whose key
 * ends with `.xd` becomes one block, keyed by the address that [LogUtil.getAddress] derives
 * from the key. The listing call blocks the constructing thread until S3 answers.
 *
 * Mutating operations are not supported: [removeBlock] and [close] are no-ops, while
 * [truncateBlock] and [clear] throw [NotImplementedError].
 *
 * @property s3 client used for listing the bucket and for ranged object reads
 * @property bucketName name of the bucket that holds the `.xd` objects
 * @property requestOverrideConfig optional per-request override applied to every S3 request
 */
class S3DataReader(val s3: S3AsyncClient, val bucketName: String, val requestOverrideConfig: AwsRequestOverrideConfig? = null) : DataReader {

    /** Running total of bytes returned by all block reads made through this reader. */
    val bytesRead = AtomicLong()

    /** Blocks found in the bucket at construction time, keyed by block address. */
    private val blocks: Map<Long, S3Block> = listAllObjects()
        .filter { it.key().endsWith(".xd") }
        .associate { s3Object ->
            val address = LogUtil.getAddress(s3Object.key())
            address to S3Block(address, s3Object)
        }

    /**
     * Lists every top-level object of the bucket.
     *
     * A single ListObjects response may be truncated, so the request is repeated with the
     * marker of the previous page until S3 reports that the listing is complete. Each call
     * blocks until its response arrives.
     */
    private fun listAllObjects(): List<S3Object> {
        val collected = ArrayList<S3Object>()
        var marker: String? = null
        do {
            val page = s3.listObjects(
                ListObjectsRequest.builder()
                    .requestOverrideConfig(requestOverrideConfig)
                    .bucket(bucketName)
                    .delimiter("/")
                    .marker(marker)
                    .build()
            ).get()
            val contents = page.contents().orEmpty()
            collected.addAll(contents)
            marker = if (page.isTruncated() == true) {
                // Fall back to the last returned key in case the response carries no next marker.
                page.nextMarker() ?: contents.lastOrNull()?.key()
            } else {
                null
            }
        } while (marker != null)
        return collected
    }

    /** Returns all known blocks, ordered by [FileDataReader.sortBlocks]. */
    override fun getBlocks(): Array<Block> {
        val result = blocks.values.toTypedArray<Block>()
        FileDataReader.sortBlocks(result)
        return result
    }

    /** Intentionally empty: this reader keeps no reference to the log. */
    override fun setLog(log: Log) {
    }

    /** Returns the location string `s3:` followed by the bucket name. */
    override fun getLocation(): String = "s3:$bucketName"

    /** Not implemented yet; currently does nothing. */
    override fun removeBlock(blockAddress: Long, rbt: RemoveBlockType) {
        // TODO
    }

    /** Not supported; always throws [NotImplementedError]. */
    override fun truncateBlock(blockAddress: Long, length: Long) {
        TODO("not implemented")
    }

    /** Not implemented yet; currently does nothing (the S3 client is not closed here). */
    override fun close() {
        // TODO
    }

    /**
     * Returns the block registered for [address].
     *
     * @throws IllegalArgumentException if no `.xd` object with that address was found
     * when the bucket was listed
     */
    override fun getBlock(address: Long): Block {
        logger.info { "Get block at ${LogUtil.getLogFilename(address)}" }
        return blocks[address]
            ?: throw IllegalArgumentException("No block ${LogUtil.getLogFilename(address)} in $location")
    }

    /** Not supported; always throws [NotImplementedError]. */
    override fun clear() {
        TODO("not implemented")
    }

    /**
     * A single block backed by one S3 object.
     *
     * @property s3Object listing entry of the object; supplies the key and the size
     */
    private inner class S3Block(private val _address: Long, val s3Object: S3Object) : Block {

        override fun getAddress() = _address

        /** Always reports `true`. */
        override fun setReadOnly(): Boolean = true

        /** Size of the object as reported by the bucket listing. */
        override fun length(): Long = s3Object.size()

        /**
         * Reads up to [count] bytes starting at [position] with a ranged GET request and
         * blocks until the response has been handled.
         *
         * @return the number of bytes reported by the response handler, or 0 when [count]
         * is not positive (no request is made in that case)
         */
        override fun read(output: ByteArray, position: Long, count: Int): Int {
            if (count <= 0) {
                return 0
            }
            // HTTP byte ranges are inclusive on both ends, hence the "- 1".
            val range = "bytes=$position-${position + count - 1}"
            logger.debug { "Request range: $range" }
            // NOTE(review): ByteArrayAsyncResponseHandler is declared outside this file;
            // presumably it copies the body into `output` and yields the byte count - confirm.
            val written = s3.getObject(
                GetObjectRequest.builder()
                    .range(range)
                    .requestOverrideConfig(requestOverrideConfig)
                    .bucket(bucketName)
                    .key(s3Object.key())
                    .build(),
                ByteArrayAsyncResponseHandler(output)
            ).get()
            if (written < count) {
                logger.debug { "Read underflow: expected $count, got $written" }
            }
            bytesRead.addAndGet(written.toLong())
            return written
        }
    }

    companion object : KLogging()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment