Skip to content

Instantly share code, notes, and snippets.

@hochgi
hochgi / Scheduling.scala
Created February 20, 2019 11:11
scala scheduling with akka
import akka.actor.{ActorSystem, Scheduler}
import com.astoncap.util.concurrent.Async._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try
trait Scheduling {
implicit val scheduling: Scheduling = this
@hochgi
hochgi / FutureOps.scala
Last active July 3, 2018 06:09
Fast dynamic async reduce
implicit class FutureOps(companion: Future.type) {
def unorderedReduce[A, B](in: Iterable[A])
(fn: A => Future[B], combop: (B,B) => B)
(implicit executor: ExecutionContext): Future[B] = {
val promises = Array.fill(2 * in.size - 1)(Promise.apply[B])
var i = 0 // used for iteration optimization (not needing to search for uncompleted promise from start every time)
in.foreach { a =>
fn(a).onComplete { t =>
var p = promises(i)
@hochgi
hochgi / .gitignore
Last active January 17, 2020 01:22
qotd for CM-Well
# idea stuff
*.iml
.idea/
# scala stuff
*.class
*.log
branches.properties
logs
@hochgi
hochgi / Trump.ttl
Last active November 21, 2017 17:27
RDF sample data to demonstrate CM-Well's yg pipe feature
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix dc: <http://purl.org/dc/terms/> .
@prefix locn: <http://www.w3.org/ns/locn#> .
@prefix madsrdf: <http://www.loc.gov/mads/rdf/v1#> .
@prefix geonames: <http://www.geonames.org/ontology#> .
@prefix geo: <http://www.w3.org/2003/01/geo/wgs84_pos#> .
@prefix xmpl: <http://ont.example.org/2017/v1.0#> .
@prefix time: <http://www.w3.org/2006/time#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@hochgi
hochgi / PartitionWithMeter.scala
Last active June 13, 2016 09:07
PartitionWith measure
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import com.typesafe.config.ConfigFactory
import org.scalameter._
import scala.concurrent._ ,duration._
import scala.concurrent.ExecutionContext.Implicits.global
object PartitionWithMeter {
@hochgi
hochgi / PartitionWith.scala
Last active March 6, 2022 11:48
akka-stream retry flow
package hochgi.util
package object collections {
/**
* `partition` and `map` combined.
* for a given collection, and a function from the collection elements to `Either[A,B]`,
* generates a tuple of 2 collections of types `A` and `B`
*
* @param xs the collection of elements
* @param f a function that converts an element to an `Either[A,B]`
@hochgi
hochgi / SimpleHttpClient.scala
Last active April 3, 2016 07:46
wrapping akka-http client
package hochgi.util.http
import java.io.InputStream
import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model.HttpHeader.ParsingResult
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.stream.stage.{OutHandler, InHandler, GraphStageLogic, GraphStage}
@hochgi
hochgi / Aggregate.scala
Created January 18, 2016 07:01
Aggregation stage for akka-streams to serve as a "smart buffer", a kind of hybrid for conflate and buffer stages
/**
 * Convenience front-end for `AggregateWeighted`.
 *
 * Delegates to `AggregateWeighted` with a function that returns `1L` for every
 * element, passing `max`, `seed` and `aggregate` through unchanged.
 * NOTE(review): presumably that function is the per-element weight, making `max`
 * an element-count limit; confirm against `AggregateWeighted`.
 */
object Aggregate {
  // Maps any element to the constant 1L.
  private[this] val constantOne: Any => Long = _ => 1L

  def apply[In, Out](max: Long, seed: In => Out)(aggregate: (Out, In) => Out) =
    AggregateWeighted[In, Out](max, constantOne, seed)(aggregate)
}
object AggregateWeighted {
@hochgi
hochgi / akka-stream_UnfoldUsingGraphStage.scala
Created December 9, 2015 15:46
unfold/unfoldAsync implementation using GraphStage in akka-stream
import akka.stream.stage.{OutHandler, GraphStageLogic, GraphStage}
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl._
import akka.stream._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
class Unfold[S, E](s: S,f: S => Option[(S, E)]) extends GraphStage[SourceShape[E]] {
@hochgi
hochgi / AkkaStreamUnfold.scala
Last active December 1, 2015 12:38
akka-stream-unfold
import akka.actor._
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
def unfold[S,E](s: S)
(f: S => Option[(S,E)]): Source[E,ActorRef] =