Last active
April 26, 2017 10:01
-
-
Save Groostav/7e89dde503909a13d9ac21cd3c0f4fa2 to your computer and use it in GitHub Desktop.
heartbeat and IO whiteboarding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ResponseDemultiplexingPipe(val blockingPipe: Pipe) { | |
sealed class IO {} | |
class Read(val id: UUID, response: Channel<MessageDTO>): IO | |
class Write(val id: UUID, response: MessageDTO): IO | |
var map: Map<UUID, Differed<MessageDTO>> = emptyMap() | |
val actor = actor<IO>(CommonPool) { | |
for(msg in this.channel){ | |
when(msg){ | |
is Read -> { | |
if(msg.id in map.keys){ | |
msg.response.send(map[msg.id].value) | |
} | |
else { | |
val result = Differed() | |
map += msg.id to result | |
response.send(result.await()) //is this legal here? | |
} | |
} | |
is Write -> { | |
if(msg.id in map.keys) { map[msg.id].complete(msg.response) } | |
else map[msg.id] = CompletedDifferred(msg.response) } | |
} | |
} | |
} | |
} | |
//var messageBuffer: Map<UUID, MessageDTO> = emptyMap() //I'm pretty sure this is erroneous... | |
init { | |
singletonCachedThreadPool.submit { readAndDispatch() } | |
} | |
suspend fun readMessage(respondingTo: UUID): MessageDTO { | |
val response = Channel<MessageDTO>() | |
actor.send(Read(respondingTo, response)) | |
return response.recieve() | |
} | |
private fun readAndDispatch() { | |
val message = blockingPipe.readMessage() | |
actor.send(Write(message.id, message)) | |
singletonCachedThreadPool.submit { readAndDispatch() } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ResponseDemultiplexingPipe(val blockingPipe: Pipe) { | |
val channelOrOtherSyncRoot: MultibroadcastTODOChannel = TODO() //like a map<UUID, Differred<Message>> | |
val serializingDispatcher = singletonCachedThreadPool.asSingleThreadedExecutorFor(this).asDispatcher() | |
//var messageBuffer: Map<UUID, MessageDTO> = emptyMap() //I'm pretty sure this is erroneous... | |
init { | |
singletonCachedThreadPool.submit { readAndDispatch() } | |
} | |
suspend fun readMessage(respondingTo: UUID): MessageDTO | |
= run(serializingDispatcher) { | |
//val response = messageBuffer.remove(respondingTo) | |
return await(channelOrOtherSyncRoot.take(respondingTo)) // .get(respondingTo).get() | |
} | |
private fun readAndDispatch() { | |
val message = blockingPipe.readMessage() | |
run(serializingDispatcher){ | |
val wasDispatched = channelOrOtherSyncRoot.offer(message) // .get(message.respondingTo).set(message) | |
//if( ! wasDispatched) { messageBuffer += message.ResponseToID to message } | |
singletonCachedThreadPool.submit { readAndDispatch() } | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ResponseDemultiplexingPipe(val blockingPipe: Pipe) { | |
var consumers: Map<UUID, CompletableFuture<MessageDTO>> = emptyMap() | |
var messageBuffer: Map<UUID, MessageDTO> = emptyMap() | |
init { | |
singletonCachedThreadPool.submit { readAndDispatch() } | |
} | |
fun readMessage(respondingTo: UUID): CompletableFuture<MessageDTO> { | |
synchronized(this){ | |
val response = messageBuffer.remove(respondingTo) | |
if(response != null) return CompletableFuture(response) | |
val waiter = CompletableFuture() | |
consumers += respondingTo to waiter | |
return waiter | |
} | |
} | |
private fun readAndDispatch(){ | |
val message = blockingPipe.readMessage() | |
val consumer = synchronized(this){ | |
val consumer = consumers.remove(message.ID) | |
if(consumer == null) messages += message | |
consumer | |
} | |
consumer?.complete(message) | |
singletonCachedThreadPool.submit { readAndDispatch() } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SpecificJob(val pipe: ResponseDemultiplexingPipe): Job { | |
override fun onStart(){ | |
launch(singletonCachedThreadPool.asDispatcher()) { runHeartbeatTail() } | |
} | |
fun runHeartbeatTail() { | |
val request = pipe.write(Heartbeat()) | |
val response = await(upTo = 5.seconds) { pipe.readMessage<HeartbeatResponse>(request.ID) } | |
when(response){ | |
null -> onPluginFlatline() | |
is Left -> onPluginError(response.l) | |
else -> | |
} | |
NonblockingSleep(5.seconds) | |
launch(singletonCachedThreadPool.asDispatcher()) { runHeartbeatTail() } | |
} | |
override fun run() = async<DomainModel> { | |
val request = pipe.write(newInputs()) | |
val response = await { pipe.readMessage<NewOutputs>(request.ID) } | |
return buidlDomainModel(response) | |
} | |
//extension functions: | |
val Int.seconds: Duration get() = ... | |
fun <T> await(upTo: Duration, block: suspend.() -> T): T? where T: Any //nonnull T, disambiguates timeout clause | |
= withTimeout(duration.toMillis) { try { ... } catch(CancellationException ex){ return null } } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment