@djspiewak
Created September 25, 2022 09:07

Integrated Runtime Strawman

We need to add an IORuntimeConfig parameter for a PollingSystem:

abstract class PollingSystem {
  protected[unsafe] def init(): PollingState
  protected[unsafe] def poll(state: PollingState, timeoutNanos: Long): Boolean
  protected[unsafe] def unpark(thread: Thread): Unit
  protected[unsafe] def close(state: PollingState): Unit
}

// pure marker trait
trait PollingState

Obviously, it should be possible to define this value both as a protected override within IOApp and in the IORuntimeConfig itself. The former allows some degree of compositionality if the downstream implementor wants to define a custom IOApp subtype.

The contract of PollingSystem is that it must, at minimum, support interruptibly sleeping the thread for up to a timeout. Any additional functionality is PollingSystem-specific.
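A minimal sketch of where that contract sits in a worker's life cycle, assuming a heavily simplified, hypothetical worker loop (SketchWorker is invented for illustration and is not the work-stealing pool). The state is created once per thread, poll takes the place of a plain park when the worker is idle, and close runs at shutdown. The API is mirrored with public members so the sketch compiles outside cats.effect.unsafe.

```scala
import scala.collection.mutable

// Public mirror of the strawman API (the proposal uses protected[unsafe]).
trait PollingState

abstract class PollingSystem {
  def init(): PollingState
  def poll(state: PollingState, timeoutNanos: Long): Boolean
  def unpark(thread: Thread): Unit
  def close(state: PollingState): Unit
}

// Hypothetical, heavily simplified worker: it runs queued tasks and, when it
// runs out of work, hands the thread to the polling system for at most
// idleTimeoutNanos instead of parking it directly.
final class SketchWorker(system: PollingSystem, idleTimeoutNanos: Long) {
  val tasks: mutable.Queue[Runnable] = mutable.Queue.empty

  def run(maxIdleRounds: Int): Unit = {
    val state = system.init() // once, when the worker starts
    try {
      var idleRounds = 0
      while (idleRounds < maxIdleRounds) {
        if (tasks.nonEmpty) tasks.dequeue().run()
        else {
          // Sleeps for up to the timeout; another thread can cut this short
          // by calling system.unpark on this worker's thread.
          system.poll(state, idleTimeoutNanos)
          idleRounds += 1
        }
      }
    } finally system.close(state) // once, when the worker shuts down
  }
}
```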

Additionally, on IORuntime itself, we define the following:

def unsafeCurrentPollingState(): PollingState =
  Thread.currentThread() match {
    case wt: WorkerThread => wt.currentPollingState()
    case _ => null
  }

This gives us an efficient thread-local mechanism for the polling system (returning null when called from a thread that is not a WorkerThread), which in turn allows a downstream framework to obtain its PollingState instance and use it to register callbacks. Note that this is a very unsafe, very untyped API, and common usage will involve a lot of casting.
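A sketch of the lookup and of what downstream code would do with its result, assuming a hypothetical SketchWorkerThread in place of the runtime's internal WorkerThread. The lookup is split into pollingStateOf(thread) only so that it can be exercised on a thread other than the current one; MyLibraryState and MyLibrary.register are invented names standing in for a downstream framework.

```scala
// Marker trait, as in the strawman.
trait PollingState

// Hypothetical stand-in for the runtime's WorkerThread: each worker owns
// exactly one PollingState, fixed when the thread is created.
final class SketchWorkerThread(body: Runnable, state: PollingState) extends Thread(body) {
  def currentPollingState(): PollingState = state
}

object SketchRuntime {
  // Same shape as the proposed method: a type test on the thread rather
  // than a java.lang.ThreadLocal lookup.
  def pollingStateOf(thread: Thread): PollingState =
    thread match {
      case wt: SketchWorkerThread => wt.currentPollingState()
      case _                      => null // not a worker thread
    }

  def unsafeCurrentPollingState(): PollingState =
    pollingStateOf(Thread.currentThread())
}

// Downstream code has to null-check and cast to its own state type.
final class MyLibraryState extends PollingState {
  var registrations = 0
}

object MyLibrary {
  def register(state: PollingState): Boolean =
    state match {
      case s: MyLibraryState => s.registrations += 1; true
      case _                 => false // null, or another system's state
    }
}
```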

Example

The simplest PollingSystem provides just what is necessary to handle integrated timers:

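import java.util.concurrent.locks.LockSupport

object SleepingPollingSystem extends PollingSystem {

  // nothing needs to be tracked per thread just to sleep
  def init(): PollingState = new PollingState {}

  // parks for at most timeoutNanos; returns early if unparked (or spuriously)
  def poll(state: PollingState, timeoutNanos: Long): Boolean = {
    LockSupport.parkNanos(timeoutNanos)
    true
  }

  // wakes a thread that is parked inside poll
  def unpark(thread: Thread): Unit =
    LockSupport.unpark(thread)

  def close(state: PollingState): Unit = ()
}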

A polling system that supports something like epoll would additionally need to maintain the epoll file descriptor within its own PollingState (to which it would need to cast), as well as a way to register for events. The downstream implementor would call this event registration logic itself, obtaining the state via the IORuntime.unsafeCurrentPollingState() mechanism.
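One way to picture such a system on the JVM, as a sketch only: java.nio's Selector stands in for an epoll descriptor, register is a hypothetical subtype-specific method, and the callback is stored as the selection key's attachment. Because unpark receives only a Thread, the sketch has to keep its own Thread-to-Selector index just to find the selector to wake. The Boolean returned by poll is this sketch's own choice (whether any callback ran); the strawman does not define it.

```scala
import java.nio.channels.{SelectableChannel, Selector}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

// Public mirror of the strawman API (marker-trait version).
trait PollingState

abstract class PollingSystem {
  def init(): PollingState
  def poll(state: PollingState, timeoutNanos: Long): Boolean
  def unpark(thread: Thread): Unit
  def close(state: PollingState): Unit
}

// Per-thread state: a Selector standing in for an epoll file descriptor.
final class SelectorState(val selector: Selector, val owner: Thread) extends PollingState

object SelectorPollingSystem extends PollingSystem {
  type Callback = Either[Throwable, Int] => Unit

  // unpark only receives a Thread, so this sketch keeps its own index from
  // worker thread to Selector in order to find the one to wake.
  private val byThread = new ConcurrentHashMap[Thread, Selector]

  def init(): PollingState = {
    val state = new SelectorState(Selector.open(), Thread.currentThread())
    byThread.put(state.owner, state.selector)
    state
  }

  // Hypothetical registration method, called by downstream code on the
  // owning thread with the state from unsafeCurrentPollingState().
  def register(state: PollingState, channel: SelectableChannel, ops: Int)(cb: Callback): Unit = {
    val s = state.asInstanceOf[SelectorState] // the cast the strawman anticipates
    channel.configureBlocking(false)
    channel.register(s.selector, ops, cb) // callback stored as the key's attachment
  }

  def poll(state: PollingState, timeoutNanos: Long): Boolean = {
    val selector = state.asInstanceOf[SelectorState].selector
    val millis = TimeUnit.NANOSECONDS.toMillis(timeoutNanos)
    // select(0) would block indefinitely, so sub-millisecond timeouts use selectNow()
    if (millis <= 0) selector.selectNow() else selector.select(millis)
    val keys = selector.selectedKeys().iterator()
    var fired = false
    while (keys.hasNext) {
      val key = keys.next()
      keys.remove()
      val ready = key.readyOps()
      key.interestOps(0) // one-shot: disarm until registered again
      val cb = key.attachment().asInstanceOf[Callback]
      cb(Right(ready)) // run the callback directly on this thread
      fired = true
    }
    fired
  }

  def unpark(thread: Thread): Unit = {
    val selector = byThread.get(thread)
    if (selector != null) selector.wakeup()
  }

  def close(state: PollingState): Unit = {
    val s = state.asInstanceOf[SelectorState]
    byThread.remove(s.owner)
    s.selector.close()
  }
}
```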

@armanbilge

armanbilge commented Sep 25, 2022

Obviously, it should be possible to define this value both as a protected override within IOApp as well as the IORuntimeConfig itself. The former allows some degree of compositionality if the downstream implementor wants to define a custom IOApp subtype.

Personally I think this should be passed via IORuntime rather than IORuntimeConfig, as a pluggable component like the compute/blocking pools or the scheduler. The idea is that users are far more likely to want to use a custom IORuntimeConfig than a custom IORuntime (indeed, IOApp discourages you from overriding IORuntime).

In fact, going further, it really feels like it should be passed as an argument here, since after all it only makes sense with the work-stealing thread pool:
https://github.com/typelevel/cats-effect/blob/6cf9bdc6a5f7dd75999e12d3723b17dbca7824f7/core/jvm/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala#L37-L43

This is also consistent with my current usage in epollcat and friends: I offer an EpollRuntime(...) which takes an IORuntimeConfig (and eventually an EpollRuntimeConfig) and returns an IORuntime.
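A sketch of the shape being proposed, with every name hypothetical (none of these are cats-effect or epollcat types): the config stays a plain value of tunables, the polling system is handed to the worker pool that actually drives it, and a factory takes the config plus the system and returns a runtime.

```scala
// Public mirror of the strawman API.
trait PollingState

abstract class PollingSystem {
  def init(): PollingState
  def poll(state: PollingState, timeoutNanos: Long): Boolean
  def unpark(thread: Thread): Unit
  def close(state: PollingState): Unit
}

// Plain tunables: the part users commonly customize.
final case class SketchRuntimeConfig(workerThreads: Int)

// Hypothetical stand-in for the work-stealing pool, the only component that
// drives a PollingSystem, so the system is a constructor argument here.
final class SketchWorkerPool(val threads: Int, val pollingSystem: PollingSystem)

// The runtime bundles the pluggable components; the config is one input.
final class SketchRuntime(val compute: SketchWorkerPool, val config: SketchRuntimeConfig)

object SketchRuntime {
  // Config (plus the polling system) in, runtime out.
  def apply(config: SketchRuntimeConfig, pollingSystem: PollingSystem): SketchRuntime =
    new SketchRuntime(new SketchWorkerPool(config.workerThreads, pollingSystem), config)
}
```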

@armanbilge

armanbilge commented Sep 25, 2022

// pure marker trait
trait PollingState

Why a marker trait? Honestly, either abstract class PollingSystem[S] or abstract class PollingSystem { type S ... } seems a lot more ergonomic to me.
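A sketch of the type-member variant, assuming the rest of the strawman API stays the same; CountingPollingSystem is a throwaway example. The type-parameter variant (PollingSystem[S]) would look much the same, with S in place of State.

```scala
// Type-member variant: each implementation fixes its own State type,
// so no marker trait is needed.
abstract class PollingSystem {
  type State

  def init(): State
  def poll(state: State, timeoutNanos: Long): Boolean
  def unpark(thread: Thread): Unit
  def close(state: State): Unit
}

// Hypothetical state and system, only to show the typing.
final class CountingState { var polls = 0 }

object CountingPollingSystem extends PollingSystem {
  type State = CountingState

  def init(): CountingState = new CountingState
  def poll(state: CountingState, timeoutNanos: Long): Boolean = {
    state.polls += 1 // typed field access, no asInstanceOf
    false
  }
  def unpark(thread: Thread): Unit = ()
  def close(state: CountingState): Unit = ()
}

// Generic code stays type-safe through a path-dependent type: a state can
// only be passed back to the system that produced it.
def pollOnce(system: PollingSystem)(state: system.State): Boolean =
  system.poll(state, 0L)
```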

@armanbilge

protected[unsafe] def unpark(thread: Thread): Unit

I need to think about this. This works for your trivial timer case, but I am wondering whether it should also give you access to that other thread's PollingState.

@armanbilge

Also: PollingSystem will need access to reportFailure.

@armanbilge

Or alternatively, access to execute so it can submit stuff to the WSTP/thread. Basically it needs a way of executing callbacks.

@djspiewak
Author

Personally I think this should be passed via IORuntime rather than IORuntimeConfig

This is reasonable. It also synergizes well with the type member suggestion. I tend to mentally blur the line between the config and the runtime itself, but I think you're right that this fits a bit better in the strata of the latter.

Why a marker trait?

I didn't think of doing the member type. Agreed this is much nicer and it avoids the need to cast in user code if we do it right (there will be ugly casts in IORuntime itself).

I need to think about this. This works for your trivial timer case but I am wondering if it should also give you access to that other thread's PollingState as well.

It probably does. Let's just assume it does. :-) I'll amend the API.
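A hypothetical version of the amended signature, combined with the type member suggested earlier: unpark also receives the target thread's state. A java.nio Selector shows why this matters, since waking a thread blocked in select() is done through that thread's own Selector (Selector.wakeup()). Event dispatch is omitted to keep the sketch short.

```scala
import java.nio.channels.Selector
import java.util.concurrent.TimeUnit

// Type-member API with an amended unpark (hypothetical signature): the
// runtime also passes the target thread's own state.
abstract class PollingSystem {
  type State

  def init(): State
  def poll(state: State, timeoutNanos: Long): Boolean
  def unpark(thread: Thread, targetState: State): Unit
  def close(state: State): Unit
}

// A Selector-backed system: each thread's state is its own Selector.
object SelectorWakeupSystem extends PollingSystem {
  type State = Selector

  def init(): Selector = Selector.open()

  def poll(state: Selector, timeoutNanos: Long): Boolean = {
    val millis = TimeUnit.NANOSECONDS.toMillis(timeoutNanos)
    // select(0) would block indefinitely, so short timeouts use selectNow()
    val ready = if (millis <= 0) state.selectNow() else state.select(millis)
    state.selectedKeys().clear() // event dispatch omitted in this sketch
    ready > 0
  }

  // With the target's state in hand, waking it is a single call.
  def unpark(thread: Thread, targetState: Selector): Unit =
    targetState.wakeup()

  def close(state: Selector): Unit = state.close()
}
```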

PollingSystem will need access to reportFailure.

It can't just fail the associated callback?

Or alternatively, access to execute so it can submit stuff to the WSTP/thread. Basically it needs a way of executing callbacks.

Well, callbacks will be of type Either[Throwable, A] => Unit, so it can just run them directly. One thing that isn't depicted here is that subtypes will need to define their own mechanism for registering for events. I'm envisioning that this method will live on their subtype of PollingSystem and take a callback as a parameter, which in turn will originally come from an IO.async. The PollingSystem subtype will be responsible for holding onto this callback somewhere in its PollingState (on the JVM heap). This should be a relatively simple state-management problem, since implementors can assume a single writer for everything.
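A sketch of that division of labor for integrated timers, assuming the type-member API: registerTimer is a hypothetical method on the subtype, the callback is kept in the per-thread state, and poll runs due callbacks directly on the polling thread. The unsynchronized priority queue relies on the single-writer assumption, so registration has to happen on the owning thread.

```scala
import java.util.concurrent.locks.LockSupport
import scala.collection.mutable

// Type-member version of the strawman API.
abstract class PollingSystem {
  type State
  def init(): State
  def poll(state: State, timeoutNanos: Long): Boolean
  def unpark(thread: Thread): Unit
  def close(state: State): Unit
}

// One pending timer: an absolute deadline and the callback to complete.
final case class Timer(deadlineNanos: Long, cb: Either[Throwable, Unit] => Unit)

object Timer {
  // PriorityQueue dequeues its maximum, so reverse to get earliest-first.
  implicit val earliestFirst: Ordering[Timer] =
    Ordering.by[Timer, Long](_.deadlineNanos).reverse
}

// Per-thread state. Only the owning worker touches it (single writer),
// so an unsynchronized queue is enough.
final class TimerState {
  val timers: mutable.PriorityQueue[Timer] = mutable.PriorityQueue.empty[Timer]
}

object TimerPollingSystem extends PollingSystem {
  type State = TimerState

  def init(): TimerState = new TimerState

  // Hypothetical subtype-specific registration method. In the design above
  // the callback would originate from an IO.async; here it is any function.
  def registerTimer(state: TimerState, delayNanos: Long)(
      cb: Either[Throwable, Unit] => Unit): Unit =
    state.timers.enqueue(Timer(System.nanoTime() + delayNanos, cb))

  def poll(state: TimerState, timeoutNanos: Long): Boolean = {
    // Sleep until the caller's timeout or the next deadline, whichever is sooner.
    val untilNext =
      if (state.timers.isEmpty) timeoutNanos
      else math.min(timeoutNanos, state.timers.head.deadlineNanos - System.nanoTime())
    if (untilNext > 0) LockSupport.parkNanos(untilNext)
    // Run every expired callback directly on this thread.
    val now = System.nanoTime()
    var fired = false
    while (state.timers.nonEmpty && state.timers.head.deadlineNanos - now <= 0) {
      state.timers.dequeue().cb(Right(()))
      fired = true
    }
    fired
  }

  def unpark(thread: Thread): Unit = LockSupport.unpark(thread)
  def close(state: TimerState): Unit = state.timers.clear()
}
```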

@armanbilge

Well, callbacks will be of type Either[Throwable, A] => Unit, so it can just run them directly.

Right, and if executing the callback itself fails for some reason it will need to reportFailure.

@djspiewak
Author

The callback doesn’t fail though. It’s the callback produced by IOFiber which resumes the run loop, so any failures are handled there.

@armanbilge

Perhaps :) it's a bit of a closed-world assumption :)

If we think more generally of this as part of the Runnable-based IORuntime machinery (which is making fewer and fewer assumptions that it is specific to IO and Fiber), then I think we should at least consider cases like this.

@djspiewak
Author

Hmm, this is a very fair point. It doesn't hurt to give it access to reportFailure, so we may as well do that. I would rather not force it to explicitly execute all callbacks though. It should be the responsibility of the callback itself to handle the shifting, since IOFiber is extremely efficient about this, and this is where a lot of the thread affinity optimization comes from.
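A sketch of that compromise, with hypothetical names throughout: the system receives a reportFailure function when it is constructed, still runs callbacks directly on the polling thread (leaving any shifting to the callback itself), and routes non-fatal exceptions to reportFailure so that one bad callback does not stop the rest from running. signal stands in for an event becoming ready; unpark and close are omitted.

```scala
import java.util.concurrent.locks.LockSupport
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical polling system that is given the runtime's reportFailure
// at construction. unpark and close are omitted for brevity.
final class ReportingPollingSystem(reportFailure: Throwable => Unit) {
  final class State {
    // Callbacks whose events are ready; owned by a single thread.
    val ready: mutable.Queue[() => Unit] = mutable.Queue.empty
  }

  def init(): State = new State

  // Stand-in for an event becoming ready on this thread.
  def signal(state: State)(cb: () => Unit): Unit = state.ready.enqueue(cb)

  def poll(state: State, timeoutNanos: Long): Boolean = {
    if (state.ready.isEmpty && timeoutNanos > 0) LockSupport.parkNanos(timeoutNanos)
    var ran = false
    while (state.ready.nonEmpty) {
      val cb = state.ready.dequeue()
      // Run the callback directly, but report a failure instead of letting
      // it escape the polling loop.
      try cb()
      catch { case NonFatal(t) => reportFailure(t) }
      ran = true
    }
    ran
  }
}
```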

@armanbilge

Yup, definitely agree about that.
