Created
May 29, 2018 08:53
-
-
Save Exerosis/31ab7e972741b108aa87eed7e62b858f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.coordinator | |
import java.nio.ByteBuffer | |
import kotlin.coroutines.experimental.Continuation | |
//TODO Implement maybe? | |
class QueuedReadCoordinator : ReadCoordinator { | |
override fun buffer( | |
using: ByteBuffer, | |
destination: ByteBuffer, | |
continuation: Continuation<ByteBuffer> | |
): Any { | |
throw UnsupportedOperationException("Stub") | |
} | |
override fun array( | |
using: ByteBuffer, | |
array: ByteArray, | |
offset: Int, | |
amount: Int, | |
continuation: Continuation<ByteArray> | |
): Any { | |
throw UnsupportedOperationException("Stub") | |
} | |
override fun <Type : Number> number( | |
using: ByteBuffer, | |
amount: Int, | |
converter: (ByteBuffer) -> Type, | |
continuation: Continuation<Type> | |
): Any { | |
throw UnsupportedOperationException("Stub") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.coordinator | |
import java.nio.ByteBuffer | |
import kotlin.coroutines.experimental.Continuation | |
interface ReadCoordinator { | |
fun buffer( | |
using: ByteBuffer, | |
destination: ByteBuffer, | |
continuation: Continuation<ByteBuffer> | |
): Any | |
fun array( | |
using: ByteBuffer, | |
array: ByteArray, | |
offset: Int, | |
amount: Int, | |
continuation: Continuation<ByteArray> | |
): Any | |
fun <Type : Number> number( | |
using: ByteBuffer, | |
amount: Int, | |
converter: (ByteBuffer) -> Type, | |
continuation: Continuation<Type> | |
): Any | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.coordinator | |
import java.nio.ByteBuffer | |
import java.nio.channels.CompletionHandler | |
import kotlin.coroutines.experimental.Continuation | |
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED | |
class SequentialReadCoordinator( | |
private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit | |
) : ReadCoordinator { | |
class ReadInProgressException : IllegalStateException() | |
private abstract class Handler<Type> : CompletionHandler<Int, ByteBuffer> { | |
var required = 0 | |
var continuation: Continuation<Type>? = null | |
override fun failed(reason: Throwable, buffer: ByteBuffer) { | |
continuation!!.resumeWithException(reason) | |
continuation = null | |
} | |
} | |
private val arrayHandler: (ByteBuffer, ByteArray, Int, Int, Continuation<ByteArray>) -> Any = object : | |
(ByteBuffer, ByteArray, Int, Int, Continuation<ByteArray>) -> Any, | |
Handler<ByteArray>() { | |
var offset = 0 | |
var array: ByteArray? = null | |
override fun invoke( | |
using: ByteBuffer, | |
array: ByteArray, | |
offset: Int, | |
amount: Int, | |
continuation: Continuation<ByteArray> | |
): Any { | |
if (this.continuation != null) | |
throw ReadInProgressException() | |
val remaining = using.remaining() | |
return if (remaining >= amount) { | |
using.get(array, offset, amount) | |
array | |
} else { | |
required = amount - remaining | |
this.continuation = continuation | |
this.array = array | |
this.offset = offset - remaining | |
using.flip() | |
if (this.offset != offset) | |
read(using, this) | |
else | |
completed(remaining, using) | |
COROUTINE_SUSPENDED | |
} | |
} | |
override fun completed(count: Int, buffer: ByteBuffer) { | |
required -= count | |
if (required < 1) { | |
buffer.flip() | |
buffer.get(array, offset, required).clear() | |
continuation!!.resume(array!!) | |
continuation = null | |
} else { | |
if (buffer.remaining() < required) { | |
val remaining = buffer.position() | |
buffer.flip() | |
buffer.get(array, offset, remaining).clear() | |
offset += remaining | |
} else | |
read(buffer, this) | |
} | |
} | |
} | |
private val bufferHandler: (ByteBuffer, ByteBuffer, Continuation<ByteBuffer>) -> Any = object : | |
(ByteBuffer, ByteBuffer, Continuation<ByteBuffer>) -> Any, | |
Handler<ByteBuffer>() { | |
override fun invoke( | |
using: ByteBuffer, | |
destination: ByteBuffer, | |
continuation: Continuation<ByteBuffer> | |
): Any { | |
//TODO we could use required instead, but maybe nulling out is good? | |
if (this.continuation != null) | |
throw ReadInProgressException() | |
if (using.hasRemaining()) | |
destination.put(using) | |
required = destination.remaining() | |
return if (required < 1) | |
destination | |
else { | |
this.continuation = continuation | |
read(destination, this) | |
COROUTINE_SUSPENDED | |
} | |
} | |
override fun completed(count: Int, destination: ByteBuffer) { | |
required -= count | |
if (required < 1) { | |
continuation!!.resume(destination) | |
continuation = null | |
} else { | |
read(destination, this) | |
} | |
} | |
} | |
private val primitiveHandler: (ByteBuffer, Int, (ByteBuffer) -> Number, Continuation<Number>) -> Any = object : | |
(ByteBuffer, Int, (ByteBuffer) -> Number, Continuation<Number>) -> Any, | |
Handler<Number>() { | |
var marked = false | |
var converter: ((ByteBuffer) -> Number)? = null | |
override fun invoke( | |
using: ByteBuffer, | |
amount: Int, | |
converter: (ByteBuffer) -> Number, | |
continuation: Continuation<Number> | |
): Any { | |
if (this.continuation != null) | |
throw ReadInProgressException() | |
val remaining = using.remaining() | |
return if (remaining >= amount) | |
converter(using.flip() as ByteBuffer) | |
else { | |
this.continuation = continuation | |
this.converter = converter | |
required = amount - remaining | |
if (remaining == 0) | |
using.clear() | |
else { | |
val capacity = using.capacity() | |
val limit = using.limit() | |
if (capacity - limit < required) { | |
using.compact() | |
} else { | |
marked = true | |
using.mark().position(limit).limit(capacity) | |
} | |
read(using, this) | |
} | |
COROUTINE_SUSPENDED | |
} | |
} | |
override fun completed(count: Int, buffer: ByteBuffer) { | |
required -= count | |
if (required < 1) { | |
if (marked) { | |
marked = false | |
buffer.reset() | |
} else | |
buffer.flip() | |
continuation!!.resume(converter!!.invoke(buffer)) | |
converter = null | |
continuation = null | |
} | |
} | |
} | |
override fun buffer( | |
using: ByteBuffer, | |
destination: ByteBuffer, | |
continuation: Continuation<ByteBuffer> | |
) = bufferHandler(using, destination, continuation) | |
override fun array( | |
using: ByteBuffer, | |
array: ByteArray, | |
offset: Int, | |
amount: Int, | |
continuation: Continuation<ByteArray> | |
) = arrayHandler(using, array, offset, amount, continuation) | |
override fun <Type : Number> number( | |
using: ByteBuffer, | |
amount: Int, | |
converter: (ByteBuffer) -> Type, | |
continuation: Continuation<Type> | |
) = primitiveHandler(using, amount, converter, continuation as Continuation<Number>) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.read | |
import java.nio.ByteBuffer | |
interface QueuedRead : Read { | |
//--Complex-- | |
fun buffer(callback: (ByteBuffer) -> Unit) | |
fun array(callback: (ByteArray) -> Unit) | |
//--Primitive-- | |
fun byte(callback: (Byte) -> Unit) | |
fun short(callback: (Short) -> Unit) | |
fun int(callback: (Int) -> Unit) | |
fun float(callback: (Float) -> Unit) | |
fun long(callback: (Long) -> Unit) | |
fun double(callback: (Double) -> Unit) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.read | |
interface Read { | |
fun skip(amount: Number) | |
//TODO maybe other things cross between the Queued and Sequential Reads? | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.read | |
import java.nio.ByteBuffer | |
interface SequentialRead : Read { | |
//--Complex-- | |
suspend fun buffer(): ByteBuffer | |
suspend fun array(): ByteArray | |
//--Primitive-- | |
suspend fun byte(): Byte | |
suspend fun short(): Short | |
suspend fun int(): Int | |
suspend fun float(): Float | |
suspend fun long(): Long | |
suspend fun double(): Double | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment