Skip to content

Instantly share code, notes, and snippets.

@Exerosis
Created May 29, 2018 08:53
Show Gist options
  • Save Exerosis/31ab7e972741b108aa87eed7e62b858f to your computer and use it in GitHub Desktop.
Save Exerosis/31ab7e972741b108aa87eed7e62b858f to your computer and use it in GitHub Desktop.
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.coordinator
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
// TODO: Implement; every operation currently throws UnsupportedOperationException("Stub").
/**
 * Placeholder [ReadCoordinator]: none of its operations are implemented.
 *
 * Every method throws [UnsupportedOperationException] with the message "Stub".
 */
class QueuedReadCoordinator : ReadCoordinator {
    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    override fun buffer(
        using: ByteBuffer,
        destination: ByteBuffer,
        continuation: Continuation<ByteBuffer>
    ): Any = unsupported()

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    override fun array(
        using: ByteBuffer,
        array: ByteArray,
        offset: Int,
        amount: Int,
        continuation: Continuation<ByteArray>
    ): Any = unsupported()

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    override fun <Type : Number> number(
        using: ByteBuffer,
        amount: Int,
        converter: (ByteBuffer) -> Type,
        continuation: Continuation<Type>
    ): Any = unsupported()

    /** Raises the failure shared by every stubbed operation. */
    private fun unsupported(): Nothing = throw UnsupportedOperationException("Stub")
}
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.coordinator
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
/**
 * Low-level read operations that take the caller's [Continuation] explicitly.
 *
 * Every operation returns [Any]. The `SequentialReadCoordinator` implementation in this file
 * returns either the finished value or `COROUTINE_SUSPENDED`, resuming the continuation later.
 * NOTE(review): confirm that this is the contract intended for all implementations.
 */
interface ReadCoordinator {
    /**
     * Reads into [destination].
     *
     * @param using buffer the implementation may consult before reading further
     * @param destination buffer that receives the bytes
     * @param continuation continuation typed to produce a [ByteBuffer]
     */
    fun buffer(using: ByteBuffer, destination: ByteBuffer, continuation: Continuation<ByteBuffer>): Any

    /**
     * Reads [amount] bytes into [array] starting at [offset].
     *
     * @param using buffer the implementation may consult before reading further
     * @param continuation continuation typed to produce a [ByteArray]
     */
    fun array(
        using: ByteBuffer,
        array: ByteArray,
        offset: Int,
        amount: Int,
        continuation: Continuation<ByteArray>
    ): Any

    /**
     * Reads a number that [converter] extracts from a [ByteBuffer].
     *
     * @param using buffer the implementation may consult before reading further
     * @param amount presumably the encoded size of the number in bytes — confirm with callers
     * @param continuation continuation typed to produce the number
     */
    fun <Type : Number> number(
        using: ByteBuffer,
        amount: Int,
        converter: (ByteBuffer) -> Type,
        continuation: Continuation<Type>
    ): Any
}
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.coordinator
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
/**
 * [ReadCoordinator] that allows one pending read per kind of operation (array, buffer, number).
 *
 * Each operation first tries to serve the request from the bytes already available in `using`
 * and then returns the result directly. Otherwise it stores the continuation, starts an
 * asynchronous read through [read] and returns [COROUTINE_SUSPENDED]; the continuation is
 * resumed from the completion callback once enough bytes have arrived.
 *
 * `using` is treated as being in read mode between calls: `position..limit` are the unread bytes.
 *
 * No synchronisation is performed here.
 * NOTE(review): if completions arrive on another thread, confirm that callers provide the
 * required happens-before ordering.
 *
 * @param read starts one asynchronous read into the given buffer and reports the outcome to the
 * handler. The handler expects `completed` to receive the byte count (negative at end of stream,
 * as `AsynchronousByteChannel.read` reports) together with that same buffer as attachment —
 * NOTE(review): confirm against the code that supplies this function.
 */
