Skip to content

Instantly share code, notes, and snippets.

@mikea
Created May 14, 2014 18:24
Show Gist options
  • Save mikea/b39fa67a2556a4a031a8 to your computer and use it in GitHub Desktop.
Save mikea/b39fa67a2556a4a031a8 to your computer and use it in GitHub Desktop.
Attempt to implement headAndTail function. Results in double reading a file.
public class HeadTest {
public static void main(String[] args) {
Observable<Integer> fileReader = Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
System.out.println("Reading from a file");
subscriber.onNext(100); // header
for (int i = 0; i < 10; ++i) {
subscriber.onNext(i);
}
subscriber.onCompleted();
});
Pair<Observable<Integer>, Observable<Integer>> headAndTail = headAndTail(fileReader);
Integer headerValue = BlockingObservable.from(headAndTail.x).single();
Observable<Integer> processedValues = headAndTail.y.map(i -> i + headerValue);
BlockingObservable.from(processedValues).forEach(System.out::println);
}
public static <T> Pair<Observable<T>, Observable<T>> headAndTail(Observable<T> o) {
return new Pair<>(o.first(), o.skip(1));
}
public static class Pair<X, Y> {
public final X x;
public final Y y;
public Pair(X x, Y y) {
this.x = x;
this.y = y;
}
}
}
Reading from a file
Reading from a file
100
101
102
103
104
105
106
107
108
109
@benjchristensen
Copy link

Here are examples for accessing head and items at the same time:

// you have access to head and all the items here
        // and can do side-effects in here
        HeadObservable.from(is).flatMap(ho -> {
            return ho.take(5).doOnNext(t -> {
                System.out.println("Head: " + ho.head + "   Item: " + t);
            });
        }).subscribe();

        System.out.println("-------------------------");

        // or can transform the data however needed
        HeadObservable.from(is).flatMap(ho -> {
            return ho.take(5).<List<Long>> map(t -> {

                return Arrays.asList(ho.head, t);
            });
        }).subscribe(d -> {
            System.out.println("Head: " + d.get(0) + "   Item: " + d.get(1));
        });

        System.out.println("-------------------------");

        // or reduce to a single output
        HeadObservable.from(is).flatMap(ho -> {
            return ho.take(5).reduce(0L, (acc, t) -> {
                return acc + t;
            }).<List<Long>> map(a -> Arrays.asList(ho.head, a));
        }).subscribe(d -> {
            System.out.println("Head: " + d.get(0) + "  Accumulated Sum: " + d.get(1));
        });

These emit:

-------------------------
Head: 0   Item: 1
Head: 0   Item: 2
Head: 0   Item: 3
Head: 0   Item: 4
Head: 0   Item: 5
source unsubscribed
-------------------------
Head: 0   Item: 1
Head: 0   Item: 2
Head: 0   Item: 3
Head: 0   Item: 4
Head: 0   Item: 5
source unsubscribed
-------------------------
Head: 0  Accumulated Sum: 15
source unsubscribed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment