Skip to content

Instantly share code, notes, and snippets.

@nomisRev
Created November 28, 2019 09:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save nomisRev/7e857956cc79e00810a4bf3da55fea1b to your computer and use it in GitHub Desktop.
Save nomisRev/7e857956cc79e00810a4bf3da55fea1b to your computer and use it in GitHub Desktop.
Stacksafe Mono Reactor
import reactor.core.publisher.Mono
import java.util.concurrent.Executor
fun noTrampolining(): Mono<Int> {
var initial = Mono.just(0)
(0..5000).forEach { i ->
initial = initial.map { i }
}
return initial
}
fun <A> trampoline(a: A): Mono<A> =
Mono.create { sink ->
trampoline { sink.success(a) }
}
fun trampolined(): Mono<Int> {
var initial = Mono.just(0)
(0..5000).forEach { i ->
initial = if (i % maxStackDepthSize == 0) {
initial.flatMap { trampoline(it) }
} else {
initial.map { i }
}
}
return initial
}
fun main() {
trampolined().block()
noTrampolining().block() //StackOverFlow
}
/**
* EVERYTHING BELOW THIS POINT WAS COPY-PASTE FROM ARROW'S //arrow.fx.internal.Utils.kt
*/
const val maxStackDepthSize = 127
inline fun trampoline(crossinline f: () -> Unit): Unit =
_trampoline.get().execute(Runnable { f() })
private val underlying = Executor { it.run() }
@PublishedApi
internal val _trampoline = object : ThreadLocal<TrampolineExecutor>() {
override fun initialValue(): TrampolineExecutor =
TrampolineExecutor(underlying)
}
@PublishedApi
internal class TrampolineExecutor(val underlying: Executor) {
private var immediateQueue = Platform.ArrayStack<Runnable>()
@Volatile
private var withinLoop = false
private fun startLoop(runnable: Runnable) {
withinLoop = true
try {
immediateLoop(runnable)
} finally {
withinLoop = false
}
}
fun execute(runnable: Runnable): Unit =
if (!withinLoop) startLoop(runnable)
else immediateQueue.push(runnable)
private fun forkTheRest() {
class ResumeRun(val head: Runnable, val rest: Platform.ArrayStack<Runnable>) : Runnable {
override fun run() {
immediateQueue.pushAll(rest)
immediateLoop(head)
}
}
val head = immediateQueue.pop()
if (head != null) {
val rest = immediateQueue
immediateQueue = Platform.ArrayStack()
underlying.execute(ResumeRun(head, rest))
}
}
@Suppress("SwallowedException") // Should we rewrite with while??
private tailrec fun immediateLoop(task: Runnable) {
try {
task.run()
} catch (ex: Throwable) {
forkTheRest()
// ex.nonFatalOrThrow() //not required???
}
val next = immediateQueue.pop()
return if (next != null) immediateLoop(next)
else Unit
}
}
private const val initialIndex: Int = 0
private const val chunkSize: Int = 8
object Platform {
@Suppress("UNCHECKED_CAST")
class ArrayStack<A> {
private val initialArray: Array<Any?> = arrayOfNulls<Any?>(chunkSize)
private val modulo = chunkSize - 1
private var array = initialArray
private var index = initialIndex
/** Returns `true` if the stack is empty. */
fun isEmpty(): Boolean =
index == 0 && (array.getOrNull(0) == null)
/** Pushes an item on the stack. */
fun push(a: A) {
if (index == modulo) {
val newArray = arrayOfNulls<Any?>(chunkSize)
newArray[0] = array
array = newArray
index = 1
} else {
index += 1
}
array[index] = a
}
/** Pushes an entire iterator on the stack. */
fun pushAll(cursor: Iterator<A>) {
while (cursor.hasNext()) push(cursor.next())
}
/** Pushes an entire iterable on the stack. */
fun pushAll(seq: Iterable<A>) {
pushAll(seq.iterator())
}
/** Pushes the contents of another stack on this stack. */
fun pushAll(stack: ArrayStack<A>) {
pushAll(stack.iteratorReversed())
}
/** Pops an item from the stack (in LIFO order).
*
* Returns `null` in case the stack is empty.
*/
fun pop(): A? {
if (index == 0) {
if (array.getOrNull(0) != null) {
array = array[0] as Array<Any?>
index = modulo
} else {
return null
}
}
val result = array[index] as A
// GC purposes
array[index] = null
index -= 1
return result
}
/** Builds an iterator out of this stack. */
@Suppress("IteratorNotThrowingNoSuchElementException")
fun iteratorReversed(): Iterator<A> =
object : Iterator<A> {
private var array = this@ArrayStack.array
private var index = this@ArrayStack.index
override fun hasNext(): Boolean =
index > 0 || (array.getOrNull(0) != null)
override fun next(): A {
if (index == 0) {
array = array[0] as Array<Any?>
index = modulo
}
val result = array[index] as A
index -= 1
return result
}
}
fun isNotEmpty(): Boolean =
!isEmpty()
}
/**
* Establishes the maximum stack depth for `IO#map` operations.
*
* The default is `128`, from which we substract one as an
* optimization. This default has been reached like this:
*
* - according to official docs, the default stack size on 32-bits
* Windows and Linux was 320 KB, whereas for 64-bits it is 1024 KB
* - according to measurements chaining `Function1` references uses
* approximately 32 bytes of stack space on a 64 bits system;
* this could be lower if "compressed oops" is activated
* - therefore a "map fusion" that goes 128 in stack depth can use
* about 4 KB of stack space
*/
const val maxStackDepthSize = 127
/**
* Composes multiple errors together, meant for those cases in which error suppression, due to a second error being
* triggered, is not acceptable.
*
* On top of the JVM this function uses Throwable#addSuppressed, available since Java 7. On top of JavaScript the
* function would return a CompositeException.
*/
fun composeErrors(first: Throwable, vararg rest: Throwable): Throwable {
rest.forEach { if (it != first) first.addSuppressed(it) }
return first
}
/**
* Composes multiple errors together, meant for those cases in which error suppression, due to a second error being
* triggered, is not acceptable.
*
* On top of the JVM this function uses Throwable#addSuppressed, available since Java 7. On top of JavaScript the
* function would return a CompositeException.
*/
fun composeErrors(first: Throwable, rest: List<Throwable>): Throwable {
rest.forEach { if (it != first) first.addSuppressed(it) }
return first
}
inline fun trampoline(crossinline f: () -> Unit): Unit =
_trampoline.get().execute(Runnable { f() })
private val underlying = Executor { it.run() }
@PublishedApi
internal val _trampoline = object : ThreadLocal<TrampolineExecutor>() {
override fun initialValue(): TrampolineExecutor =
TrampolineExecutor(underlying)
}
@PublishedApi
internal class TrampolineExecutor(val underlying: Executor) {
private var immediateQueue = ArrayStack<Runnable>()
@Volatile
private var withinLoop = false
private fun startLoop(runnable: Runnable) {
withinLoop = true
try {
immediateLoop(runnable)
} finally {
withinLoop = false
}
}
fun execute(runnable: Runnable): Unit =
if (!withinLoop) startLoop(runnable)
else immediateQueue.push(runnable)
private fun forkTheRest() {
class ResumeRun(val head: Runnable, val rest: ArrayStack<Runnable>) : Runnable {
override fun run() {
immediateQueue.pushAll(rest)
immediateLoop(head)
}
}
val head = immediateQueue.pop()
if (head != null) {
val rest = immediateQueue
immediateQueue = ArrayStack()
underlying.execute(ResumeRun(head, rest))
}
}
@Suppress("SwallowedException") // Should we rewrite with while??
private tailrec fun immediateLoop(task: Runnable) {
try {
task.run()
} catch (ex: Throwable) {
forkTheRest()
// ex.nonFatalOrThrow() //not required???
}
val next = immediateQueue.pop()
return if (next != null) immediateLoop(next)
else Unit
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment