Skip to content

Instantly share code, notes, and snippets.

Vasily Kirichenko vasily-kirichenko

Block or report user

Report or block vasily-kirichenko

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
View rider_jstack.log
2019-01-25 11:04:29
Full thread dump OpenJDK 64-Bit Server VM (25.152-b26 mixed mode):
"ApplicationImpl pooled thread 763" #2017 daemon prio=4 os_prio=-1 tid=0x000000004b056800 nid=0x4d90 waiting on condition [0x0000000070bcf000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000aa188de0> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
View rider_jstack.log
2018-12-22 09:28:32
Full thread dump OpenJDK 64-Bit Server VM (25.152-b26 mixed mode):
"Attach Listener" #558 daemon prio=9 os_prio=31 tid=0x00007f98d6b16800 nid=0x12f47 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
"Timer-0" #26 daemon prio=5 os_prio=31 tid=0x00007f98cfd2c800 nid=0xdf17 in Object.wait() [0x00007000082ab000]
View ddata.scala
import akka.NotUsed
import akka.actor.Scheduler
import akka.actor.typed.{ActorRef, _}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.Cluster
import akka.cluster.ddata.{LWWMap, LWWMapKey, ReplicatedData}
import akka.cluster.ddata.typed.scaladsl.{Replicator, _}
import akka.stream._
View caching_flow.scala
def cache[In, Out](flow: Flow[In, Out, _]) = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
var cache: Map[In, Out] = Map.empty
val partition = b.add(Partition[In](2, in => if (cache.contains(in)) 0 else 1))
val merge = b.add(Merge[Out](2))
partition.out(0).map(in => cache(in)) ~> merge
val broadcast = b.add(Broadcast[In](2))
partition.out(1) ~> broadcast
View paralle_workers_with_monix.scala
def processor(account: String, taskCh: MVar[Int]): Observable[String] =
Observable.fromAsyncStateAction[Unit, String](_ =>
for {
task <- taskCh.take
// simulate real processing by an async sleep
_ <- Task.sleep(Random.nextInt(500).millis)
} yield {
println(s"[$account] processed $task")
((task + 100).toString, ())
}
View offer_async.fs
open Akka.Actor
open Akka.Streams
open Akka.Streams.Dsl
open System
open System.Threading
[<EntryPoint>]
let main _ =
let sys = ActorSystem.Create "test"
let mat = ActorMaterializer.Create sys
View LifeCycle.fs
let props (_scanType: ScanType) : Akka.Actor.Props =
props(
let rec start (ctx: Actor<obj>) (state: State) (msg: obj) =
match msg with
| LifecycleEvent PostStop -> kill state.Process
| :? Msg as msg ->
match msg with
| Start ->
Starter.start def |> Async.map Started |!> ctx.Self.Retype()
become (starting ctx state)
View CResult.fs
[<AutoOpen>]
module CResult =
open System.Diagnostics
[<Sealed>]
type CResultBuilder () =
static let zero = Ok ()
member inline __.Return value : Result<'T, 'Error> = Ok value
View Spans.fs
[<MemoryDiagnoser>]
type Betch() =
let s = "123 456"
[<Benchmark(Baseline = true)>]
member __.Array() =
let [|foo; bar|] = s.Split ' '
let _x =
match BigInteger.TryParse foo, Int32.TryParse bar with
| (true, x), (true, y) -> x, y
View ImageProcessingPipeline.fs
open Akka.Actor
open Akka.Streams
open Akkling.Streams
open System
type Image = Image of string
let imageJob delay jobName (Image image) =
async {
printfn "%s Started: %s" jobName image
You can’t perform that action at this time.