@daschl
Created July 16, 2014 12:26
// 1: call func1(), which returns an Observable<Res1>
// 2: once Res1 is onComplete (so done without error):
// 3a: call func2()
// 3b: repeat for as long as the result does not match a given condition
// 4: when done, return the original result from func1

In sync it would more or less look like:

Res1 res1 = func1();
boolean done = false;
while (!done) {
    if (func2() == condition) {
        done = true;
    }
}
return res1;

For more context: I store a document on the server and then need to poll the server for its state (replicated, persisted). Once the state I need is reached, I just return whatever the original store call returned.
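
The synchronous version described above can be written as runnable Java. This is only a sketch under assumptions: `storeThenPoll`, its `Supplier`/`Predicate` parameters, and the counter-based fake poll in `main` are hypothetical stand-ins for func1, func2 and the condition, not part of any real API.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class SyncPoll {
    /**
     * Runs the original call once, polls until the condition holds,
     * then returns the result of the original call.
     * All names here are hypothetical stand-ins for func1/func2/condition.
     */
    static <R, S> R storeThenPoll(Supplier<R> func1, Supplier<S> func2, Predicate<S> condition) {
        R res1 = func1.get();                    // step 1: the original call
        while (!condition.test(func2.get())) {
            // steps 3a/3b: poll again until the state matches
        }
        return res1;                             // step 4: hand back the original result
    }

    public static void main(String[] args) {
        int[] polls = {0};                       // fake server state: "ready" on the third poll
        String result = storeThenPoll(
                () -> "doc-1",
                () -> ++polls[0],
                state -> state >= 3);
        System.out.println(result + " after " + polls[0] + " polls"); // prints "doc-1 after 3 polls"
    }
}
```

Note that this loop blocks the calling thread and polls without any pause, which is exactly what the asynchronous version is meant to avoid.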
@daschl
Author

daschl commented Jul 16, 2014

Ha! I think this works:

what do you think guys?

    private <D extends Document<?>> Observable<D> observe(Observable<D> original, PersistTo persistTo, ReplicateTo replicateTo) {
        // persistTo and replicateTo are not used yet: persistence on the master is simulated for now
        return original.flatMap(new Func1<D, Observable<? extends D>>() {
            @Override
            public Observable<? extends D> call(final D doc) {
                // defer() builds a fresh ObserveRequest on every subscription, so repeat() polls again
                return Observable.defer(new Func0<Observable<ObserveResponse>>() {
                    @Override
                    public Observable<ObserveResponse> call() {
                        return core.send(new ObserveRequest(doc.id(), doc.cas(), true, (short) 0, bucket));
                    }
                }).repeat()
                    // skip responses until the document is reported as persisted; take(1) then stops the polling
                    .skipWhile(new Func1<ObserveResponse, Boolean>() {
                        @Override
                        public Boolean call(ObserveResponse observeResponse) {
                            // debug output only
                            System.err.println(observeResponse.observeStatus());
                            return observeResponse.observeStatus() != ObserveResponse.ObserveStatus.FOUND_PERSISTED;
                        }
                    }).take(1).map(new Func1<ObserveResponse, D>() {
                        @Override
                        public D call(ObserveResponse observeResponse) {
                            // return the document from the original store call, not the observe response
                            return doc;
                        }
                    });
            }
        });
    }
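
For readers who want to try the shape of this operator chain without RxJava or a server, here is a rough synchronous analogy using plain `java.util.stream` (Java 9+ for `dropWhile`). It is not the Rx code and it blocks the caller; `awaitState` and the two-value `Status` enum are hypothetical stand-ins for the observe call and `ObserveResponse.ObserveStatus`.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class PollPipeline {
    // Hypothetical stand-in for ObserveResponse.ObserveStatus
    enum Status { FOUND_NOT_PERSISTED, FOUND_PERSISTED }

    /** Polls lazily until notYetDone is false, then returns the original value. */
    static <D, R> D awaitState(D original, Supplier<R> poll, Predicate<R> notYetDone) {
        return Stream.generate(poll)            // roughly defer(...).repeat(): one fresh poll per element
                .dropWhile(notYetDone)          // roughly skipWhile(...)
                .limit(1)                       // roughly take(1)
                .map(response -> original)      // swap the response for the original result
                .findFirst()
                .orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        int[] polls = {0};                      // fake server: persisted from the third poll on
        String doc = awaitState(
                "doc-1",
                () -> ++polls[0] < 3 ? Status.FOUND_NOT_PERSISTED : Status.FOUND_PERSISTED,
                status -> status != Status.FOUND_PERSISTED);
        System.out.println(doc);
    }
}
```

The correspondence is only structural: the stream pulls one poll result at a time and stops at the first match, while the Rx version does the same thing asynchronously by resubscribing.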

@headinthebox

Yup, that looks good.

object MainScala {
  def main(args: Array[String]): Unit = {
    def upsert(i: Int) = {
      println(s"upsert($i)")
      Observable.items(i)
    }

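    var i = 0
    // Each call runs one "poll": it emits the current counter value and completes.
    // The AsyncSubject hands that final value to whoever subscribes to the result.
    def send() = {
      val result = AsyncSubject[Int]()
      Observable[Int](observer => {
        println(s"calling send() with $i")
        observer.onNext(i)
        i += 1
        observer.onCompleted()
      }).subscribe(result)
      result
    }
    // True while polling should continue.
    def test(n: Int) = n < 5
    val zs = upsert(1).flatMap(x =>
      // defer + repeat calls send() afresh on every resubscription
      Observable.defer{ send() }.repeat.dropWhile(n => test(n))
        .take(1).map(_ => x)
    )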
    zs.subscribe(z => println(z))
    readLine()
  }
}
