Skip to content

Instantly share code, notes, and snippets.

@mikea
Created May 14, 2014 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikea/b39fa67a2556a4a031a8 to your computer and use it in GitHub Desktop.
Save mikea/b39fa67a2556a4a031a8 to your computer and use it in GitHub Desktop.
Attempt to implement headAndTail function. Results in double reading a file.
public class HeadTest {
public static void main(String[] args) {
Observable<Integer> fileReader = Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
System.out.println("Reading from a file");
subscriber.onNext(100); // header
for (int i = 0; i < 10; ++i) {
subscriber.onNext(i);
}
subscriber.onCompleted();
});
Pair<Observable<Integer>, Observable<Integer>> headAndTail = headAndTail(fileReader);
Integer headerValue = BlockingObservable.from(headAndTail.x).single();
Observable<Integer> processedValues = headAndTail.y.map(i -> i + headerValue);
BlockingObservable.from(processedValues).forEach(System.out::println);
}
public static <T> Pair<Observable<T>, Observable<T>> headAndTail(Observable<T> o) {
return new Pair<>(o.first(), o.skip(1));
}
public static class Pair<X, Y> {
public final X x;
public final Y y;
public Pair(X x, Y y) {
this.x = x;
this.y = y;
}
}
}
Reading from a file
Reading from a file
100
101
102
103
104
105
106
107
108
109
@benjchristensen
Copy link

After thinking about this, using PublishSubject is not the correct approach. Here is an implementation that may work for your use case:

package test;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

public class HeadImpl1 {

    public static void main(String[] args) {

        Observable<Long> is = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(l++);
            }
        });

        Pair.from(is).subscribe(p -> {
            p.head.subscribe(h -> {
                System.out.println("head => " + h);
            });

            // take(5) is just as an example here
            p.rest.take(5).subscribe(h -> {
                System.out.println("rest => " + h);
            });
        });

    }

    public static class Pair<T> {
        final Observable<T> head;
        final Observable<T> rest;

        public Pair(Observable<T> head, Observable<T> rest) {
            this.head = head.take(1);
            this.rest = rest;
        }

        public static <T> Observable<Pair<T>> from(Observable<T> is) {
            return is.groupBy(new Func1<T, Boolean>() {

                boolean firstEmitted = false;

                @Override
                public Boolean call(T l) {
                    if (firstEmitted) {
                        return Boolean.FALSE;
                    } else {
                        firstEmitted = true;
                        return Boolean.TRUE;
                    }
                }

            }).buffer(2).take(1).map(gs -> {
                return new Pair<T>(gs.get(0), gs.get(1));
            });
        }

    }
}

@benjchristensen
Copy link

Here is another approach ... it's got deficiencies, but can also work, and is better for certain uses:

package test;

import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;

public class HeadImpl3 {

    public static void main(String[] args) {

        Observable<Long> is = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(l++);
            }
            System.out.println("source unsubscribed");
        });

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).toBlockingObservable().forEach(t -> { // non-blocking would be .subscribe here
            System.out.println("Rest: " + t);
        });

    }

    public static class HeadObservable<T> extends Observable<T> {

        private final T head;

        protected HeadObservable(final T head, final Observable<T> is) {
            super(new OnSubscribe<T>() {

                @Override
                public void call(Subscriber<? super T> s) {
                    is.unsafeSubscribe(s);
                }

            });
            this.head = head;
        }

        public static <T> Observable<HeadObservable<T>> from(Observable<T> is) {
            return is.lift(new Operator<HeadObservable<T>, T>() {

                @Override
                public Subscriber<? super T> call(Subscriber<? super HeadObservable<T>> child) {
                    return new Subscriber<T>(child) {

                        private HeadObservable<T> ho = null;
                        private final PublishSubject<T> rest = PublishSubject.create();

                        @Override
                        public void onCompleted() {
                            child.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            child.onError(e);
                        }

                        @Override
                        public void onNext(T t) {
                            if (ho == null) {
                                ho = new HeadObservable<T>(t, rest);
                                child.onNext(ho);
                            } else {
                                rest.onNext(t);
                            }
                        }

                    };
                }

            });
        }
    }
}

@benjchristensen
Copy link

Here is a more robust implementation that better handles unsubscribing. It does not work with synchronous sources though ... until a fix is done in RxJava which already has a PR open. See comments in code.

package test;

import rx.Observable;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class HeadImpl4 {

    public static void main(String[] args) {

        Observable<Long> is = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(l++);
            }
            System.out.println("source unsubscribed");
        }).subscribeOn(Schedulers.computation());

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).subscribe(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho.take(5);
        }).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });

    }

    public static class HeadObservable<T> extends Observable<T> {

        private final T head;

        protected HeadObservable(final T head, final Observable<T> is) {
            super(new OnSubscribe<T>() {

                @Override
                public void call(Subscriber<? super T> s) {
                    is.unsafeSubscribe(s);
                }

            });
            this.head = head;
        }

        public static <T> Observable<HeadObservable<T>> from(Observable<T> is) {

            final ConnectableObservable<T> multicast = is.multicast(PublishSubject.create());
            final Observable<T> head = multicast.take(1);
            final Observable<T> rest = multicast;

            return Observable.create(new OnSubscribe<HeadObservable<T>>() {

                @Override
                public void call(Subscriber<? super HeadObservable<T>> s) {
                    head.unsafeSubscribe(new Subscriber<T>(s) {

                        @Override
                        public void onCompleted() {
                            s.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            s.onError(e);
                        }

                        @Override
                        public void onNext(T t) {
                            s.onNext(new HeadObservable<T>(t, rest));
                            s.onCompleted();
                        }

                    });
                    // TODO need to update ConnectableObservable to support synchronous Observables (v0.17+)
                    // see https://github.com/Netflix/RxJava/pull/1175 for possible fix to this
                    s.add(multicast.connect());
                }

            });

        }
    }
}

@benjchristensen
Copy link

This feels like the most elegant however, and works with each of the use cases I've tried:

package test;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

public class HeadImpl1b {

    public static void main(String[] args) {

        Observable<Long> is = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(l++);
            }
            System.out.println("source unsubscribed");
        });

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).subscribe(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho.take(5);
        }).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });

    }

    public static class HeadObservable<T> extends Observable<T> {

        private final T head;

        protected HeadObservable(final T head, final Observable<T> is) {
            super(new OnSubscribe<T>() {

                @Override
                public void call(Subscriber<? super T> s) {
                    is.unsafeSubscribe(s);
                }

            });
            this.head = head;
        }

        public static <T> Observable<HeadObservable<T>> from(Observable<T> is) {
            return is.groupBy(new Func1<T, Boolean>() {

                boolean firstEmitted = false;

                @Override
                public Boolean call(T l) {
                    if (firstEmitted) {
                        return Boolean.FALSE;
                    } else {
                        firstEmitted = true;
                        return Boolean.TRUE;
                    }
                }

            }).buffer(2).take(1).flatMap(gs -> {
                return gs.get(0).take(1).map(t -> {
                    return new HeadObservable<T>(t, gs.get(1));
                });
            });
        }

    }
}

@benjchristensen
Copy link

Here are examples for accessing head and items at the same time:

// you have access to head and all the items here
        // and can do side-effects in here
        HeadObservable.from(is).flatMap(ho -> {
            return ho.take(5).doOnNext(t -> {
                System.out.println("Head: " + ho.head + "   Item: " + t);
            });
        }).subscribe();

        System.out.println("-------------------------");

        // or can transform the data however needed
        HeadObservable.from(is).flatMap(ho -> {
            return ho.take(5).<List<Long>> map(t -> {

                return Arrays.asList(ho.head, t);
            });
        }).subscribe(d -> {
            System.out.println("Head: " + d.get(0) + "   Item: " + d.get(1));
        });

        System.out.println("-------------------------");

        // or reduce to a single output
        HeadObservable.from(is).flatMap(ho -> {
            return ho.take(5).reduce(0L, (acc, t) -> {
                return acc + t;
            }).<List<Long>> map(a -> Arrays.asList(ho.head, a));
        }).subscribe(d -> {
            System.out.println("Head: " + d.get(0) + "  Accumulated Sum: " + d.get(1));
        });

These emit:

-------------------------
Head: 0   Item: 1
Head: 0   Item: 2
Head: 0   Item: 3
Head: 0   Item: 4
Head: 0   Item: 5
source unsubscribed
-------------------------
Head: 0   Item: 1
Head: 0   Item: 2
Head: 0   Item: 3
Head: 0   Item: 4
Head: 0   Item: 5
source unsubscribed
-------------------------
Head: 0  Accumulated Sum: 15
source unsubscribed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment