Skip to content

Instantly share code, notes, and snippets.

@simonbasle
Created March 4, 2019 16:16

Revisions

  1. simonbasle created this gist Mar 4, 2019.
    172 changes: 172 additions & 0 deletions Scratch.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,172 @@
    /*
    * Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
    *
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */

    import java.time.LocalTime;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Hooks;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    class Scratch {

    public static void main(String[] args) {
    try { imperative(); System.out.println("RE RUN imperative()"); } catch (Exception t) { t.printStackTrace();}
    try { reactive(); System.out.println("RE RUN reactive()"); } catch (Exception t) { t.printStackTrace();}
    try { reactiveNoSubscribeOn(); System.out.println("RE RUN reactiveNoSubscribeOn()"); } catch (Exception t) { t.printStackTrace();}
    try { log(); System.out.println("RE RUN log()"); } catch (Exception t) { t.printStackTrace();}
    try { hook(); System.out.println("RE RUN hook()"); } catch (Exception t) { t.printStackTrace();}
    try { checkpoint(); System.out.println("RE RUN checkpoint()"); } catch (Exception t) { t.printStackTrace();}

    System.exit(0);
    }

    private static void imperative() throws ExecutionException, InterruptedException {
    final ScheduledExecutorService executor =
    Executors.newSingleThreadScheduledExecutor();

    int seconds = LocalTime.now().getSecond();
    List<Integer> source;
    if (seconds % 2 == 0) {
    source = IntStream.range(1, 11).boxed().collect(Collectors.toList());
    }
    else if (seconds % 3 == 0) {
    source = IntStream.range(0, 4).boxed().collect(Collectors.toList());
    }
    else {
    source = Arrays.asList(1, 2, 3, 4);
    }

    executor.submit(() -> source.get(5)) //line 76
    .get();
    }

    private static void reactive() {
    int seconds = LocalTime.now().getSecond();
    Mono<Integer> source;
    if (seconds % 2 == 0) {
    source = Flux.range(1, 10)
    .elementAt(5);
    }
    else if (seconds % 3 == 0) {
    source = Flux.range(0, 4)
    .elementAt(5);
    }
    else {
    source = Flux.just(1, 2, 3, 4)
    .elementAt(5);
    }

    source.subscribeOn(Schedulers.parallel())
    .block(); //line 97
    }

    private static void reactiveNoSubscribeOn() {
    int seconds = LocalTime.now().getSecond();
    Mono<Integer> source;
    if (seconds % 2 == 0) {
    source = Flux.range(1, 10)
    .elementAt(5);
    }
    else if (seconds % 3 == 0) {
    source = Flux.range(0, 4)
    .elementAt(5);
    }
    else {
    source = Flux.just(1, 2, 3, 4)
    .elementAt(5);
    }

    source.block(); //line 116
    }

    private static void log() {
    int seconds = LocalTime.now().getSecond();
    Mono<Integer> source;
    if (seconds % 2 == 0) {
    source = Flux.range(1, 10)
    .elementAt(5)
    .log("source A");
    }
    else if (seconds % 3 == 0) {
    source = Flux.range(0, 4)
    .elementAt(5)
    .log("source B");
    }
    else {
    source = Flux.just(1, 2, 3, 4)
    .elementAt(5)
    .log("source C");
    }

    source.block(); //line 138
    }


    private static void hook() {
    Hooks.onOperatorDebug();
    try {
    int seconds = LocalTime.now().getSecond();
    Mono<Integer> source;
    if (seconds % 2 == 0) {
    source = Flux.range(1, 10)
    .elementAt(5); //line 149
    }
    else if (seconds % 3 == 0) {
    source = Flux.range(0, 4)
    .elementAt(5); //line 153
    }
    else {
    source = Flux.just(1, 2, 3, 4)
    .elementAt(5); //line 157
    }

    source.block(); //line 160
    }
    finally {
    Hooks.resetOnOperatorDebug();
    }
    }

    private static void checkpoint() {
    int seconds = LocalTime.now().getSecond();
    Mono<Integer> source;
    if (seconds % 2 == 0) {
    source = Flux.range(1, 10)
    .elementAt(5)
    .checkpoint("source range(1,10)");
    }
    else if (seconds % 3 == 0) {
    source = Flux.range(0, 4)
    .elementAt(5)
    .checkpoint("source range(0,4)");
    }
    else {
    source = Flux.just(1, 2, 3, 4)
    .elementAt(5)
    .checkpoint("source just(1,2,3,4)");
    }

    source.block(); //line 186
    }
    }