Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created September 25, 2022 09:07
Show Gist options
  • Save djspiewak/5facabeb0009ca5d57987cb539630699 to your computer and use it in GitHub Desktop.
Save djspiewak/5facabeb0009ca5d57987cb539630699 to your computer and use it in GitHub Desktop.

Integrated Runtime Strawman

We need to add an IORuntimeConfig parameter for a PollingSystem:

abstract class PollingSystem {
  protected[unsafe] def init(): PollingState
  protected[unsafe] def poll(state: PollingState, timeoutNanos: Long): Boolean
  protected[unsafe] def unpark(thread: Thread): Unit
  protected[unsafe] def close(state: PollingState): Unit
}

// pure marker trait
trait PollingState

Obviously, it should be possible to define this value both as a protected override within IOApp as well as the IORuntimeConfig itself. The former allows some degree of compositionality if the downstream implementor wants to define a custom IOApp subtype.

The contract of PollingSystem is such that it must support at least interruptibly sleeping the thread for up to a timeout. Additional functionality may be supported and would be PollingSystem-specific.

Additionally, on IORuntime itself, we define the following:

def unsafeCurrentPollingState(): PollingState =
  Thread.currentThread() match {
    case wt: WorkerThread => wt.currentPollingState()
    case _ => null
  }

This gives us an efficient thread local mechanism for the polling system, which in turn allows a downstream framework to get their PollingState instance which can be used to register callbacks. Note that this is a very unsafe, very untyped API in which a lot of casting will take place under common use.

Example

The simplest PollingSystem is just what is necessary to handle integrated timers:

object SleepingPollingSystem extends PollingSystem {

  def init(): PollingState = new PollingState {}

  def poll(state: PollingState, timeoutNanos: Long): Boolean = {
    Unsafe.parkNanos(timeoutNanos)
    true
  }

  def unpark(thread: Thread): Unit =
    LockSupport.unpark(thread)

  def close(state: PollingState): Unit = ()
}

In the event that we wanted to have a polling system which supports something like epoll, it would additionally need functionality to maintain the epoll file descriptor within its own PollingState (to which it would need to cast), as well as the ability to register for events. This event registration logic would be called by the downstream implementor itself, empowered via the IORuntime.unsafeCurrentPollingState() mechanism.

@djspiewak
Copy link
Author

The callback doesn’t fail though. It’s the callback produced by IOFiber which resumes the run loop, so any failures are handled there.

@armanbilge
Copy link

Perhaps :) it's a bit of a closed-world assumption :)

If we think more generally that this is part of the Runnable-based IORuntime machinery (which is making less and less assumptions that it is specific to IO and Fiber) then I think we should at least consider cases like this.

@djspiewak
Copy link
Author

Hmm, this is a very fair point. It doesn't hurt to give it access to reportFailure, so we may as well do that. I would rather not force it to explicitly execute all callbacks though. It should be the responsibility of the callback itself to handle the shifting, since IOFiber is extremely efficient about this, and this is where a lot of the thread affinity optimization comes from.

@armanbilge
Copy link

Yup, definitely agree about that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment