Created
May 29, 2018 08:44
-
-
Save Exerosis/c2bd63832e3a92ce0ba247c74f6915ea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha | |
import java.nio.ByteBuffer | |
import kotlin.coroutines.experimental.Continuation | |
/**
 * Read operations expressed in continuation-passing form.
 *
 * Every operation takes the staging buffer `using` plus the [Continuation] that is to receive the result, and
 * returns [Any]. The implementation in this file (`SequentialReadCoordinator`) returns the finished value
 * directly when the request can be served from `using`, and `COROUTINE_SUSPENDED` when it had to start an
 * asynchronous read.
 */
interface ReadCoordinator {
    /**
     * Requests a read into [destination].
     *
     * @param using staging buffer that may already hold unread bytes.
     * @param destination buffer that receives the bytes.
     * @param continuation receives the resulting [ByteBuffer].
     */
    fun buffer(using: ByteBuffer, destination: ByteBuffer, continuation: Continuation<ByteBuffer>): Any

    /**
     * Requests a read of [amount] bytes into [array], starting at index [offset].
     *
     * @param using staging buffer that may already hold unread bytes.
     * @param continuation receives the resulting [ByteArray].
     */
    fun array(using: ByteBuffer, array: ByteArray, offset: Int, amount: Int, continuation: Continuation<ByteArray>): Any

    /**
     * Requests a numeric value that occupies [amount] bytes and is decoded from the buffer by [converter].
     *
     * @param using staging buffer that may already hold unread bytes; it is what [converter] is handed.
     * @param continuation receives the decoded value.
     */
    fun <Type : Number> number(
        using: ByteBuffer,
        amount: Int,
        converter: (ByteBuffer) -> Type,
        continuation: Continuation<Type>
    ): Any
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha | |
import java.nio.ByteBuffer | |
/**
 * Suspending read operations, one per supported result type.
 *
 * No implementation of this interface is visible in this file, so how much data each call consumes is not
 * specified here.
 */
interface SequentialRead {
    //--Complex--

    /** Suspending read that yields a [ByteBuffer]. */
    suspend fun buffer(): ByteBuffer

    /** Suspending read that yields a [ByteArray]. */
    suspend fun array(): ByteArray

    //--Primitive--

    /** Suspending read that yields a [Byte]. */
    suspend fun byte(): Byte

    /** Suspending read that yields a [Short]. */
    suspend fun short(): Short

    /** Suspending read that yields an [Int]. */
    suspend fun int(): Int

    /** Suspending read that yields a [Float]. */
    suspend fun float(): Float

    /** Suspending read that yields a [Long]. */
    suspend fun long(): Long

    /** Suspending read that yields a [Double]. */
    suspend fun double(): Double
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha | |
import java.nio.ByteBuffer | |
import java.nio.channels.CompletionHandler | |
import kotlin.coroutines.experimental.Continuation | |
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED | |
/**
 * [ReadCoordinator] that serves a request from the bytes already held in `using` when possible and otherwise
 * issues asynchronous reads through [read] until the request is satisfied.
 *
 * Conventions used by every operation:
 * - `using` is treated as being in read mode on entry: the bytes between its position and its limit are the
 *   unread bytes.
 * - A request that can be served immediately returns its result directly; otherwise the continuation is stored,
 *   a read is started and `COROUTINE_SUSPENDED` is returned.
 * - Each kind of request (array, buffer, number) has a single handler, so only one request of each kind may be
 *   outstanding; starting a second one throws [ReadInProgressException].
 * - A completion count below zero is treated as end of stream and fails the request with
 *   [java.io.EOFException].
 *
 * NOTE(review): returning `COROUTINE_SUSPENDED` after calling [read] assumes [read] completes asynchronously,
 * i.e. does not invoke the handler before it returns — confirm against the channel being wrapped.
 *
 * @param read starts one asynchronous read into the given buffer and reports the outcome to the given handler,
 * passing that same buffer as the attachment.
 */
class SequentialReadCoordinator(
    private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : ReadCoordinator {
    /** Thrown when a request is started on a handler whose previous request has not finished yet. */
    class ReadInProgressException : IllegalStateException()

    /**
     * State shared by the three request handlers: the number of bytes still [required] and the [continuation]
     * waiting for the result.
     */
    private abstract class Handler<Type> : CompletionHandler<Int, ByteBuffer> {
        var required = 0
        var continuation: Continuation<Type>? = null

        /** Drops per-request state owned by the subclass; runs before the continuation is resumed. */
        protected open fun release() {}

        /** Finishes the current request with [value]. */
        protected fun succeed(value: Type) {
            val pending = checkNotNull(continuation) { "no read in progress" }
            // Clear before resuming: the resumed coroutine may start the next request on this handler
            // synchronously, and must neither be rejected nor have its continuation wiped afterwards.
            continuation = null
            release()
            pending.resume(value)
        }

        /** Finishes the current request with [reason]. */
        protected fun fail(reason: Throwable) {
            val pending = checkNotNull(continuation) { "no read in progress" }
            continuation = null
            release()
            pending.resumeWithException(reason)
        }

        /**
         * Fails the current request when [count] signals end of stream.
         *
         * @return `true` when the request was failed and the caller must stop processing.
         */
        protected fun endOfStream(count: Int): Boolean {
            if (count >= 0)
                return false
            fail(java.io.EOFException("stream ended with $required byte(s) still required"))
            return true
        }

        override fun failed(reason: Throwable, buffer: ByteBuffer) = fail(reason)
    }

    /** Fills `array[offset until offset + amount]`, staging channel data in `using`. */
    private val arrayHandler: (ByteBuffer, ByteArray, Int, Int, Continuation<ByteArray>) -> Any = object :
        (ByteBuffer, ByteArray, Int, Int, Continuation<ByteArray>) -> Any,
        Handler<ByteArray>() {
        /** Index in [array] at which the next byte is stored. */
        var offset = 0
        var array: ByteArray? = null

        override fun invoke(
            using: ByteBuffer,
            array: ByteArray,
            offset: Int,
            amount: Int,
            continuation: Continuation<ByteArray>
        ): Any {
            if (this.continuation != null)
                throw ReadInProgressException()
            val buffered = using.remaining()
            if (buffered >= amount) {
                using.get(array, offset, amount)
                return array
            }
            // Take what is already buffered, then reuse the now-empty buffer to stage the rest.
            using.get(array, offset, buffered)
            required = amount - buffered
            this.offset = offset + buffered
            this.array = array
            this.continuation = continuation
            using.clear()
            read(using, this)
            return COROUTINE_SUSPENDED
        }

        override fun completed(count: Int, buffer: ByteBuffer) {
            if (endOfStream(count))
                return
            buffer.flip()
            val target = checkNotNull(array) { "no array read in progress" }
            // A read may deliver more than is still needed; the surplus stays in the buffer, readable by the
            // next request.
            val take = minOf(buffer.remaining(), required)
            buffer.get(target, offset, take)
            offset += take
            required -= take
            if (required < 1)
                succeed(target)
            else {
                buffer.clear()
                read(buffer, this)
            }
        }

        override fun release() {
            array = null
        }
    }

    /** Fills `destination` completely, first from `using` and then straight from the channel. */
    private val bufferHandler: (ByteBuffer, ByteBuffer, Continuation<ByteBuffer>) -> Any = object :
        (ByteBuffer, ByteBuffer, Continuation<ByteBuffer>) -> Any,
        Handler<ByteBuffer>() {
        override fun invoke(
            using: ByteBuffer,
            destination: ByteBuffer,
            continuation: Continuation<ByteBuffer>
        ): Any {
            //TODO we could use required instead, but maybe nulling out is good?
            if (this.continuation != null)
                throw ReadInProgressException()
            val buffered = using.remaining()
            val space = destination.remaining()
            if (buffered > space) {
                // put(ByteBuffer) overflows when the source holds more than fits, so narrow the source to the
                // free space for the transfer and restore its limit afterwards.
                val limit = using.limit()
                using.limit(using.position() + space)
                destination.put(using)
                using.limit(limit)
            } else if (buffered > 0)
                destination.put(using)
            required = destination.remaining()
            if (required < 1)
                return destination
            this.continuation = continuation
            read(destination, this)
            return COROUTINE_SUSPENDED
        }

        override fun completed(count: Int, destination: ByteBuffer) {
            if (endOfStream(count))
                return
            required -= count
            if (required < 1)
                succeed(destination)
            else
                read(destination, this)
        }
    }

    /** Makes `amount` bytes available in `using` and hands the buffer to the converter. */
    private val primitiveHandler: (ByteBuffer, Int, (ByteBuffer) -> Number, Continuation<Number>) -> Any = object :
        (ByteBuffer, Int, (ByteBuffer) -> Number, Continuation<Number>) -> Any,
        Handler<Number>() {
        /** `true` while the unread bytes were kept in place and their start is remembered by the buffer's mark. */
        var marked = false
        var converter: ((ByteBuffer) -> Number)? = null

        override fun invoke(
            using: ByteBuffer,
            amount: Int,
            converter: (ByteBuffer) -> Number,
            continuation: Continuation<Number>
        ): Any {
            if (this.continuation != null)
                throw ReadInProgressException()
            val buffered = using.remaining()
            if (buffered >= amount)
                return converter(using) // already in read mode; flipping here would discard the unread bytes
            // Without this the request could never be satisfied and would keep issuing zero-length reads.
            require(amount <= using.capacity()) {
                "cannot stage $amount byte(s) in a buffer of capacity ${using.capacity()}"
            }
            this.continuation = continuation
            this.converter = converter
            required = amount - buffered
            marked = false
            if (buffered == 0)
                using.clear()
            else {
                val capacity = using.capacity()
                val limit = using.limit()
                if (capacity - limit < required)
                    using.compact() // not enough room behind the unread bytes: move them to the front
                else {
                    // Enough room: keep the unread bytes where they are and append after them.
                    marked = true
                    using.mark().position(limit).limit(capacity)
                }
            }
            read(using, this)
            return COROUTINE_SUSPENDED
        }

        override fun completed(count: Int, buffer: ByteBuffer) {
            if (endOfStream(count))
                return
            required -= count
            if (required >= 1) {
                read(buffer, this)
                return
            }
            if (marked) {
                marked = false
                // End the readable region at the last written byte before returning to the first unread one.
                buffer.limit(buffer.position())
                buffer.reset()
            } else
                buffer.flip()
            val convert = checkNotNull(converter) { "no number read in progress" }
            succeed(convert(buffer))
        }

        override fun release() {
            converter = null
            marked = false
        }
    }

    /**
     * Fills [destination] until it has no space left, taking bytes from [using] first.
     *
     * @return [destination] when it could be filled from [using] alone, otherwise `COROUTINE_SUSPENDED`, in which
     * case [continuation] is resumed with [destination] once it is full.
     * @throws ReadInProgressException if a previous buffer request has not finished.
     */
    override fun buffer(
        using: ByteBuffer,
        destination: ByteBuffer,
        continuation: Continuation<ByteBuffer>
    ) = bufferHandler(using, destination, continuation)

    /**
     * Stores [amount] bytes in [array] starting at index [offset], taking bytes from [using] first.
     *
     * @return [array] when [using] already held enough bytes, otherwise `COROUTINE_SUSPENDED`, in which case
     * [continuation] is resumed with [array] once all bytes have arrived.
     * @throws ReadInProgressException if a previous array request has not finished.
     */
    override fun array(
        using: ByteBuffer,
        array: ByteArray,
        offset: Int,
        amount: Int,
        continuation: Continuation<ByteArray>
    ) = arrayHandler(using, array, offset, amount, continuation)

    /**
     * Applies [converter] to [using] once at least [amount] unread bytes are available in it.
     *
     * @return the converted value when [using] already held enough bytes, otherwise `COROUTINE_SUSPENDED`, in
     * which case [continuation] is resumed with the converted value.
     * @throws ReadInProgressException if a previous number request has not finished.
     * @throws IllegalArgumentException if more bytes must be read and [amount] exceeds the capacity of [using].
     */
    @Suppress("UNCHECKED_CAST") // the handler only ever passes the converter's own result to the continuation
    override fun <Type : Number> number(
        using: ByteBuffer,
        amount: Int,
        converter: (ByteBuffer) -> Type,
        continuation: Continuation<Type>
    ) = primitiveHandler(using, amount, converter, continuation as Continuation<Number>)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment