Skip to content

Instantly share code, notes, and snippets.

@Exerosis
Created June 3, 2018 15:08
Show Gist options
  • Save Exerosis/4a511fa3feb883adaf2119d3b190dbae to your computer and use it in GitHub Desktop.
Save Exerosis/4a511fa3feb883adaf2119d3b190dbae to your computer and use it in GitHub Desktop.
package com.mynt.network
interface Connection : AutoCloseable {
val read: Read
val write: Write
val isOpen: Boolean
}
package com.mynt.network.implementation
import com.mynt.network.Connection
import com.mynt.network.Read
import com.mynt.network.Write
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn
val BYTE = { buffer: ByteBuffer -> buffer.get() }
val SHORT = { buffer: ByteBuffer -> buffer.short }
val INT = { buffer: ByteBuffer -> buffer.int }
val FLOAT = { buffer: ByteBuffer -> buffer.float }
val LONG = { buffer: ByteBuffer -> buffer.long }
val DOUBLE = { buffer: ByteBuffer -> buffer.double }
suspend inline fun <Type> continued(
crossinline block: (Continuation<Type>) -> Any?
) = suspendCoroutineUninterceptedOrReturn(block)
fun <Type> continuation(callback: (Type) -> Unit) = object : Continuation<Type> {
override val context = EmptyCoroutineContext
override fun resume(value: Type) = callback(value)
override fun resumeWithException(exception: Throwable) = throw exception
}
//TODO little bit of a weird way of doing this, but I think it makes sense.
inline fun Connection.read(
block: Read.() -> Unit
) = block(read)
inline fun Connection.write(
block: Write.() -> Unit
) = block(write)
open class Holder<Type> {
protected var value: Type? = null
fun hold(value: Type): Boolean {
if (this.value != null)
return false
this.value = value
return true
}
fun release(): Type {
val temp = value!!
value = null
return temp
}
}
open class InProgressException : IllegalStateException()
package com.mynt.network.implementation
import com.mynt.network.Connection
import com.mynt.network.Read
import com.mynt.network.Write
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn
val BYTE = { buffer: ByteBuffer -> buffer.get() }
val SHORT = { buffer: ByteBuffer -> buffer.short }
val INT = { buffer: ByteBuffer -> buffer.int }
val FLOAT = { buffer: ByteBuffer -> buffer.float }
val LONG = { buffer: ByteBuffer -> buffer.long }
val DOUBLE = { buffer: ByteBuffer -> buffer.double }
suspend inline fun <Type> continued(
crossinline block: (Continuation<Type>) -> Any?
) = suspendCoroutineUninterceptedOrReturn(block)
fun <Type> continuation(callback: (Type) -> Unit) = object : Continuation<Type> {
override val context = EmptyCoroutineContext
override fun resume(value: Type) = callback(value)
override fun resumeWithException(exception: Throwable) = throw exception
}
//TODO little bit of a weird way of doing this, but I think it makes sense.
inline fun Connection.read(
block: Read.() -> Unit
) = block(read)
inline fun Connection.write(
block: Write.() -> Unit
) = block(write)
open class Holder<Type> {
protected var value: Type? = null
fun hold(value: Type): Boolean {
if (this.value != null)
return false
this.value = value
return true
}
fun release(): Type {
val temp = value!!
value = null
return temp
}
}
open class InProgressException : IllegalStateException()
package com.mynt.network.implementation.nio
import com.mynt.network.Connection
import java.nio.channels.Channel
abstract class ChannelConnection(
private val channel: Channel
) : Connection {
override val isOpen
get() = channel.isOpen
override fun close() = channel.close()
}
package com.mynt.network.implementation.nio
import com.mynt.network.Read
import com.mynt.network.Write
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
class DatagramSocketConnection(
channel: DatagramChannel,
output: ByteBuffer,
input: ByteBuffer
) : ChannelConnection(channel) {
override val read: Read
get() = throw UnsupportedOperationException("Coming soon!")
override val write: Write
get() = throw UnsupportedOperationException("Coming soon!")
}
package com.mynt.network.implementation.nio
class WebSocketConnection {
}
package com.mynt.network.implementation.queued
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.*
import com.mynt.network.implementation.sequential.SequentialRead
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class QueuedRead(
private val read: ReadCoordinator,
private val buffer: ByteBuffer
) : SequentialRead(read, buffer) {
//--Complex--
fun buffer(buffer: ByteBuffer, callback: (ByteBuffer) -> Unit) {
val result = read.buffer(this.buffer, buffer, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as ByteBuffer)
}
fun array(
array: ByteArray,
amount: Int = array.size,
offset: Int = 0,
callback: (ByteArray) -> Unit) {
val result = read.array(buffer, array, offset, amount, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as ByteArray)
}
@Suppress("UNCHECKED_CAST")
private fun <Type : Number> number(
amount: Int,
converter: (ByteBuffer) -> Type,
callback: (Type) -> Unit
) {
val result = read.number(buffer, amount, converter, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as Type)
}
//--Primitive--
fun byte(callback: (Byte) -> Unit) = number(1, BYTE, callback)
fun short(callback: (Short) -> Unit) = number(2, SHORT, callback)
fun int(callback: (Int) -> Unit) = number(4, INT, callback)
fun float(callback: (Float) -> Unit) = number(4, FLOAT, callback)
fun long(callback: (Long) -> Unit) = number(8, LONG, callback)
fun double(callback: (Double) -> Unit) = number(8, DOUBLE, callback)
}
package com.mynt.network.implementation.queued
import com.mynt.network.ReadCoordinator
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
//TODO Implement.
class QueuedReadCoordinator : ReadCoordinator {
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
throw UnsupportedOperationException("Stub")
}
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
): Any {
throw UnsupportedOperationException("Stub")
}
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
): Any {
throw UnsupportedOperationException("Stub")
}
}
package com.mynt.network.implementation.queued
import com.mynt.network.implementation.sequential.SequentialWrite
import com.mynt.network.implementation.sequential.SequentialWriteCoordinator
import java.nio.ByteBuffer
class QueuedWrite(
private val write: SequentialWriteCoordinator,
private val buffer: ByteBuffer
) : SequentialWrite(write, buffer) {
}
package com.mynt.network.implementation.queued
class QueuedWriteCoordinator {
}
package com.mynt.network.implementation.sequential
import com.mynt.network.Read
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.*
import java.nio.ByteBuffer
//TODO this probably can't stay static forever.
open class SequentialRead(
private val read: ReadCoordinator,
private val buffer: ByteBuffer
) : Read {
//--Complex--
override suspend fun array(
array: ByteArray,
amount: Int,
offset: Int
) = continued<ByteArray> {
read.array(buffer, array, amount, offset, it)
}
override suspend fun buffer(
buffer: ByteBuffer
) = continued<ByteBuffer> {
read.buffer(this.buffer, buffer, it)
}
//--Primitive--
override suspend fun byte() = continued<Byte> {
read.number(buffer, 1, BYTE, it)
}
override suspend fun short() = continued<Short> {
read.number(buffer, 2, SHORT, it)
}
override suspend fun int() = continued<Int> {
read.number(buffer, 4, INT, it)
}
override suspend fun float() = continued<Float> {
read.number(buffer, 4, FLOAT, it)
}
override suspend fun long() = continued<Long> {
read.number(buffer, 8, LONG, it)
}
override suspend fun double() = continued<Double> {
read.number(buffer, 8, DOUBLE, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.Holder
import com.mynt.network.implementation.InProgressException
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialReadCoordinator(
private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : ReadCoordinator {
private abstract class ReadHandler<Type> : Holder<Continuation<Type>>(), CompletionHandler<Int, ByteBuffer> {
var required = 0
override fun failed(reason: Throwable, buffer: ByteBuffer) {
release().resumeWithException(reason)
}
}
private val arrayHandler = object : ReadHandler<ByteArray>() {
var offset = 0
var array: ByteArray? = null
operator fun invoke(
using: ByteBuffer,
array: ByteArray,
offset: Int,
amount: Int,
continuation: Continuation<ByteArray>
): Any {
if (!hold(continuation))
throw InProgressException()
val remaining = using.remaining()
if (remaining > 0) {
if (remaining >= amount) {
using.get(array, offset, amount)
this.offset = offset
return array
}
using.get(array, offset, remaining)
this.offset = offset + remaining
}
required = amount - remaining
this.array = array
using.flip()
read(using, this)
return COROUTINE_SUSPENDED
}
override fun completed(count: Int, buffer: ByteBuffer) {
val current = required
required -= count
if (required < 1) {
buffer.flip()
buffer.get(array, offset, current)
release().resume(array!!)
} else {
if (buffer.remaining() < required) {
val remaining = buffer.position()
buffer.flip()
buffer.get(array, offset, remaining).clear()
offset += remaining
}
read(buffer, this)
}
}
}
private val bufferHandler = object : ReadHandler<ByteBuffer>() {
operator fun invoke(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
//TODO we could use required instead, but maybe nulling out is good?
if (!hold(continuation))
throw InProgressException()
if (using.hasRemaining())
destination.put(using)
required = destination.remaining()
return if (required < 1)
destination
else {
read(destination, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, destination: ByteBuffer) {
required -= count
if (required < 1) {
release().resume(destination)
} else {
read(destination, this)
}
}
}
private val primitiveHandler = object : ReadHandler<Number>() {
var marked = false
var converter: ((ByteBuffer) -> Number)? = null
operator fun invoke(
using: ByteBuffer,
amount: Int,
converter: (ByteBuffer) -> Number,
continuation: Continuation<Number>
): Any {
val remaining = using.remaining()
return if (remaining >= amount)
converter(using)
else {
if (!hold(continuation))
throw InProgressException()
this.converter = converter
required = amount - remaining
val capacity = using.capacity()
val limit = using.limit()
if (remaining == 0)
using.clear()
if (capacity - limit < required)
using.compact()
else {
marked = true
using.mark().position(limit).limit(capacity)
}
read(using, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1) {
if (marked) {
marked = false
val limit = buffer.position()
buffer.reset()
buffer.limit(limit)
} else
buffer.flip()
release().resume(converter!!.invoke(buffer))
}
}
}
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
) = bufferHandler(using, destination, continuation)
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
) = arrayHandler(using, array, offset, amount, continuation)
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
) = primitiveHandler(using, amount, reader, continuation as Continuation<Number>)
}
package com.mynt.network.implementation.sequential
import com.mynt.network.Write
import com.mynt.network.implementation.continued
import java.nio.ByteBuffer
open class SequentialWrite(
private val write: SequentialWriteCoordinator,
private val buffer: ByteBuffer
) : Write {
override suspend fun buffer(value: ByteBuffer) = continued<Unit> {
write.test(value, it)
}
override suspend fun byte(value: Byte) = continued<Unit> {
buffer.clear()
buffer.put(value)
write.test(buffer, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.WriteCoordinator
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialWriteCoordinator(
private val write: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : WriteCoordinator {
//TODO think about and deal with writes.
//TODO maybe while ur at sculpture garden today?
override fun test(
using: ByteBuffer,
continuation: Continuation<Unit>
): Any {
write(using, object : CompletionHandler<Int, ByteBuffer> {
var required = using.flip().remaining()
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1)
continuation.resume(Unit)
else
write(using, this)
}
override fun failed(reason: Throwable, buffer: ByteBuffer) {
continuation.resumeWithException(reason)
}
})
return COROUTINE_SUSPENDED
}
}
package com.mynt.network
interface Connection : AutoCloseable {
val read: Read
val write: Write
val isOpen: Boolean
}
package com.mynt.network.implementation
import com.mynt.network.Connection
import com.mynt.network.Read
import com.mynt.network.Write
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn
val BYTE = { buffer: ByteBuffer -> buffer.get() }
val SHORT = { buffer: ByteBuffer -> buffer.short }
val INT = { buffer: ByteBuffer -> buffer.int }
val FLOAT = { buffer: ByteBuffer -> buffer.float }
val LONG = { buffer: ByteBuffer -> buffer.long }
val DOUBLE = { buffer: ByteBuffer -> buffer.double }
suspend inline fun <Type> continued(
crossinline block: (Continuation<Type>) -> Any?
) = suspendCoroutineUninterceptedOrReturn(block)
fun <Type> continuation(callback: (Type) -> Unit) = object : Continuation<Type> {
override val context = EmptyCoroutineContext
override fun resume(value: Type) = callback(value)
override fun resumeWithException(exception: Throwable) = throw exception
}
//TODO little bit of a weird way of doing this, but I think it makes sense.
inline fun Connection.read(
block: Read.() -> Unit
) = block(read)
inline fun Connection.write(
block: Write.() -> Unit
) = block(write)
open class Holder<Type> {
protected var value: Type? = null
fun hold(value: Type): Boolean {
if (this.value != null)
return false
this.value = value
return true
}
fun release(): Type {
val temp = value!!
value = null
return temp
}
}
open class InProgressException : IllegalStateException()
package com.mynt.network.implementation.nio
import com.mynt.network.Connection
import java.nio.channels.Channel
abstract class ChannelConnection(
private val channel: Channel
) : Connection {
override val isOpen
get() = channel.isOpen
override fun close() = channel.close()
}
package com.mynt.network.implementation.nio
import com.mynt.network.Read
import com.mynt.network.Write
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
class DatagramSocketConnection(
channel: DatagramChannel,
output: ByteBuffer,
input: ByteBuffer
) : ChannelConnection(channel) {
override val read: Read
get() = throw UnsupportedOperationException("Coming soon!")
override val write: Write
get() = throw UnsupportedOperationException("Coming soon!")
}
package com.mynt.network.implementation.nio
class WebSocketConnection {
}
package com.mynt.network.implementation.queued
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.*
import com.mynt.network.implementation.sequential.SequentialRead
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class QueuedRead(
private val read: ReadCoordinator,
private val buffer: ByteBuffer
) : SequentialRead(read, buffer) {
//--Complex--
fun buffer(buffer: ByteBuffer, callback: (ByteBuffer) -> Unit) {
val result = read.buffer(this.buffer, buffer, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as ByteBuffer)
}
fun array(
array: ByteArray,
amount: Int = array.size,
offset: Int = 0,
callback: (ByteArray) -> Unit) {
val result = read.array(buffer, array, offset, amount, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as ByteArray)
}
@Suppress("UNCHECKED_CAST")
private fun <Type : Number> number(
amount: Int,
converter: (ByteBuffer) -> Type,
callback: (Type) -> Unit
) {
val result = read.number(buffer, amount, converter, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as Type)
}
//--Primitive--
fun byte(callback: (Byte) -> Unit) = number(1, BYTE, callback)
fun short(callback: (Short) -> Unit) = number(2, SHORT, callback)
fun int(callback: (Int) -> Unit) = number(4, INT, callback)
fun float(callback: (Float) -> Unit) = number(4, FLOAT, callback)
fun long(callback: (Long) -> Unit) = number(8, LONG, callback)
fun double(callback: (Double) -> Unit) = number(8, DOUBLE, callback)
}
package com.mynt.network.implementation.queued
import com.mynt.network.ReadCoordinator
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
//TODO Implement.
class QueuedReadCoordinator : ReadCoordinator {
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
throw UnsupportedOperationException("Stub")
}
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
): Any {
throw UnsupportedOperationException("Stub")
}
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
): Any {
throw UnsupportedOperationException("Stub")
}
}
package com.mynt.network.implementation.queued
import com.mynt.network.implementation.sequential.SequentialWrite
import com.mynt.network.implementation.sequential.SequentialWriteCoordinator
import java.nio.ByteBuffer
class QueuedWrite(
private val write: SequentialWriteCoordinator,
private val buffer: ByteBuffer
) : SequentialWrite(write, buffer) {
}
package com.mynt.network.implementation.queued
class QueuedWriteCoordinator {
}
package com.mynt.network.implementation.sequential
import com.mynt.network.Read
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.*
import java.nio.ByteBuffer
//TODO this probably can't stay static forever.
open class SequentialRead(
private val read: ReadCoordinator,
private val buffer: ByteBuffer
) : Read {
//--Complex--
override suspend fun array(
array: ByteArray,
amount: Int,
offset: Int
) = continued<ByteArray> {
read.array(buffer, array, amount, offset, it)
}
override suspend fun buffer(
buffer: ByteBuffer
) = continued<ByteBuffer> {
read.buffer(this.buffer, buffer, it)
}
//--Primitive--
override suspend fun byte() = continued<Byte> {
read.number(buffer, 1, BYTE, it)
}
override suspend fun short() = continued<Short> {
read.number(buffer, 2, SHORT, it)
}
override suspend fun int() = continued<Int> {
read.number(buffer, 4, INT, it)
}
override suspend fun float() = continued<Float> {
read.number(buffer, 4, FLOAT, it)
}
override suspend fun long() = continued<Long> {
read.number(buffer, 8, LONG, it)
}
override suspend fun double() = continued<Double> {
read.number(buffer, 8, DOUBLE, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.Holder
import com.mynt.network.implementation.InProgressException
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialReadCoordinator(
private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : ReadCoordinator {
private abstract class ReadHandler<Type> : Holder<Continuation<Type>>(), CompletionHandler<Int, ByteBuffer> {
var required = 0
override fun failed(reason: Throwable, buffer: ByteBuffer) {
release().resumeWithException(reason)
}
}
private val arrayHandler = object : ReadHandler<ByteArray>() {
var offset = 0
var array: ByteArray? = null
operator fun invoke(
using: ByteBuffer,
array: ByteArray,
offset: Int,
amount: Int,
continuation: Continuation<ByteArray>
): Any {
if (!hold(continuation))
throw InProgressException()
val remaining = using.remaining()
if (remaining > 0) {
if (remaining >= amount) {
using.get(array, offset, amount)
this.offset = offset
return array
}
using.get(array, offset, remaining)
this.offset = offset + remaining
}
required = amount - remaining
this.array = array
using.flip()
read(using, this)
return COROUTINE_SUSPENDED
}
override fun completed(count: Int, buffer: ByteBuffer) {
val current = required
required -= count
if (required < 1) {
buffer.flip()
buffer.get(array, offset, current)
release().resume(array!!)
} else {
if (buffer.remaining() < required) {
val remaining = buffer.position()
buffer.flip()
buffer.get(array, offset, remaining).clear()
offset += remaining
}
read(buffer, this)
}
}
}
private val bufferHandler = object : ReadHandler<ByteBuffer>() {
operator fun invoke(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
//TODO we could use required instead, but maybe nulling out is good?
if (!hold(continuation))
throw InProgressException()
if (using.hasRemaining())
destination.put(using)
required = destination.remaining()
return if (required < 1)
destination
else {
read(destination, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, destination: ByteBuffer) {
required -= count
if (required < 1) {
release().resume(destination)
} else {
read(destination, this)
}
}
}
private val primitiveHandler = object : ReadHandler<Number>() {
var marked = false
var converter: ((ByteBuffer) -> Number)? = null
operator fun invoke(
using: ByteBuffer,
amount: Int,
converter: (ByteBuffer) -> Number,
continuation: Continuation<Number>
): Any {
val remaining = using.remaining()
return if (remaining >= amount)
converter(using)
else {
if (!hold(continuation))
throw InProgressException()
this.converter = converter
required = amount - remaining
val capacity = using.capacity()
val limit = using.limit()
if (remaining == 0)
using.clear()
if (capacity - limit < required)
using.compact()
else {
marked = true
using.mark().position(limit).limit(capacity)
}
read(using, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1) {
if (marked) {
marked = false
val limit = buffer.position()
buffer.reset()
buffer.limit(limit)
} else
buffer.flip()
release().resume(converter!!.invoke(buffer))
}
}
}
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
) = bufferHandler(using, destination, continuation)
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
) = arrayHandler(using, array, offset, amount, continuation)
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
) = primitiveHandler(using, amount, reader, continuation as Continuation<Number>)
}
package com.mynt.network.implementation.sequential
import com.mynt.network.Write
import com.mynt.network.implementation.continued
import java.nio.ByteBuffer
open class SequentialWrite(
private val write: SequentialWriteCoordinator,
private val buffer: ByteBuffer
) : Write {
override suspend fun buffer(value: ByteBuffer) = continued<Unit> {
write.test(value, it)
}
override suspend fun byte(value: Byte) = continued<Unit> {
buffer.clear()
buffer.put(value)
write.test(buffer, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.WriteCoordinator
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialWriteCoordinator(
private val write: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : WriteCoordinator {
//TODO think about and deal with writes.
//TODO maybe while ur at sculpture garden today?
override fun test(
using: ByteBuffer,
continuation: Continuation<Unit>
): Any {
write(using, object : CompletionHandler<Int, ByteBuffer> {
var required = using.flip().remaining()
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1)
continuation.resume(Unit)
else
write(using, this)
}
override fun failed(reason: Throwable, buffer: ByteBuffer) {
continuation.resumeWithException(reason)
}
})
return COROUTINE_SUSPENDED
}
}
package com.mynt.network
import java.net.SocketAddress
interface Provider : AutoCloseable {
suspend fun accept(address: SocketAddress): Connection
suspend fun connect(address: SocketAddress): Connection
val isOpen: Boolean
}
package com.mynt.network.providers
package com.mynt.network.providers
import com.mynt.network.Connection
import com.mynt.network.Provider
import com.mynt.network.implementation.continued
import com.mynt.network.implementation.sequential.SequentialRead
import com.mynt.network.implementation.sequential.SequentialReadCoordinator
import com.mynt.network.implementation.sequential.SequentialWrite
import com.mynt.network.implementation.sequential.SequentialWriteCoordinator
import kotlinx.coroutines.experimental.async
import java.lang.Boolean.TRUE
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousChannelGroup
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class TCPSocketProvider(
private val group: AsynchronousChannelGroup,
private val allocator: () -> ByteBuffer
) : Provider {
private val servers = HashMap<SocketAddress, AsynchronousServerSocketChannel>()
//TODO is this really a win?
private val serverFactory = { address: SocketAddress ->
group.provider().openAsynchronousServerSocketChannel(group).bind(address)
}
private open class Handler(
allocator: () -> ByteBuffer
) : Connection {
open lateinit var channel: AsynchronousSocketChannel
override val read = SequentialRead(SequentialReadCoordinator { buffer, handler ->
channel.read(buffer, buffer, handler)
}, allocator())
override val write = SequentialWrite(SequentialWriteCoordinator { buffer, handler ->
channel.write(buffer, buffer, handler)
}, allocator().flip() as ByteBuffer)
override val isOpen
get() = channel.isOpen
override fun close() = channel.close()
}
//--Accept--
private class AcceptHandler(
allocator: () -> ByteBuffer
) : Handler(allocator), CompletionHandler<AsynchronousSocketChannel, Continuation<Connection>> {
override fun completed(channel: AsynchronousSocketChannel, continuation: Continuation<Connection>) {
this.channel = channel
continuation.resume(this)
}
override fun failed(reason: Throwable, continuation: Continuation<Connection>) =
continuation.resumeWithException(reason)
}
override suspend fun accept(address: SocketAddress) = continued<Connection> {
servers.computeIfAbsent(address, serverFactory).accept(it, AcceptHandler(allocator))
COROUTINE_SUSPENDED
}
//--Connect--
private class ConnectHandler(
allocator: () -> ByteBuffer,
override var channel: AsynchronousSocketChannel
) : Handler(allocator), CompletionHandler<Void?, Continuation<Connection>> {
override fun completed(ignored: Void?, continuation: Continuation<Connection>) =
continuation.resume(this)
override fun failed(reason: Throwable, continuation: Continuation<Connection>) =
continuation.resumeWithException(reason)
}
override suspend fun connect(address: SocketAddress) = continued<Connection> {
val channel = group.provider().openAsynchronousSocketChannel(group)
channel.connect(address, it, ConnectHandler(allocator, channel))
COROUTINE_SUSPENDED
}
//--State--
override val isOpen
get() = !group.isTerminated
override fun close() = group.shutdown()
//TODO Maybe just make close do this?
suspend fun awaitClose(
period: Long = Long.MAX_VALUE,
units: TimeUnit = MILLISECONDS
) = continued<Boolean> {
close()
async(it.context) {
try {
group.awaitTermination(period, units)
it.resume(TRUE)
} catch (reason: Exception) {
it.resumeWithException(reason)
}
}
}
}
package com.mynt.network
import java.nio.ByteBuffer
//TODO Add number overloads?
interface Read {
//--Complex--
suspend fun array(
amount: Int,
offset: Int = 0
) = array(ByteArray(amount), amount, offset)
suspend fun array(
array: ByteArray,
amount: Int = array.size,
offset: Int = 0
): ByteArray
suspend fun buffer(buffer: ByteBuffer): ByteBuffer
//--Primitive--
suspend fun byte(): Byte
suspend fun short(): Short
suspend fun int(): Int
suspend fun float(): Float
suspend fun long(): Long
suspend fun double(): Double
}
package com.mynt.network
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
interface ReadCoordinator {
fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any
fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
): Any
fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
): Any
}
package com.mynt.network
import java.nio.ByteBuffer
interface Write {
suspend fun buffer(value: ByteBuffer)
suspend fun byte(value: Byte)
}
package com.mynt.network
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
interface WriteCoordinator {
fun test(
using: ByteBuffer,
continuation: Continuation<Unit>
): Any
}
package com.mynt.network.implementation.nio
import com.mynt.network.Connection
import java.nio.channels.Channel
abstract class ChannelConnection(
private val channel: Channel
) : Connection {
override val isOpen
get() = channel.isOpen
override fun close() = channel.close()
}
package com.mynt.network.implementation.nio
import com.mynt.network.Read
import com.mynt.network.Write
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
class DatagramSocketConnection(
channel: DatagramChannel,
output: ByteBuffer,
input: ByteBuffer
) : ChannelConnection(channel) {
override val read: Read
get() = throw UnsupportedOperationException("Coming soon!")
override val write: Write
get() = throw UnsupportedOperationException("Coming soon!")
}
package com.mynt.network.implementation.nio
class WebSocketConnection {
}
package com.mynt.network
import java.net.SocketAddress
interface Provider : AutoCloseable {
suspend fun accept(address: SocketAddress): Connection
suspend fun connect(address: SocketAddress): Connection
val isOpen: Boolean
}
package com.mynt.network.providers
package com.mynt.network.providers
import com.mynt.network.Connection
import com.mynt.network.Provider
import com.mynt.network.implementation.continued
import com.mynt.network.implementation.sequential.SequentialRead
import com.mynt.network.implementation.sequential.SequentialReadCoordinator
import com.mynt.network.implementation.sequential.SequentialWrite
import com.mynt.network.implementation.sequential.SequentialWriteCoordinator
import kotlinx.coroutines.experimental.async
import java.lang.Boolean.TRUE
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousChannelGroup
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class TCPSocketProvider(
private val group: AsynchronousChannelGroup,
private val allocator: () -> ByteBuffer
) : Provider {
private val servers = HashMap<SocketAddress, AsynchronousServerSocketChannel>()
//TODO is this really a win?
private val serverFactory = { address: SocketAddress ->
group.provider().openAsynchronousServerSocketChannel(group).bind(address)
}
private open class Handler(
allocator: () -> ByteBuffer
) : Connection {
open lateinit var channel: AsynchronousSocketChannel
override val read = SequentialRead(SequentialReadCoordinator { buffer, handler ->
channel.read(buffer, buffer, handler)
}, allocator())
override val write = SequentialWrite(SequentialWriteCoordinator { buffer, handler ->
channel.write(buffer, buffer, handler)
}, allocator().flip() as ByteBuffer)
override val isOpen
get() = channel.isOpen
override fun close() = channel.close()
}
//--Accept--
private class AcceptHandler(
allocator: () -> ByteBuffer
) : Handler(allocator), CompletionHandler<AsynchronousSocketChannel, Continuation<Connection>> {
override fun completed(channel: AsynchronousSocketChannel, continuation: Continuation<Connection>) {
this.channel = channel
continuation.resume(this)
}
override fun failed(reason: Throwable, continuation: Continuation<Connection>) =
continuation.resumeWithException(reason)
}
override suspend fun accept(address: SocketAddress) = continued<Connection> {
servers.computeIfAbsent(address, serverFactory).accept(it, AcceptHandler(allocator))
COROUTINE_SUSPENDED
}
//--Connect--
private class ConnectHandler(
allocator: () -> ByteBuffer,
override var channel: AsynchronousSocketChannel
) : Handler(allocator), CompletionHandler<Void?, Continuation<Connection>> {
override fun completed(ignored: Void?, continuation: Continuation<Connection>) =
continuation.resume(this)
override fun failed(reason: Throwable, continuation: Continuation<Connection>) =
continuation.resumeWithException(reason)
}
override suspend fun connect(address: SocketAddress) = continued<Connection> {
val channel = group.provider().openAsynchronousSocketChannel(group)
channel.connect(address, it, ConnectHandler(allocator, channel))
COROUTINE_SUSPENDED
}
//--State--
override val isOpen
get() = !group.isTerminated
override fun close() = group.shutdown()
//TODO Maybe just make close do this?
suspend fun awaitClose(
period: Long = Long.MAX_VALUE,
units: TimeUnit = MILLISECONDS
) = continued<Boolean> {
close()
async(it.context) {
try {
group.awaitTermination(period, units)
it.resume(TRUE)
} catch (reason: Exception) {
it.resumeWithException(reason)
}
}
}
}
package com.mynt.network.implementation.queued
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.*
import com.mynt.network.implementation.sequential.SequentialRead
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class QueuedRead(
private val read: ReadCoordinator,
private val buffer: ByteBuffer
) : SequentialRead(read, buffer) {
//--Complex--
fun buffer(buffer: ByteBuffer, callback: (ByteBuffer) -> Unit) {
val result = read.buffer(this.buffer, buffer, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as ByteBuffer)
}
fun array(
array: ByteArray,
amount: Int = array.size,
offset: Int = 0,
callback: (ByteArray) -> Unit) {
val result = read.array(buffer, array, offset, amount, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as ByteArray)
}
@Suppress("UNCHECKED_CAST")
private fun <Type : Number> number(
amount: Int,
converter: (ByteBuffer) -> Type,
callback: (Type) -> Unit
) {
val result = read.number(buffer, amount, converter, continuation(callback))
if (result != COROUTINE_SUSPENDED)
callback(result as Type)
}
//--Primitive--
fun byte(callback: (Byte) -> Unit) = number(1, BYTE, callback)
fun short(callback: (Short) -> Unit) = number(2, SHORT, callback)
fun int(callback: (Int) -> Unit) = number(4, INT, callback)
fun float(callback: (Float) -> Unit) = number(4, FLOAT, callback)
fun long(callback: (Long) -> Unit) = number(8, LONG, callback)
fun double(callback: (Double) -> Unit) = number(8, DOUBLE, callback)
}
package com.mynt.network.implementation.queued
import com.mynt.network.ReadCoordinator
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
//TODO Implement.
class QueuedReadCoordinator : ReadCoordinator {
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
throw UnsupportedOperationException("Stub")
}
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
): Any {
throw UnsupportedOperationException("Stub")
}
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
): Any {
throw UnsupportedOperationException("Stub")
}
}
package com.mynt.network.implementation.queued
import com.mynt.network.implementation.sequential.SequentialWrite
import com.mynt.network.implementation.sequential.SequentialWriteCoordinator
import java.nio.ByteBuffer
class QueuedWrite(
private val write: SequentialWriteCoordinator,
private val buffer: ByteBuffer
) : SequentialWrite(write, buffer) {
}
package com.mynt.network.implementation.queued
class QueuedWriteCoordinator {
}
package com.mynt.network
import java.nio.ByteBuffer
//TODO Add number overloads?
interface Read {
//--Complex--
suspend fun array(
amount: Int,
offset: Int = 0
) = array(ByteArray(amount), amount, offset)
suspend fun array(
array: ByteArray,
amount: Int = array.size,
offset: Int = 0
): ByteArray
suspend fun buffer(buffer: ByteBuffer): ByteBuffer
//--Primitive--
suspend fun byte(): Byte
suspend fun short(): Short
suspend fun int(): Int
suspend fun float(): Float
suspend fun long(): Long
suspend fun double(): Double
}
package com.mynt.network
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
interface ReadCoordinator {
fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any
fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
): Any
fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
): Any
}
package com.mynt.network.providers
package com.mynt.network.implementation.sequential
import com.mynt.network.Read
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.*
import java.nio.ByteBuffer
//TODO this probably can't stay static forever.
open class SequentialRead(
private val read: ReadCoordinator,
private val buffer: ByteBuffer
) : Read {
//--Complex--
override suspend fun array(
array: ByteArray,
amount: Int,
offset: Int
) = continued<ByteArray> {
read.array(buffer, array, amount, offset, it)
}
override suspend fun buffer(
buffer: ByteBuffer
) = continued<ByteBuffer> {
read.buffer(this.buffer, buffer, it)
}
//--Primitive--
override suspend fun byte() = continued<Byte> {
read.number(buffer, 1, BYTE, it)
}
override suspend fun short() = continued<Short> {
read.number(buffer, 2, SHORT, it)
}
override suspend fun int() = continued<Int> {
read.number(buffer, 4, INT, it)
}
override suspend fun float() = continued<Float> {
read.number(buffer, 4, FLOAT, it)
}
override suspend fun long() = continued<Long> {
read.number(buffer, 8, LONG, it)
}
override suspend fun double() = continued<Double> {
read.number(buffer, 8, DOUBLE, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.Holder
import com.mynt.network.implementation.InProgressException
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialReadCoordinator(
private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : ReadCoordinator {
private abstract class ReadHandler<Type> : Holder<Continuation<Type>>(), CompletionHandler<Int, ByteBuffer> {
var required = 0
override fun failed(reason: Throwable, buffer: ByteBuffer) {
release().resumeWithException(reason)
}
}
private val arrayHandler = object : ReadHandler<ByteArray>() {
var offset = 0
var array: ByteArray? = null
operator fun invoke(
using: ByteBuffer,
array: ByteArray,
offset: Int,
amount: Int,
continuation: Continuation<ByteArray>
): Any {
if (!hold(continuation))
throw InProgressException()
val remaining = using.remaining()
if (remaining > 0) {
if (remaining >= amount) {
using.get(array, offset, amount)
this.offset = offset
return array
}
using.get(array, offset, remaining)
this.offset = offset + remaining
}
required = amount - remaining
this.array = array
using.flip()
read(using, this)
return COROUTINE_SUSPENDED
}
override fun completed(count: Int, buffer: ByteBuffer) {
val current = required
required -= count
if (required < 1) {
buffer.flip()
buffer.get(array, offset, current)
release().resume(array!!)
} else {
if (buffer.remaining() < required) {
val remaining = buffer.position()
buffer.flip()
buffer.get(array, offset, remaining).clear()
offset += remaining
}
read(buffer, this)
}
}
}
private val bufferHandler = object : ReadHandler<ByteBuffer>() {
operator fun invoke(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
//TODO we could use required instead, but maybe nulling out is good?
if (!hold(continuation))
throw InProgressException()
if (using.hasRemaining())
destination.put(using)
required = destination.remaining()
return if (required < 1)
destination
else {
read(destination, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, destination: ByteBuffer) {
required -= count
if (required < 1) {
release().resume(destination)
} else {
read(destination, this)
}
}
}
private val primitiveHandler = object : ReadHandler<Number>() {
var marked = false
var converter: ((ByteBuffer) -> Number)? = null
operator fun invoke(
using: ByteBuffer,
amount: Int,
converter: (ByteBuffer) -> Number,
continuation: Continuation<Number>
): Any {
val remaining = using.remaining()
return if (remaining >= amount)
converter(using)
else {
if (!hold(continuation))
throw InProgressException()
this.converter = converter
required = amount - remaining
val capacity = using.capacity()
val limit = using.limit()
if (remaining == 0)
using.clear()
if (capacity - limit < required)
using.compact()
else {
marked = true
using.mark().position(limit).limit(capacity)
}
read(using, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1) {
if (marked) {
marked = false
val limit = buffer.position()
buffer.reset()
buffer.limit(limit)
} else
buffer.flip()
release().resume(converter!!.invoke(buffer))
}
}
}
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
) = bufferHandler(using, destination, continuation)
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
) = arrayHandler(using, array, offset, amount, continuation)
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
) = primitiveHandler(using, amount, reader, continuation as Continuation<Number>)
}
package com.mynt.network.implementation.sequential
import com.mynt.network.Write
import com.mynt.network.implementation.continued
import java.nio.ByteBuffer
open class SequentialWrite(
private val write: SequentialWriteCoordinator,
private val buffer: ByteBuffer
) : Write {
override suspend fun buffer(value: ByteBuffer) = continued<Unit> {
write.test(value, it)
}
override suspend fun byte(value: Byte) = continued<Unit> {
buffer.clear()
buffer.put(value)
write.test(buffer, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.WriteCoordinator
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialWriteCoordinator(
private val write: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : WriteCoordinator {
//TODO think about and deal with writes.
//TODO maybe while ur at sculpture garden today?
override fun test(
using: ByteBuffer,
continuation: Continuation<Unit>
): Any {
write(using, object : CompletionHandler<Int, ByteBuffer> {
var required = using.flip().remaining()
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1)
continuation.resume(Unit)
else
write(using, this)
}
override fun failed(reason: Throwable, buffer: ByteBuffer) {
continuation.resumeWithException(reason)
}
})
return COROUTINE_SUSPENDED
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.Read
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.*
import java.nio.ByteBuffer
//TODO this probably can't stay static forever.
open class SequentialRead(
private val read: ReadCoordinator,
private val buffer: ByteBuffer
) : Read {
//--Complex--
override suspend fun array(
array: ByteArray,
amount: Int,
offset: Int
) = continued<ByteArray> {
read.array(buffer, array, amount, offset, it)
}
override suspend fun buffer(
buffer: ByteBuffer
) = continued<ByteBuffer> {
read.buffer(this.buffer, buffer, it)
}
//--Primitive--
override suspend fun byte() = continued<Byte> {
read.number(buffer, 1, BYTE, it)
}
override suspend fun short() = continued<Short> {
read.number(buffer, 2, SHORT, it)
}
override suspend fun int() = continued<Int> {
read.number(buffer, 4, INT, it)
}
override suspend fun float() = continued<Float> {
read.number(buffer, 4, FLOAT, it)
}
override suspend fun long() = continued<Long> {
read.number(buffer, 8, LONG, it)
}
override suspend fun double() = continued<Double> {
read.number(buffer, 8, DOUBLE, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.ReadCoordinator
import com.mynt.network.implementation.Holder
import com.mynt.network.implementation.InProgressException
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialReadCoordinator(
private val read: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : ReadCoordinator {
private abstract class ReadHandler<Type> : Holder<Continuation<Type>>(), CompletionHandler<Int, ByteBuffer> {
var required = 0
override fun failed(reason: Throwable, buffer: ByteBuffer) {
release().resumeWithException(reason)
}
}
private val arrayHandler = object : ReadHandler<ByteArray>() {
var offset = 0
var array: ByteArray? = null
operator fun invoke(
using: ByteBuffer,
array: ByteArray,
offset: Int,
amount: Int,
continuation: Continuation<ByteArray>
): Any {
if (!hold(continuation))
throw InProgressException()
val remaining = using.remaining()
if (remaining > 0) {
if (remaining >= amount) {
using.get(array, offset, amount)
this.offset = offset
return array
}
using.get(array, offset, remaining)
this.offset = offset + remaining
}
required = amount - remaining
this.array = array
using.flip()
read(using, this)
return COROUTINE_SUSPENDED
}
override fun completed(count: Int, buffer: ByteBuffer) {
val current = required
required -= count
if (required < 1) {
buffer.flip()
buffer.get(array, offset, current)
release().resume(array!!)
} else {
if (buffer.remaining() < required) {
val remaining = buffer.position()
buffer.flip()
buffer.get(array, offset, remaining).clear()
offset += remaining
}
read(buffer, this)
}
}
}
private val bufferHandler = object : ReadHandler<ByteBuffer>() {
operator fun invoke(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
): Any {
//TODO we could use required instead, but maybe nulling out is good?
if (!hold(continuation))
throw InProgressException()
if (using.hasRemaining())
destination.put(using)
required = destination.remaining()
return if (required < 1)
destination
else {
read(destination, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, destination: ByteBuffer) {
required -= count
if (required < 1) {
release().resume(destination)
} else {
read(destination, this)
}
}
}
private val primitiveHandler = object : ReadHandler<Number>() {
var marked = false
var converter: ((ByteBuffer) -> Number)? = null
operator fun invoke(
using: ByteBuffer,
amount: Int,
converter: (ByteBuffer) -> Number,
continuation: Continuation<Number>
): Any {
val remaining = using.remaining()
return if (remaining >= amount)
converter(using)
else {
if (!hold(continuation))
throw InProgressException()
this.converter = converter
required = amount - remaining
val capacity = using.capacity()
val limit = using.limit()
if (remaining == 0)
using.clear()
if (capacity - limit < required)
using.compact()
else {
marked = true
using.mark().position(limit).limit(capacity)
}
read(using, this)
COROUTINE_SUSPENDED
}
}
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1) {
if (marked) {
marked = false
val limit = buffer.position()
buffer.reset()
buffer.limit(limit)
} else
buffer.flip()
release().resume(converter!!.invoke(buffer))
}
}
}
override fun buffer(
using: ByteBuffer,
destination: ByteBuffer,
continuation: Continuation<ByteBuffer>
) = bufferHandler(using, destination, continuation)
override fun array(
using: ByteBuffer,
array: ByteArray,
amount: Int,
offset: Int,
continuation: Continuation<ByteArray>
) = arrayHandler(using, array, offset, amount, continuation)
override fun <Type : Number> number(
using: ByteBuffer,
amount: Int,
reader: (ByteBuffer) -> Type,
continuation: Continuation<Type>
) = primitiveHandler(using, amount, reader, continuation as Continuation<Number>)
}
package com.mynt.network.implementation.sequential
import com.mynt.network.Write
import com.mynt.network.implementation.continued
import java.nio.ByteBuffer
open class SequentialWrite(
private val write: SequentialWriteCoordinator,
private val buffer: ByteBuffer
) : Write {
override suspend fun buffer(value: ByteBuffer) = continued<Unit> {
write.test(value, it)
}
override suspend fun byte(value: Byte) = continued<Unit> {
buffer.clear()
buffer.put(value)
write.test(buffer, it)
}
}
package com.mynt.network.implementation.sequential
import com.mynt.network.WriteCoordinator
import java.nio.ByteBuffer
import java.nio.channels.CompletionHandler
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class SequentialWriteCoordinator(
private val write: (ByteBuffer, CompletionHandler<Int, ByteBuffer>) -> Unit
) : WriteCoordinator {
//TODO think about and deal with writes.
//TODO maybe while ur at sculpture garden today?
override fun test(
using: ByteBuffer,
continuation: Continuation<Unit>
): Any {
write(using, object : CompletionHandler<Int, ByteBuffer> {
var required = using.flip().remaining()
override fun completed(count: Int, buffer: ByteBuffer) {
required -= count
if (required < 1)
continuation.resume(Unit)
else
write(using, this)
}
override fun failed(reason: Throwable, buffer: ByteBuffer) {
continuation.resumeWithException(reason)
}
})
return COROUTINE_SUSPENDED
}
}
package com.mynt.network.providers
import com.mynt.network.Connection
import com.mynt.network.Provider
import com.mynt.network.implementation.continued
import com.mynt.network.implementation.sequential.SequentialRead
import com.mynt.network.implementation.sequential.SequentialReadCoordinator
import com.mynt.network.implementation.sequential.SequentialWrite
import com.mynt.network.implementation.sequential.SequentialWriteCoordinator
import kotlinx.coroutines.experimental.async
import java.lang.Boolean.TRUE
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousChannelGroup
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
class TCPSocketProvider(
private val group: AsynchronousChannelGroup,
private val allocator: () -> ByteBuffer
) : Provider {
private val servers = HashMap<SocketAddress, AsynchronousServerSocketChannel>()
//TODO is this really a win?
private val serverFactory = { address: SocketAddress ->
group.provider().openAsynchronousServerSocketChannel(group).bind(address)
}
private open class Handler(
allocator: () -> ByteBuffer
) : Connection {
open lateinit var channel: AsynchronousSocketChannel
override val read = SequentialRead(SequentialReadCoordinator { buffer, handler ->
channel.read(buffer, buffer, handler)
}, allocator())
override val write = SequentialWrite(SequentialWriteCoordinator { buffer, handler ->
channel.write(buffer, buffer, handler)
}, allocator().flip() as ByteBuffer)
override val isOpen
get() = channel.isOpen
override fun close() = channel.close()
}
//--Accept--
private class AcceptHandler(
allocator: () -> ByteBuffer
) : Handler(allocator), CompletionHandler<AsynchronousSocketChannel, Continuation<Connection>> {
override fun completed(channel: AsynchronousSocketChannel, continuation: Continuation<Connection>) {
this.channel = channel
continuation.resume(this)
}
override fun failed(reason: Throwable, continuation: Continuation<Connection>) =
continuation.resumeWithException(reason)
}
override suspend fun accept(address: SocketAddress) = continued<Connection> {
servers.computeIfAbsent(address, serverFactory).accept(it, AcceptHandler(allocator))
COROUTINE_SUSPENDED
}
//--Connect--
private class ConnectHandler(
allocator: () -> ByteBuffer,
override var channel: AsynchronousSocketChannel
) : Handler(allocator), CompletionHandler<Void?, Continuation<Connection>> {
override fun completed(ignored: Void?, continuation: Continuation<Connection>) =
continuation.resume(this)
override fun failed(reason: Throwable, continuation: Continuation<Connection>) =
continuation.resumeWithException(reason)
}
override suspend fun connect(address: SocketAddress) = continued<Connection> {
val channel = group.provider().openAsynchronousSocketChannel(group)
channel.connect(address, it, ConnectHandler(allocator, channel))
COROUTINE_SUSPENDED
}
//--State--
override val isOpen
get() = !group.isTerminated
override fun close() = group.shutdown()
//TODO Maybe just make close do this?
suspend fun awaitClose(
period: Long = Long.MAX_VALUE,
units: TimeUnit = MILLISECONDS
) = continued<Boolean> {
close()
async(it.context) {
try {
group.awaitTermination(period, units)
it.resume(TRUE)
} catch (reason: Exception) {
it.resumeWithException(reason)
}
}
}
}
package me.exerosis.testing.kotlin
import com.mynt.network.implementation.read
import com.mynt.network.providers.TCPSocketProvider
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.ByteBuffer.allocateDirect
import java.nio.channels.AsynchronousChannelGroup.withThreadPool
import java.util.Arrays.toString
import java.util.concurrent.Executors.newCachedThreadPool
fun main(args: Array<String>) {
runBlocking {
println("Starting")
val provider = TCPSocketProvider(
withThreadPool(newCachedThreadPool()),
{ allocateDirect(8).apply { flip() } }
)
val address = InetSocketAddress("localhost", 12346)
val getConnection = async {
val connection = provider.accept(address)
println("Got Connection")
async {
delay(10)
connection.read {
while (true) {
val opcode = byte()
println("Received Opcode: $opcode")
val length = int()
val bytes = array(length)
println(toString(bytes))
println(String(bytes))
}
}
}
}
val connection = provider.connect(address)
getConnection.await()
val buffer = ByteBuffer.allocateDirect(48)
val message = "Hello world!".toByteArray()
buffer.put(0)
buffer.putInt(message.size)
buffer.put(message)
connection.write.buffer(buffer)
println("Sent")
}
}
package com.mynt.network
import java.nio.ByteBuffer
interface Write {
suspend fun buffer(value: ByteBuffer)
suspend fun byte(value: Byte)
}
package com.mynt.network
import java.nio.ByteBuffer
import kotlin.coroutines.experimental.Continuation
interface WriteCoordinator {
fun test(
using: ByteBuffer,
continuation: Continuation<Unit>
): Any
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment