Skip to content

Instantly share code, notes, and snippets.

// Using a hypothetical channel API, I assume I have channel `in` with a 4096-wide buffer,
// and a channel `out` writing to Kafka.
// lookupNodeInRedis and processOnServer have semaphores of, say, 16 and 8, to *globally* limit
// concurrent use of those resources.
var processingQueue = new BufferedChannel(50); // this buffer bounds the number of processing threads
// Consume and process input
Thread.startVirtualThread(() -> {
// Using a hypothetical channel API, I assume I have channel `in` with a 4096-wide buffer,
// and a channel `out` writing to Kafka.
// lookupNodeInRedis and processOnServer have semaphores of, say, 16 and 8, to *globally* limit
// concurrent use of those resources.
HighLevelLib.processInOrder(50, // how many processors
in,
HighLevelLib.batchEvery(Duration.ofSeconds(1), out),
msg ->
@pron
pron / gist:6092559
Created July 26, 2013 22:01
Recursive go blocks with Pulsar
(ns co.paralleluniverse.pulsar.examples.walker
(:use [co.paralleluniverse.pulsar.core :exclude [close!]]
[co.paralleluniverse.pulsar.async]))
(defn walk [tree ch]
(letsfn [(walker [t]
(when t
(walker (:left t))
(>! ch (:value t))
(walker (:right t))))]
package jmodern;
import com.codahale.metrics.*;
import com.codahale.metrics.annotation.*;
import com.fasterxml.jackson.annotation.*;
import com.google.common.base.Optional;
import feign.Feign;
import feign.jackson.*;
import feign.jaxrs.*;
import io.dropwizard.Application;
package foo;
import co.paralleluniverse.fibers.*;
import co.paralleluniverse.strands.*;
import co.paralleluniverse.strands.channels.*;
import static co.paralleluniverse.strands.channels.Channels.*;
public class quasarwhispers {
static SuspendableRunnable whisper(LongChannel left, LongChannel right) {
return () -> { right.send(left.receive() + 1); }
}
package foo;
import co.paralleluniverse.fibers.*;
import co.paralleluniverse.strands.*;
import co.paralleluniverse.strands.channels.*;
import static co.paralleluniverse.strands.channels.Channels.*;
public class quasarwhispers {
static void quasarWhispers(int n) throws Exception {
LongChannel leftmost = newLongChannel(0);
LongChannel left = leftmost, right = leftmost;
package jmodern;
import com.codahale.metrics.*;
import com.codahale.metrics.annotation.*;
import com.fasterxml.jackson.annotation.*;
import com.google.common.base.Optional;
import dagger.Module;
import dagger.ObjectGraph;
import dagger.Provides;
import io.dropwizard.Application;
package jmodern;
import com.codahale.metrics.*;
import com.codahale.metrics.annotation.*;
import com.fasterxml.jackson.annotation.*;
import com.google.common.base.Optional;
import feign.Feign;
import feign.jackson.*;
import feign.jaxrs.*;
import io.dropwizard.Application;
@pron
pron / Main.java
Last active August 29, 2015 14:01
package jmodern;
import com.codahale.metrics.*;
import com.codahale.metrics.annotation.*;
import com.fasterxml.jackson.annotation.*;
import com.google.common.base.Optional;
import feign.Feign;
import feign.jackson.*;
import feign.jaxrs.*;
import io.dropwizard.Application;
public class Bench {
public static void main(String[] args) {
double res;
long start;
int numVals = 1000000;
System.out.println("Warming up");
start = System.nanoTime();
res = bar(new double[10000], 5000);
System.out.println("Done. " + ((System.nanoTime() - start) / 1000000) + " ms (" + res + ")");