Skip to content

Instantly share code, notes, and snippets.

@Exerosis
Created June 6, 2018 05:58
Show Gist options
  • Save Exerosis/c7eae061ed5246b3b66c4e7333c12cba to your computer and use it in GitHub Desktop.
Save Exerosis/c7eae061ed5246b3b66c4e7333c12cba to your computer and use it in GitHub Desktop.
package com.mynt.network.implementation.sequential
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.Holder
import com.mynt.network.implementation.InProgressException
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialReadCoordinator(
private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : ReadCoordinator {
private abstract class Handler<Type> : Holder<Continuation<Type>>(), CompletionHandler<Int, ByteBuffer> {
var required = 0
override fun failed(reason: Throwable, buffer: ByteBuffer) {
release().resumeWithException(reason)
}
}
private val arrayHandler = object : Handler<ByteArray>() {
var offset = 0
var array: ByteArray? = null
operator fun invoke(
using: ByteBuffer,
array: ByteArray,
offset: Int,
amount: Int,
continuation: Continuation<ByteArray>
): Any {
if (!hold(continuation))
throw InProgressException()
val remaining = using.remaining()
if (remaining > 0) {
if (remaining >= amount) {
using.get(array, offset, amount)
this.offset = offset
return array
}
using.get(array, offset, remaining)
this.offset = offset + remaining
}
required = amount - remaining
this.array = array
using.flip()
read(using, this)
return COROUTINE_SUSPENDED
}
override fun completed(count: Int, buffer: ByteBuffer) {
val current = required
required -= count
if (required < 1) {
buffer.flip()
buffer.get(array, offset, current)
release().resume(array!!)
} else {
if (buffer.remaining() < required) {
val remaining = buffer.position()
buffer.flip()
buffer.get(array, offset, remaining).clear()
offset += remaining
}
read(buffer, this)
}
}
}
private val bufferHandler = object : Handler<ByteBuffer>() {
operator fun invoke(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
//TODO we could use required instead, but maybe nulling out is good?
if (!hold(continuation))
throw InProgressException()
if (using.hasRemaining())
destination.put(using)
required = destination.remaining()
return if (required < 1)
destination
else {
read(destination, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, destination: ByteBuffer) {
required -= count
if (required < 1) {
release().resume(destination)
} else {
read(destination, this)
}
}
}
private val primitiveHandler = object : Handler<Number>() {
var marked = false
var converter: ((ByteBuffer) -> Number)? = null
operator fun invoke(
using: ByteBuffer,
amount: Int,
converter: (ByteBuffer) -> Number,
continuation: Continuation<Number>
): Any {
val remaining = using.remaining()
return if (remaining >= amount)
converter(using)
else {
if (!hold(continuation))
throw InProgressException()
this.converter = converter
required = amount - remaining
val capacity = using.capacity()
val limit = using.limit()
if (remaining == 0)
using.clear()
if (capacity - limit < required)
using.compact()
else {
marked = true
using.mark().position(limit).limit(capacity)
}
read(using, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1) {
if (marked) {
marked = false
val limit = buffer.position()
buffer.reset()
buffer.limit(limit)
} else
buffer.flip()
release().resume(converter!!.invoke(buffer))
}
}
}
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
) = bufferHandler(using, destination, continuation)
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
) = arrayHandler(using, array, offset, amount, continuation)
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
) = primitiveHandler(using, amount, reader, continuation as Continuation<Number>)
}
package com.mynt.network.implementation.sequential
import com.mynt.network.WriteCoordinator
import com.mynt.network.implementation.Holder
import com.mynt.network.implementation.InProgressException
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
//TODO Impl fully.
class SequentialWriteCoordinator(
private val write: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : WriteCoordinator {
private val bufferHandler = object :
Holder<Continuation<Unit>>(),
CompletionHandler<Int, ByteBuffer> {
var required = 0
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1)
release().resume(Unit)
else
write(buffer, this)
}
operator fun invoke(
using: ByteBuffer,
buffer: ByteBuffer,
continuation: Continuation<Unit>
): Any {
if (!hold(continuation))
throw InProgressException()
required = buffer.remaining()
return if (required < 1) Unit else {
write(buffer, this)
COROUTINE_SUSPENDED
}
}
override fun failed(reason: Throwable, buffer: ByteBuffer) {
release().resumeWithException(reason)
}
}
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<Unit>
) = bufferHandler(using, ByteBuffer.wrap(array, offset, amount), continuation)
override fun buffer(
using: ByteBuffer,
buffer: ByteBuffer,
continuation: Continuation<Unit>
) = bufferHandler(using, buffer, continuation)
override fun number(
using: ByteBuffer,
writer: (ByteBuffer) -> Number,
continuation: Continuation<Unit>
): Any {
using.clear()
writer(using)
return bufferHandler(using, using, continuation)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment