Skip to content

Instantly share code, notes, and snippets.

View smaldini's full-sized avatar

Stephane Maldini smaldini

View GitHub Profile
return PublisherFactory.create(
(n, s) -> {
long i = 0l;
while (i < n && s.context().get() < elements) {
if(s.isCancelled()) return;
s.onNext(s.context().getAndIncrement());
i++;
}
if(s.context().get() == elements){
@smaldini
smaldini / gist:5d93da9aa606b12e4069
Last active August 29, 2015 14:21
a simple proxy
HttpServer<Buffer, Buffer> server = NetStreams.httpServer("0.0.0.0", 80);
server.get("/search/{search}", requestIn ->
NetStreams.httpClient()
.get("http://google.co.uk/?q=" + requestIn.param("search"))
.flatMap(repliesOut -> requestIn.writeWith(repliesOut))
);
server.start().await();
/*
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@smaldini
smaldini / gist:df93cc613b57b8610356
Last active August 29, 2015 14:17
Another Http Reactor with hot stream server-side
@Test
public void test5() throws Exception {
//Hot stream of data, could be injected from anywhere
Broadcaster<String> broadcaster = Broadcaster.<String>create(Environment.sharedDispatcher());
//Will go up to 16 parallel threads to proceed clients
final int MAX_PARALLEL = 16;
//Get a reference to the tail of the operation pipeline (microbatching + partitioning)

Keybase proof

I hereby claim:

  • I am smaldini on github.
  • I am smaldini (https://keybase.io/smaldini) on keybase.
  • I have a public key whose fingerprint is 2696 137E 7ECA D45A CF63 4F82 D68B DFBA 314B 6F16

To claim this, I am signing this object:

@smaldini
smaldini / gist:b95b8d325dafa4de608f
Last active August 29, 2015 14:14
Preliminary HTTP support in reactor-net
def "http responds to requests from clients"() {
given: "a simple HttpServer"
//Listen on localhost using default impl (Netty) and assign a global codec to receive/reply String data
def server = NetStreams.httpServer {
it.codec(StandardCodecs.STRING_CODEC).listen(port)
}
//Prepare a client using default impl (Netty) to connect on http://localhost:port/ and assign global codec to send/receive String data
def client = NetStreams.httpClient {
@smaldini
smaldini / gist:f0ccba917954d8862526
Last active August 29, 2015 14:14
Reactor, Reactive Streams and Netty
def "step-read and flush every 5 elems with manual decoding"() {
given: "a TcpServer and a TcpClient"
def latch = new CountDownLatch(10)
def server = NetStreams.tcpServer(port)
def client = NetStreams.tcpClient("localhost", port)
def codec = new JsonCodec<Pojo, Pojo>(Pojo)
when: "the client/server are prepared"
server.pipeline { input ->
class ChronicleStreamSpec extends Specification {
def "ChronicleStream persists objects and notifies subscribers"() {
given:
"2 slaves and 1 master"
def putPromise = IOStreams.<Integer, String> persistentMapReader('journal')
.onPut()
.log('put')
.next()
public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> {
@Override
public Stream<SensorSummary> process(Stream<SensorData> inputStream) {
return inputStream
.buffer(5, 20, TimeUnit.SECONDS)
//would be better to convert to stream of double 'values' and then have generic avg for type safety.
package reactor.rx;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.AbstractReactorTest;
import reactor.fn.tuple.Tuple;
import reactor.io.IOStreams;
import reactor.rx.stream.MapStream;