Daniel Spiewak djspiewak

Integrated Runtime Strawman

We need to add an IORuntimeConfig parameter for a PollingSystem:

abstract class PollingSystem {
  protected[unsafe] def init(): PollingState
  protected[unsafe] def poll(state: PollingState, timeoutNanos: Long): Boolean
  protected[unsafe] def unpark(thread: Thread): Unit
  protected[unsafe] def close(state: PollingState): Unit

What follows are some of my (very) rough thoughts on what we can and should do with respect to CPS transformation in Scala at the language level. I'll try to start with some motivation behind my thinking, as well as some rambling observations on the nature of the problem space, but don't expect too much coherence here. :-)

The Problem

Async programming is hard.

Okay, let's actually be more specific than that. High-performance I/O is hard. Signal multiplexing is a powerful technique for achieving high(er) performance I/O, particularly network I/O, but the tradeoff is that, in order to utilize it, the user-space programming model must allow for suspension and resumption of sequential continuations (often called "fibers" or "coroutines"). Achieving this type of programming model without significant tradeoffs in usability is the part that is exceptionally hard.
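To make "suspension and resumption of sequential continuations" a bit more concrete, here is a minimal single-threaded sketch in continuation-passing style. Every name in it (`EventLoop`, `submit`, `suspendOn`, `signal`, `Demo`) is hypothetical and invented for illustration; integer keys stand in for file descriptors, and `signal` stands in for whatever readiness notification a real multiplexer would deliver. It is not how any particular runtime does this.

```scala
import scala.collection.mutable

// Hypothetical sketch: one loop multiplexes many suspended continuations.
final class EventLoop {
  // Continuations parked until a signal arrives for their key.
  private val waiting = mutable.Map.empty[Int, String => Unit]
  // Continuations that are ready to run right now.
  private val ready = mutable.Queue.empty[() => Unit]

  def submit(task: () => Unit): Unit = {
    ready.enqueue(task)
    ()
  }

  // Suspend: store "the rest of the computation" instead of blocking a thread.
  def suspendOn(key: Int)(k: String => Unit): Unit =
    waiting.update(key, k)

  // Resume: a readiness signal moves the parked continuation to the run queue.
  def signal(key: Int, data: String): Unit =
    waiting.remove(key).foreach { k =>
      ready.enqueue(() => k(data))
    }

  // Drain everything that is currently runnable.
  def run(): Unit =
    while (ready.nonEmpty) {
      val task = ready.dequeue()
      task()
    }
}

object Demo {
  def trace(): List[String] = {
    val loop = new EventLoop
    val log = mutable.ListBuffer.empty[String]
    def record(s: String): Unit = { log += s; () }

    // A "fiber" written by hand in CPS: everything after the wait is a callback.
    def fiber(name: String, key: Int): () => Unit = () => {
      record(s"$name: waiting on $key")
      loop.suspendOn(key)(data => record(s"$name: resumed with $data"))
    }

    loop.submit(fiber("a", 1))
    loop.submit(fiber("b", 2))
    loop.run()          // both fibers run until they suspend
    loop.signal(2, "y") // signals may arrive in any order
    loop.signal(1, "x")
    loop.run()          // resumed continuations run here
    log.toList
  }
}
```

The usability cost shows up in `fiber`: the code after the wait has to be written as an explicit callback, which is the kind of manual transformation a language-level CPS transform would aim to hide.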

If that wasn't bad enough though, these problems are inextricably conflated with another set of problem spaces which are, themselves, very difficult. In

[error] java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
[error] at xsbt.DottydocRunner.run(DottydocRunner.java:65)
[error] at xsbt.ScaladocInterface.run(ScaladocInterface.java:11)
[error] at jdk.internal.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
[error] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[error] at sbt.internal.inc.AnalyzingCompiler.invoke(AnalyzingCompiler.scala:329)
[error] at sbt.internal.inc.AnalyzingCompiler.doc(AnalyzingCompiler.scala:175)
[error] at sbt.internal.inc.AnalyzingCompiler.doc(AnalyzingCompiler.scala:133)
[error] at sbt.Doc$.$anonfun$scaladoc$1(Doc.scala:52)
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
// sbt-git workarounds (TODO upstream into sbt-tl)
inThisBuild {
  git.uncommittedSignifier := Some("-SNAPSHOT")
  git.formattedShaVersion := {
    val Description = """^.*-(\d+)-[a-zA-Z0-9]+$""".r
    val suffix = git.makeUncommittedSignifierSuffix(git.gitUncommittedChanges.value, git.uncommittedSignifier.value)
    val description = Try("git describe --tags --match v*".!!.trim).toOption
// ported with love from http://psy-lob-saw.blogspot.com/2014/04/notes-on-concurrent-ring-buffer-queue.html
private[effect] final class UnsafeBounded[A](bound: Int) {
  private[this] val buffer = new Array[AnyRef](bound)
  private[this] val sequenceBuffer = new AtomicLongArray(bound)
  private[this] val head = new AtomicLong(0)
  private[this] val tail = new AtomicLong(0)
  0.until(bound).foreach(i => sequenceBuffer.set(i, i.toLong))
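The preview above cuts off before any operations are shown. As a rough illustration of how a sequence-numbered ring buffer with these fields is commonly driven, here is a self-contained sketch. The class name `BoundedSketch`, the method names `offer` and `poll`, and the `bound >= 2` requirement are assumptions made for this example; the original `UnsafeBounded` may differ in all of them.

```scala
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}
import scala.annotation.tailrec

// Hypothetical sketch of a bounded queue guarded by per-slot sequence numbers.
final class BoundedSketch[A](bound: Int) {
  // Assumption: with a single slot, "full" and "free" would be indistinguishable.
  require(bound >= 2, "this sketch needs at least two slots")

  private[this] val buffer = new Array[AnyRef](bound)
  private[this] val sequenceBuffer = new AtomicLongArray(bound)
  private[this] val head = new AtomicLong(0)
  private[this] val tail = new AtomicLong(0)
  0.until(bound).foreach(i => sequenceBuffer.set(i, i.toLong))

  // Returns false when the queue is full.
  @tailrec
  def offer(a: A): Boolean = {
    val t = tail.get()
    val idx = (t % bound).toInt
    val seq = sequenceBuffer.get(idx)
    if (seq == t) {
      // The slot is free for ticket t; try to claim it.
      if (tail.compareAndSet(t, t + 1)) {
        buffer(idx) = a.asInstanceOf[AnyRef]
        sequenceBuffer.set(idx, t + 1) // publish the element to consumers
        true
      } else offer(a)
    } else if (seq < t) false // slot still holds an unconsumed element: full
    else offer(a)             // another producer got ahead of us; retry
  }

  // Returns None when the queue is empty.
  @tailrec
  def poll(): Option[A] = {
    val h = head.get()
    val idx = (h % bound).toInt
    val seq = sequenceBuffer.get(idx)
    if (seq == h + 1) {
      // The slot was published for ticket h; try to claim it.
      if (head.compareAndSet(h, h + 1)) {
        val a = buffer(idx).asInstanceOf[A]
        buffer(idx) = null
        sequenceBuffer.set(idx, h + bound) // free the slot for the next lap
        Some(a)
      } else poll()
    } else if (seq < h + 1) None // nothing published here yet: empty
    else poll()                  // another consumer got ahead of us; retry
  }
}
```

A short single-threaded usage example:

```scala
val q = new BoundedSketch[Int](2)
q.offer(1) // true
q.offer(2) // true
q.offer(3) // false, the queue is full
q.poll()   // Some(1)
```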
final class Channel[F[_], A] private (q: Queue[F, A], closed: Ref[F, Boolean])(implicit F: Monad[F]) {
  // doesn't interrupt taking in progress
  def close: F[Unit] = closed.set(true)
  def isClosed: F[Boolean] = closed.get
  // returns None once the channel is closed; otherwise offers the element
  def send(a: A): F[Option[Unit]] =
    closed.get.ifM(F.pure(None), q.offer(a).map(Some(_)))
class Channel[F[_], A] private (q: Queue[F, A], closed: Ref[F, Boolean])(implicit F: Monad[F]) {
  // doesn't interrupt taking in progress
  def close: F[Unit] = closed.set(true)
  def isClosed: F[Boolean] = closed.get
  // returns None once the channel is closed; otherwise offers the element
  def send(a: A): F[Option[Unit]] =
    closed.get.ifM(F.pure(None), q.offer(a).map(Some(_)))
[info] enqueue max items and dequeue in order
[error] x parallel put and take
[error] List(1961, 1437, 1487, 1494, 1433, 1458, 1490, 1414, 1413, 1410, 1619, 1411, 1645, 1793, 1646, 1916, 1426, 1614, 1613, 1605, 1408, 1441, 1624, 1572, 1466, 1625, 1696, 1419, 1409, 1622, 1840, 1958, 1899, 1423, 1479, 1611, 1498, 1471, 1474, 1462, 1467, 1432, 1440, 1415, 1489, 1884, 1023, 1890, 1854, 1477, 1482, 1499, 1406, 1485, 1444, 1418, 1417, 1492, 1616, 1473, 1969, 1869, 1842, 1421, 1459, 1451, 1428, 1496, 1465, 1456, 1846, 1698, 1693, 1484, 1621, 960, 921, 1708, 1706, 1018, 1957, 1966, 1460, 1610, 1650, 1617, 1472, 1491, 1452, 1639, 1024, 917, 1928, 1709, 1970, 1920, 1921, 1888, 1442, 1478, 1486, 1792, 1443, 1453, 1468, 1584, 1009, 941, 929, 1885, 1689, 1712, 1850, 1877, 1894, 1436, 1447, 1500, 1476, 1455, 946, 1635, 950, 925, 957, 1905, 908, 1917, 1710, 1908, 1962, 1651, 1445, 1603, 1652, 958, 940, 1673, 1662, 1669, 1054, 1870, 913, 1882, 1972, 1878, 1866, 1980, 1427, 1454, 980, 1010, 1017, 1006, 938, 1677, 1