Skip to content

Instantly share code, notes, and snippets.

View djspiewak's full-sized avatar

Daniel Spiewak djspiewak

View GitHub Profile
private[effect] final class UnsafeBounded[A](bound: Int) {
private[this] val buffer = new Array[AnyRef](bound)
private[this] val filled = new Array[Boolean](bound) // can't bitpack this because of word tearing
private[this] val first = new AtomicInteger(0)
private[this] val last = new AtomicInteger(0)
private[this] val length = new AtomicInteger(0)
def debug(): String = buffer.mkString("[", ", ", "]")
private[effect] final class UnsafeUnbounded[A] {
private[this] val first = new AtomicReference[Cell]
private[this] val last = new AtomicReference[Cell]
def put(data: A): () => Unit = {
val cell = new Cell(data)
val oldLast = last.getAndSet(cell)
if (oldLast == null) {
@tailrec
private[effect] final class UnsafeBounded[A](bound: Int) {
private[this] val buffer = new Array[AnyRef](bound)
private[this] val first = new AtomicInteger(0)
private[this] val last = new AtomicInteger(0)
private[this] val length = new AtomicInteger(0)
def size(): Int = length.get()
@tailrec
private final class BoundedAsyncQueue[F[_], A](capacity: Int)(implicit F: Async[F]) extends Queue[F, A] {
require(capacity > 0)
private[this] val buffer = new UnsafeBounded[A](capacity)
private[this] val waiters = new UnsafeUnbounded[Either[Throwable, Unit] => Unit]()
def offer(a: A): F[Unit] = F defer {
try {
buffer.put(a)
notifyOne()
cats.effect.IOFiber@3ae5495a RUNNING
├ flatMap @ catseffect.examples.LiveFiberSnapshot$.$anonfun$loop$2(IOAppSpec.scala:294)
├ map @ catseffect.examples.LiveFiberSnapshot$.loop$lzycompute(IOAppSpec.scala:293)
├ flatMap @ catseffect.examples.LiveFiberSnapshot$.$anonfun$loop$2(IOAppSpec.scala:294)
├ map @ catseffect.examples.LiveFiberSnapshot$.loop$lzycompute(IOAppSpec.scala:293)
├ flatMap @ catseffect.examples.LiveFiberSnapshot$.$anonfun$loop$2(IOAppSpec.scala:294)
├ map @ catseffect.examples.LiveFiberSnapshot$.loop$lzycompute(IOAppSpec.scala:293)
├ flatMap @ catseffect.examples.LiveFiberSnapshot$.$anonfun$loop$2(IOAppSpec.scala:294)
├ >> @ catseffect.examples.LiveFiberSnapshot$.loop$lzycompute(IOAppSpec.scala:294)
├ flatMap @ catseffect.examples.LiveFiberSnapshot$.$anonfun$loop$2(IOAppSpec.scala:294)
[error] cats.effect.IOFiber@494388963 RUNNING
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:27)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:27)
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:27)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:27)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:27)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:27)
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:27)
[error] cats.effect.IOFiber@766695369 RUNNING
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:25)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:26)
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:25)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:26)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:26)
[error] cats.effect.IOFiber@58d4928e RUNNING
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:25)
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:25)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:26)
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:25)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:26)
[error] ├ flatMap @ TestApp$.$anonfun$loop$2(TestApp.scala:26)
[error] ├ map @ TestApp$.loop$lzycompute(TestApp.scala:25)
[error] ├ >> @ TestApp$.loop$lzycompute(TestApp.scala:26)
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
final override def map2[A, B, Z](fa: ParallelF[F, A], fb: ParallelF[F, B])(
f: (A, B) => Z): ParallelF[F, Z] =
ParallelF(
F.uncancelable { poll =>
for {
fiberA <- F.start(ParallelF.value(fa))
fiberB <- F.start(ParallelF.value(fb))
// start a pair of supervisors to ensure that the opposite is canceled on error
_ <- F start {