Skip to content

Instantly share code, notes, and snippets.

View derekjw's full-sized avatar

Derek Williams derekjw

  • FP Solutions Ltd
  • London, United Kingdom
View GitHub Profile
@derekjw
derekjw / Fs2MapAsync.scala
Created April 23, 2018 10:08
ordered mapAsync for fs2
import cats.implicits._
import cats.effect.Effect
import fs2.{Pipe, Stream}
import fs2.async.Promise
import fs2.async.mutable.Queue
import scala.concurrent.ExecutionContext
def mapAsync[F[_]: Effect, A, B](parallelism: Int)(f: A => F[B])(implicit executionContext: ExecutionContext): Pipe[F, A, B] = { stream =>
Stream.eval(Queue.bounded[F, Option[F[Either[Throwable, B]]]](parallelism)).flatMap { q =>
@derekjw
derekjw / fs2-async.scala
Last active February 28, 2018 11:24
Porting some akka-stream combinators to fs2
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global
// Demo pipeline: creates a Scheduler (constructed with the argument 2), then sends the
// integers produced by Stream.range(0, 96) through flatMapAsync called with 16.
// Each element n is mapped to scheduler.delay(Stream.emit(n), d), where d is a random
// duration of Random.nextInt(1000) milliseconds (i.e. below one second).
// A finalizer that prints "done" is attached to the resulting stream.
// NOTE(review): flatMapAsync, IO, Random and `.millis` are not defined or imported in the
// visible snippet — presumably they come from elsewhere in the gist; confirm.
// NOTE(review): 16 looks like the parallelism limit of flatMapAsync — confirm against its definition.
Scheduler[IO](2)
.flatMap { scheduler =>
Stream.range(0, 96)
.covary[IO]
.through(flatMapAsync[IO, Int, Int](16)(n => scheduler.delay[IO, Int](Stream.emit(n), Random.nextInt(1000).millis)))
}
.onFinalize(IO(println("done")))
@derekjw
derekjw / unit_test.rs
Created May 6, 2016 15:57
Rust unit test macro
macro_rules! unit_tests {
($( fn $name:ident($fixt:ident : $ftype:ty) $body:block )*) => (
$(
#[test]
fn $name() {
let mut $fixt = <$ftype as Fixture>::setup();
$body
$fixt.teardown();
}
)*
@derekjw
derekjw / parallel-ordered-streams.scala
Created April 28, 2016 21:19
Split stream with groupBy, run each substream async and ordered.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Random
object Main extends App {
implicit val actorSystem = ActorSystem()
package org.example
import io.circe.generic.JsonCodec
import io.circe.syntax._
/** Minimal example payload wrapping a single string `value`.
  *
  * Annotated with `@JsonCodec` (imported from `io.circe.generic`); the surrounding
  * snippet calls `.asJson` on a `Foo`, which presumably relies on the codec this
  * annotation derives.
  */
@JsonCodec
case class Foo(value: String)
object Main extends App {
println(Foo("foo").asJson.noSpaces)
@derekjw
derekjw / list.rs
Last active August 29, 2015 14:21
use std::sync::Arc;
use std::fmt;
/// A list over elements of type `A`, represented by a shared pointer to a `Node<A>`.
pub struct List<A> {
/// `Arc`-shared pointer to this list's node.
node: Arc<Node<A>>
}
enum Node<A> {
Cons(A, Arc<Node<A>>),
Nil
@derekjw
derekjw / FutureMacro.scala
Created March 10, 2013 19:57
macro for wrapping simple values with Future.successful
/** Value-class wrapper around the `Future` companion object that adds a `smart` method to it.
  *
  * NOTE(review): `underyling` looks like a misspelling of `underlying`. It is a public
  * `val`, so renaming it would change the visible interface; left as is.
  */
implicit class FutureCompanionW(val underyling: Future.type) extends AnyVal {
/** Macro-backed method, expanded by `smartFutureMacroImpl`.
  *
  * @param a  the expression to wrap
  * @param ec execution context handed to the macro implementation
  * @return a `Future[A]` built by the macro expansion
  */
def smart[A](a: A)(implicit ec: ExecutionContext): Future[A] = macro smartFutureMacroImpl[A]
}
def smartFutureMacroImpl[A: c.WeakTypeTag](c: Context)(a: c.Expr[A])(ec: c.Expr[ExecutionContext]): c.Expr[Future[A]] = {
import c.universe._
a.tree match {
case _: Literal | _: Ident => reify(Future successful a.splice)
case _ => reify(Future(a.splice)(ec.splice))
@derekjw
derekjw / SubclassifiedIndex.scala.diff
Created September 17, 2012 15:33
Akka Eventstream fix
--- /home/derek/Development/Projects/akka/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala 2012-09-17 09:25:41.807104726 -0600
+++ SubclassifiedIndex.scala 2012-09-01 11:43:45.906813283 -0600
@@ -79,14 +78,16 @@
* Add key to this index which inherits its value set from the most specific
* super-class which is known.
*/
- def addKey(key: K): Changes =
- subkeys collectFirst {
+ def addKey(key: K): Changes = {
+ val ch = subkeys collect {
@derekjw
derekjw / RequestCont.scala
Created May 21, 2012 20:18
Safely closing over actor state
import akka.actor.Actor
/** Companion factory for `RequestCont`. */
object RequestCont {

  /** Wraps the continuation `cont` together with the actor that owns it.
    *
    * @param actor the actor passed to the `RequestCont` constructor as its owner
    * @param cont  the callback to run with a value of type `A`
    * @return a new `RequestCont[A]`
    */
  def apply[A](actor: Actor)(cont: A => Unit): RequestCont[A] = {
    // The type argument is inferred from `cont` and the declared result type.
    new RequestCont(actor, cont)
  }
}
class RequestCont[A](owner: Actor, cont: A => Unit) extends (A => ResponseCont) {
def apply(value: A): ResponseCont = {
ResponseCont(owner)(cont(value))
}
@derekjw
derekjw / gist:1764249
Created February 8, 2012 01:51
Future flatMap example
// HTTP client used by the example.
// NOTE(review): `get` has no body, so as written this would have to be an abstract
// class or a trait to compile — presumably illustrative pseudo-code; confirm.
class HttpClient {
// Takes a URI and returns a Future[String].
def get(uri: URI): Future[String]
}
// Returns a Future of the next URI to get.
def getUri(): Future[URI]
// Returns a Future of an available HttpClient from a pool.
def getHttpClient(): Future[HttpClient]