class SequentialReadCoordinator(
    private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : ReadCoordinator {
    /** Thrown when an operation is requested while the same kind of operation is still pending. */
    class ReadInProgressException : IllegalStateException()

    /** State shared by the three handlers: missing byte count and the suspended caller. */
    private abstract class Handler<Type> : CompletionHandler<Int, ByteBuffer> {
        var required = 0
        var continuation: Continuation<Type>? = null

        /**
         * Returns the pending continuation after clearing the field.
         *
         * The field is cleared BEFORE the continuation is resumed: a resumed coroutine may run
         * synchronously and request the next read of the same kind, which must neither be
         * rejected as "in progress" nor have its state wiped afterwards.
         */
        protected fun detach(): Continuation<Type> {
            val pending = checkNotNull(continuation) { "Completion reported without a pending read" }
            continuation = null
            return pending
        }

        /**
         * Fails the pending read when [count] is negative (end of stream).
         *
         * @return `true` when the read was failed and the caller must stop processing
         */
        protected fun failedAtEnd(count: Int): Boolean {
            if (count >= 0) return false
            detach().resumeWithException(
                java.io.EOFException("Stream ended with $required more byte(s) required")
            )
            return true
        }

        override fun failed(reason: Throwable, buffer: ByteBuffer) {
            detach().resumeWithException(reason)
        }
    }

    /** Fills a region of a byte array, staging incoming bytes in `using`. */
    private val arrayHandler: (ByteBuffer, ByteArray, Int, Int, Continuation<ByteArray>) -> Any = object :
        (ByteBuffer, ByteArray, Int, Int, Continuation<ByteArray>) -> Any,
        Handler<ByteArray>() {
        /** Next index in [array] to write to. */
        var offset = 0
        var array: ByteArray? = null

        override fun invoke(
            using: ByteBuffer,
            array: ByteArray,
            offset: Int,
            amount: Int,
            continuation: Continuation<ByteArray>
        ): Any {
            if (this.continuation != null)
                throw ReadInProgressException()
            // Validate up front so a bad range fails in the caller rather than inside a
            // completion callback, where the coroutine would be left suspended.
            if (offset < 0 || amount < 0 || amount > array.size - offset)
                throw IndexOutOfBoundsException(
                    "offset=$offset, amount=$amount, array size=${array.size}"
                )
            val remaining = using.remaining()
            if (remaining >= amount) {
                using.get(array, offset, amount)
                return array
            }
            // Take what is already buffered, then fetch the rest asynchronously.
            using.get(array, offset, remaining)
            this.array = array
            this.offset = offset + remaining
            required = amount - remaining
            this.continuation = continuation
            using.clear()
            read(using, this)
            return COROUTINE_SUSPENDED
        }

        override fun completed(count: Int, buffer: ByteBuffer) {
            if (failedAtEnd(count)) {
                array = null
                return
            }
            val target = checkNotNull(array) { "Completion reported without a target array" }
            buffer.flip()
            // The read may have delivered more than is still needed; surplus bytes stay in the
            // buffer (read mode) for the next operation.
            val chunk = minOf(required, buffer.remaining())
            buffer.get(target, offset, chunk)
            offset += chunk
            required -= chunk
            if (required > 0) {
                buffer.clear()
                read(buffer, this)
            } else {
                array = null
                detach().resume(target)
            }
        }
    }

    /** Fills a destination buffer, reading directly into it once `using` is drained. */
    private val bufferHandler: (ByteBuffer, ByteBuffer, Continuation<ByteBuffer>) -> Any = object :
        (ByteBuffer, ByteBuffer, Continuation<ByteBuffer>) -> Any,
        Handler<ByteBuffer>() {
        override fun invoke(
            using: ByteBuffer,
            destination: ByteBuffer,
            continuation: Continuation<ByteBuffer>
        ): Any {
            if (this.continuation != null)
                throw ReadInProgressException()
            // Move at most what fits: put(ByteBuffer) throws BufferOverflowException when the
            // source holds more than the destination can take.
            val transfer = minOf(using.remaining(), destination.remaining())
            if (transfer > 0) {
                val limit = using.limit()
                using.limit(using.position() + transfer)
                destination.put(using)
                using.limit(limit)
            }
            required = destination.remaining()
            if (required < 1)
                return destination
            this.continuation = continuation
            read(destination, this)
            return COROUTINE_SUSPENDED
        }

        override fun completed(count: Int, destination: ByteBuffer) {
            if (failedAtEnd(count))
                return
            required -= count
            if (required > 0)
                read(destination, this)
            else
                detach().resume(destination)
        }
    }

    /** Reads a fixed number of bytes into `using` and converts them to a number. */
    private val primitiveHandler: (ByteBuffer, Int, (ByteBuffer) -> Number, Continuation<Number>) -> Any = object :
        (ByteBuffer, Int, (ByteBuffer) -> Number, Continuation<Number>) -> Any,
        Handler<Number>() {
        /** `true` while the unread bytes were kept in place and new bytes are appended after them. */
        var marked = false
        var converter: ((ByteBuffer) -> Number)? = null

        override fun invoke(
            using: ByteBuffer,
            amount: Int,
            converter: (ByteBuffer) -> Number,
            continuation: Continuation<Number>
        ): Any {
            if (this.continuation != null)
                throw ReadInProgressException()
            val remaining = using.remaining()
            // The buffer is already in read mode here; flipping it would discard the unread bytes.
            if (remaining >= amount)
                return converter(using)
            // A value larger than the staging buffer can never be completed.
            require(amount <= using.capacity()) {
                "amount ($amount) exceeds buffer capacity (${using.capacity()})"
            }
            this.continuation = continuation
            this.converter = converter
            required = amount - remaining
            if (remaining == 0)
                using.clear()
            else {
                val capacity = using.capacity()
                val limit = using.limit()
                if (capacity - limit < required) {
                    // Not enough room behind the unread bytes: move them to the front.
                    using.compact()
                } else {
                    // Enough room: remember where the unread bytes start and append after them.
                    marked = true
                    using.mark().position(limit).limit(capacity)
                }
            }
            // The read must be started on every path, including the empty-buffer one.
            read(using, this)
            return COROUTINE_SUSPENDED
        }

        override fun completed(count: Int, buffer: ByteBuffer) {
            if (failedAtEnd(count)) {
                marked = false
                converter = null
                return
            }
            required -= count
            if (required > 0) {
                // Short read: keep appending until the value is complete.
                read(buffer, this)
                return
            }
            if (marked) {
                marked = false
                // Back to the first unread byte, with the limit at the end of the written data.
                val end = buffer.position()
                buffer.reset()
                buffer.limit(end)
            } else
                buffer.flip()
            val convert = checkNotNull(converter) { "Completion reported without a converter" }
            converter = null
            val pending = detach()
            val value = try {
                convert(buffer)
            } catch (failure: Throwable) {
                // Deliver conversion failures to the suspended caller instead of the I/O callback.
                pending.resumeWithException(failure)
                return
            }
            pending.resume(value)
        }
    }

    /**
     * Fills [destination], first from [using] and then from asynchronous reads.
     *
     * @return [destination] when it could be filled immediately, otherwise [COROUTINE_SUSPENDED]
     * @throws ReadInProgressException if a buffer read is still pending
     */
    override fun buffer(
        using: ByteBuffer,
        destination: ByteBuffer,
        continuation: Continuation<ByteBuffer>
    ) = bufferHandler(using, destination, continuation)

    /**
     * Copies [amount] bytes into [array] starting at [offset].
     *
     * @return [array] when [using] already held enough bytes, otherwise [COROUTINE_SUSPENDED]
     * @throws ReadInProgressException if an array read is still pending
     * @throws IndexOutOfBoundsException if the range does not fit into [array]
     */
    override fun array(
        using: ByteBuffer,
        array: ByteArray,
        offset: Int,
        amount: Int,
        continuation: Continuation<ByteArray>
    ) = arrayHandler(using, array, offset, amount, continuation)

    /**
     * Applies [converter] to [using] once at least [amount] unread bytes are available.
     *
     * @return the converted value when enough bytes were buffered, otherwise [COROUTINE_SUSPENDED]
     * @throws ReadInProgressException if a number read is still pending
     * @throws IllegalArgumentException if a read is needed and [amount] exceeds the capacity of [using]
     */
    // The continuation only ever receives values produced by `converter`, which returns Type.
    @Suppress("UNCHECKED_CAST")
    override fun <Type : Number> number(
        using: ByteBuffer,
        amount: Int,
        converter: (ByteBuffer) -> Type,
        continuation: Continuation<Type>
    ) = primitiveHandler(using, amount, converter, continuation as Continuation<Number>)
}
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.read
import java.nio.ByteBuffer
/**
 * Callback-based variant of [Read]: every operation takes a function that receives the value.
 *
 * NOTE(review): when and on which thread the callbacks run is not defined here — confirm with
 * an implementation.
 */
interface QueuedRead : Read {
    // ----- Complex values -----

    /** Hands a [ByteBuffer] to [callback]. */
    fun buffer(callback: (ByteBuffer) -> Unit)

    /** Hands a [ByteArray] to [callback]. */
    fun array(callback: (ByteArray) -> Unit)

    // ----- Primitive values -----

    /** Hands a [Byte] to [callback]. */
    fun byte(callback: (Byte) -> Unit)

    /** Hands a [Short] to [callback]. */
    fun short(callback: (Short) -> Unit)

    /** Hands an [Int] to [callback]. */
    fun int(callback: (Int) -> Unit)

    /** Hands a [Float] to [callback]. */
    fun float(callback: (Float) -> Unit)

    /** Hands a [Long] to [callback]. */
    fun long(callback: (Long) -> Unit)

    /** Hands a [Double] to [callback]. */
    fun double(callback: (Double) -> Unit)
}
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.read
/** Operations shared by the callback-based and the suspending read interfaces. */
interface Read {
    /**
     * Skips [amount].
     *
     * NOTE(review): presumably a number of bytes — confirm with an implementation.
     */
    fun skip(amount: Number)

    // TODO: Decide whether other operations are shared by the Queued and Sequential reads.
}
package com.mynt.proxy.alpha.tcp.alpha.alpha.alpha.alpha.read
import java.nio.ByteBuffer
/** Suspending variant of [Read]: every operation suspends and returns the value directly. */
interface SequentialRead : Read {
    // ----- Complex values -----

    /** Suspends until a [ByteBuffer] can be returned. */
    suspend fun buffer(): ByteBuffer

    /** Suspends until a [ByteArray] can be returned. */
    suspend fun array(): ByteArray

    // ----- Primitive values -----

    /** Suspends until a [Byte] can be returned. */
    suspend fun byte(): Byte

    /** Suspends until a [Short] can be returned. */
    suspend fun short(): Short

    /** Suspends until an [Int] can be returned. */
    suspend fun int(): Int

    /** Suspends until a [Float] can be returned. */
    suspend fun float(): Float

    /** Suspends until a [Long] can be returned. */
    suspend fun long(): Long

    /** Suspends until a [Double] can be returned. */
    suspend fun double(): Double
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment