Skip to content

Instantly share code, notes, and snippets.

@Groostav
Last active April 26, 2017 10:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Groostav/7e89dde503909a13d9ac21cd3c0f4fa2 to your computer and use it in GitHub Desktop.
Save Groostav/7e89dde503909a13d9ac21cd3c0f4fa2 to your computer and use it in GitHub Desktop.
heartbeat and IO whiteboarding
/**
 * Whiteboard draft (actor variant): routes messages read from [blockingPipe] to the
 * caller waiting for them, keyed by message id.
 *
 * A background task reads messages from the pipe one at a time and reports each to
 * [actor] as a [Write]; callers of [readMessage] register interest with a [Read].
 * All access to [map] happens inside the actor loop, which handles one command at a time.
 */
class ResponseDemultiplexingPipe(val blockingPipe: Pipe) {

    /** Commands understood by [actor]. */
    sealed class IO

    /** Asks for the message identified by [id] to be sent on [response] once it is available. */
    class Read(val id: UUID, val response: Channel<MessageDTO>) : IO()

    /** Reports that the message [response], identified by [id], has been read from the pipe. */
    class Write(val id: UUID, val response: MessageDTO) : IO()

    // One slot per message id. A slot is created by whichever side shows up first
    // (reader or writer) and is completed when the message arrives.
    // NOTE(review): slots are never removed, so this map grows with every id seen.
    var map: Map<UUID, CompletableDeferred<MessageDTO>> = emptyMap()

    val actor = actor<IO>(CommonPool) {
        for (msg in channel) {
            when (msg) {
                is Read -> {
                    val slot = slotFor(msg.id)
                    // Forward from a separate coroutine: awaiting (or sending on the
                    // reader's channel) here would stall the actor loop, and the Write
                    // that completes the slot could then never be processed.
                    launch(CommonPool) { msg.response.send(slot.await()) }
                }
                // Completes a waiting reader's slot, or creates an already-completed one
                // so a later Read finds the message.
                is Write -> slotFor(msg.id).complete(msg.response)
            }
        }
    }

    //var messageBuffer: Map<UUID, MessageDTO> = emptyMap() //I'm pretty sure this is erroneous...

    init {
        singletonCachedThreadPool.submit { readAndDispatch() }
    }

    /**
     * Suspends until the message identified by [respondingTo] has been read from the pipe,
     * then returns it.
     */
    suspend fun readMessage(respondingTo: UUID): MessageDTO {
        val response = Channel<MessageDTO>()
        actor.send(Read(respondingTo, response))
        return response.receive()
    }

    /** Returns the slot for [id], registering a fresh one in [map] if none exists. Actor-only. */
    private fun slotFor(id: UUID): CompletableDeferred<MessageDTO> =
        map[id] ?: CompletableDeferred<MessageDTO>().also { map += id to it }

    /** Reads one message from the pipe, hands it to the actor, then schedules the next read. */
    private fun readAndDispatch() {
        val message = blockingPipe.readMessage()
        // This runs on a plain pool thread, so bridge into the suspending send.
        runBlocking { actor.send(Write(message.id, message)) }
        singletonCachedThreadPool.submit { readAndDispatch() }
    }
}
/**
 * Whiteboard draft (serializing-dispatcher variant): routes messages read from
 * [blockingPipe] to the caller waiting for them.
 *
 * Both the reader side ([readMessage]) and the background pipe reader touch
 * [channelOrOtherSyncRoot] only from blocks run on [serializingDispatcher].
 * The sync-root type itself is still a placeholder (`TODO()`), so constructing this
 * class throws until it is filled in.
 */
class ResponseDemultiplexingPipe(val blockingPipe: Pipe) {

    val channelOrOtherSyncRoot: MultibroadcastTODOChannel = TODO() //like a map<UUID, Differred<Message>>

    val serializingDispatcher = singletonCachedThreadPool.asSingleThreadedExecutorFor(this).asDispatcher()

    //var messageBuffer: Map<UUID, MessageDTO> = emptyMap() //I'm pretty sure this is erroneous...

    init {
        singletonCachedThreadPool.submit { readAndDispatch() }
    }

    /**
     * Takes the entry for [respondingTo] from the sync root on [serializingDispatcher]
     * and awaits it.
     */
    suspend fun readMessage(respondingTo: UUID): MessageDTO =
        run(serializingDispatcher) {
            //val response = messageBuffer.remove(respondingTo)
            // The block's last expression is its result; a `return` is not permitted here.
            await(channelOrOtherSyncRoot.take(respondingTo)) // .get(respondingTo).get()
        }

    /** Reads one message from the pipe, offers it to the sync root, then schedules the next read. */
    private fun readAndDispatch() {
        val message = blockingPipe.readMessage()
        // This runs on a plain pool thread, so block it until the offer has run on the
        // serializing dispatcher.
        runBlocking(serializingDispatcher) {
            // NOTE(review): the result is currently unused; if the offer can fail when no
            // reader is waiting, the message is dropped — see the commented-out buffer below.
            val wasDispatched = channelOrOtherSyncRoot.offer(message) // .get(message.respondingTo).set(message)
            //if( ! wasDispatched) { messageBuffer += message.ResponseToID to message }
        }
        singletonCachedThreadPool.submit { readAndDispatch() }
    }
}
/**
 * Whiteboard draft (CompletableFuture variant): routes messages read from
 * [blockingPipe] to the caller waiting for them, keyed by message id.
 *
 * [consumers] holds futures for callers that asked before their message arrived;
 * [messageBuffer] holds messages that arrived before anyone asked. Both maps are only
 * read and replaced while holding this instance's monitor.
 */
class ResponseDemultiplexingPipe(val blockingPipe: Pipe) {

    var consumers: Map<UUID, CompletableFuture<MessageDTO>> = emptyMap()
    var messageBuffer: Map<UUID, MessageDTO> = emptyMap()

    init {
        singletonCachedThreadPool.submit { readAndDispatch() }
    }

    /**
     * Returns a future for the message identified by [respondingTo].
     *
     * If that message is already buffered it is removed from [messageBuffer] and the
     * returned future is already completed; otherwise a new pending future is
     * registered in [consumers] and returned.
     */
    fun readMessage(respondingTo: UUID): CompletableFuture<MessageDTO> = synchronized(this) {
        val buffered = messageBuffer[respondingTo]
        if (buffered != null) {
            messageBuffer -= respondingTo
            CompletableFuture.completedFuture(buffered)
        } else {
            CompletableFuture<MessageDTO>().also { consumers += respondingTo to it }
        }
    }

    /**
     * Reads one message from the pipe, hands it to its waiting consumer (or buffers it
     * if nobody is waiting yet), then schedules the next read.
     */
    private fun readAndDispatch() {
        val message = blockingPipe.readMessage()
        val consumer = synchronized(this) {
            val waiting = consumers[message.ID]
            if (waiting == null) {
                messageBuffer += message.ID to message
            } else {
                consumers -= message.ID
            }
            waiting
        }
        // Complete outside the lock so callbacks attached to the future do not run under it.
        consumer?.complete(message)
        singletonCachedThreadPool.submit { readAndDispatch() }
    }
}
/**
 * Whiteboard draft of a job that talks to a plugin through [pipe]: it keeps a
 * heartbeat going in the background and performs one request/response exchange in [run].
 */
class SpecificJob(val pipe: ResponseDemultiplexingPipe) : Job {

    /** Starts the heartbeat loop on the shared thread pool's dispatcher. */
    override fun onStart() {
        launch(singletonCachedThreadPool.asDispatcher()) { runHeartbeatTail() }
    }

    /**
     * Performs one heartbeat round trip, reports a missing or failed response, waits,
     * and then launches the next round.
     *
     * Declared `suspend` because it calls the suspending [await] helper; it is only
     * invoked from `launch` blocks in this class.
     */
    suspend fun runHeartbeatTail() {
        val request = pipe.write(Heartbeat())
        val response = await(upTo = 5.seconds) { pipe.readMessage<HeartbeatResponse>(request.ID) }
        when (response) {
            null -> onPluginFlatline()          // no response within the timeout
            is Left -> onPluginError(response.l)
            else -> Unit                        // any other response: nothing to do
        }
        NonblockingSleep(5.seconds)
        launch(singletonCachedThreadPool.asDispatcher()) { runHeartbeatTail() }
    }

    /** Writes the new inputs to the pipe, awaits the matching outputs and builds the domain model. */
    override fun run() = async<DomainModel> {
        val request = pipe.write(newInputs())
        // NOTE(review): this uses an `await { }` overload without a timeout, which is not
        // declared in this class — confirm it exists elsewhere.
        val response = await { pipe.readMessage<NewOutputs>(request.ID) }
        // The block's last expression is its result; a `return` is not permitted here.
        // NOTE(review): `buidlDomainModel` looks like a typo for `buildDomainModel` — confirm
        // against the declaration before renaming.
        buidlDomainModel(response)
    }

    //extension functions:

    /** This many seconds as a [Duration]. Still a placeholder in this draft. */
    val Int.seconds: Duration get() = TODO("convert seconds to Duration")

    /**
     * Runs [block] and returns its result, or `null` if it does not finish within [upTo].
     *
     * `T` is non-null so that a `null` result unambiguously means "timed out".
     * Cancellation coming from outside the timeout is not converted to `null`.
     */
    suspend fun <T : Any> await(upTo: Duration, block: suspend () -> T): T? =
        // assumes Duration exposes toMillis() as java.time.Duration does — TODO confirm
        withTimeoutOrNull(upTo.toMillis()) { block() }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment