Skip to content

Instantly share code, notes, and snippets.

@daschl
Created July 16, 2014 12:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daschl/1b7db65cf5d73f6af264 to your computer and use it in GitHub Desktop.
Save daschl/1b7db65cf5d73f6af264 to your computer and use it in GitHub Desktop.
// 1: do func1(); which returns an Observable<Res1>
// 2: once Res1 is onComplete (so done without error):
// 3a: do func2();
// 3b: as long as the result does not match a given codition.
// 4: if done, return the original result from func1
In sync it would more or less look like:
Res1 res1 = func1();
done = false;
while(!done) {
if (func2() == condition) {
done = true;
}
}
... for more context: I store a document on the server and then need to poll the server for its state (replicated, persisted).. and once my needed state is satisfied I just return whatever got returned from the original store call.
return res1;
@zsxwing
Copy link

zsxwing commented Jul 16, 2014

Looks you can use repeat:

    public Observable<Integer> foo() {
        return func1().flatMap(new Func1<Integer, Observable<Integer>>() {

            @Override
            public Observable<Integer> call(final Integer originalValue) {
                return func2().repeat().skipWhile(new Func1<Integer, Boolean>() {

                    @Override
                    public Boolean call(Integer integer) {
                        // condition
                        return false;
                    }
                }).take(1).map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer originalValue) {
                        return originalValue;
                    }
                });
            }
        });
    }

@zsxwing
Copy link

zsxwing commented Jul 16, 2014

Just updated my comment to add take(1) to cancel the repeat. Sorry that I forgot it.

@headinthebox
Copy link

Yea, @zwsing's solution looks close to what I'd do.

@daschl
Copy link
Author

daschl commented Jul 16, 2014

Hmm it looks like its just caching the value, but I actually need to re-run the command. so this is my current impl:

    @Override
    public <D extends Document<?>> Observable<D> upsert(final D document, PersistTo persistTo, ReplicateTo replicateTo) {
        return upsert(document).flatMap(new Func1<D, Observable<? extends D>>() {
            @Override
            public Observable<? extends D> call(final D original) {
                return core
                    .<ObserveResponse>send(new ObserveRequest(document.id(), document.cas(), true, (short) 0, bucket))
                    .repeat()
                    .skipWhile(new Func1<ObserveResponse, Boolean>() {
                        @Override
                        public Boolean call(ObserveResponse observeResponse) {
                            System.err.println(observeResponse.hashCode());
                            return true;
                        }
                    }).map(new Func1<ObserveResponse, D>() {
                        @Override
                        public D call(ObserveResponse observeResponse) {
                            return original;
                        }
                    });
            }
        });
    }

But the response has the same hashcode and it never changes. This send method really needs to be called on every new try.

@zsxwing
Copy link

zsxwing commented Jul 16, 2014

@daschl in addition, you forget to use take(1).

@headinthebox
Copy link

Not sure what's going on actually, the code below works as I expect. Not sure why you send is returning the same value.

upsert(1)
calling send() with 0
calling send() with 1
calling send() with 2
calling send() with 3
calling send() with 4
calling send() with 5
1
object MainScala {
  def main(args: Array[String]): Unit = {
    def upsert(i: Int) = {
      println(s"upsert($i)")
      Observable.items(i)
    }
    var i = 0;

    def send() = Observable[Int](observer => {
      println(s"calling send() with $i")
      observer.onNext(i)
      i += 1
      observer.onCompleted()
    })
    def test(n: Int) = n < 5
    val zs = upsert(1).flatMap(x =>
      send().repeat.dropWhile(n => test(n))
        .take(1).map(_ => x)
    )

    zs.subscribe(z => println(z))
    readLine()
  }
}

@daschl
Copy link
Author

daschl commented Jul 16, 2014

@headinthebox, @zsxwing thanks much :) .. My thought then was around maybe using the scheduler to schedule into a observable and then use this code so N responses get emitted from one observable.
Not sure if this is the right way :)

@daschl
Copy link
Author

daschl commented Jul 16, 2014

Ha! I think this works:

what do you think guys?

    private <D extends Document<?>> Observable<D> observe(Observable<D> original, PersistTo persistTo, ReplicateTo replicateTo) {
        return original.flatMap(new Func1<D, Observable<? extends D>>() {
            @Override
            public Observable<? extends D> call(final D original) {
                // simulate with persistto master for now
                return Observable.defer(new Func0<Observable<ObserveResponse>>() {
                    @Override
                    public Observable<ObserveResponse> call() {
                        return core.send(new ObserveRequest(original.id(), original.cas(), true, (short) 0, bucket));
                    }
                }).repeat()
                    .skipWhile(new Func1<ObserveResponse, Boolean>() {
                        @Override
                        public Boolean call(ObserveResponse observeResponse) {
                            // also simulate with persistto master for now
                            System.err.println(observeResponse.observeStatus());
                            return observeResponse.observeStatus() != ObserveResponse.ObserveStatus.FOUND_PERSISTED;
                        }
                    }).take(1).map(new Func1<ObserveResponse, D>() {
                        @Override
                        public D call(ObserveResponse observeResponse) {
                            return original;
                        }
                    });
            }
        });
    }

@headinthebox
Copy link

Yup, that looks good.

object MainScala {
  def main(args: Array[String]): Unit = {
    def upsert(i: Int) = {
      println(s"upsert($i)")
      Observable.items(i)
    }

    var i = 0;
    def send() = {
      val result = AsyncSubject[Int]();
      Observable[Int](observer => {
        println(s"calling send() with $i")
        observer.onNext(i)
        i += 1
        observer.onCompleted()
      }).subscribe(result)
      result
    }
    def test(n: Int) = n < 5
    val zs = upsert(1).flatMap(x =>
      Observable.defer{ send() }.repeat.dropWhile(n => test(n))
        .take(1).map(_ => x)
    )
    zs.subscribe(z => println(z))
    readLine()
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment