// 1: do func1(); which returns an Observable<Res1>
// 2: once Res1 is onComplete (so done without error):
// 3a: do func2();
// 3b: as long as the result does not match a given condition.
// 4: if done, return the original result from func1
In sync it would more or less look like:
Res1 res1 = func1();
done = false;
while(!done) {
    if (func2() == condition) {
        done = true;
    }
}
... for more context: I store a document on the server and then need to poll the server for its state (replicated, persisted), and once my needed state is satisfied I just return whatever got returned from the original store call.
return res1;
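For reference, the synchronous outline above can be written as a small runnable sketch in plain Java. This is only an illustration under assumptions: `runThenPoll`, the `Supplier`/`Predicate` parameters, and the fake store and poll lambdas are hypothetical names, not part of the SDK or of RxJava.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class SyncPollSketch {

    /**
     * Runs func1 once, then calls func2 repeatedly until its result matches
     * the condition, and finally returns the original func1 result.
     */
    public static <R1, R2> R1 runThenPoll(Supplier<R1> func1,
                                          Supplier<R2> func2,
                                          Predicate<R2> condition) {
        R1 res1 = func1.get();              // steps 1 and 2: run func1, keep its result
        boolean done = false;
        while (!done) {                     // steps 3a and 3b: poll until the condition holds
            if (condition.test(func2.get())) {
                done = true;
            }
        }
        return res1;                        // step 4: hand back the original result
    }

    public static void main(String[] args) {
        int[] polls = {0};                                   // hypothetical stand-in for server state
        Supplier<String> store = () -> "doc-1";              // pretend store call
        Supplier<Integer> pollState = () -> ++polls[0];      // pretend state poll
        Predicate<Integer> persisted = state -> state >= 3;  // pretend "persisted" check

        String stored = runThenPoll(store, pollState, persisted);
        System.out.println(stored + " after " + polls[0] + " polls");
    }
}
```

The important detail is that `func2` is passed as something that can be invoked again on every loop iteration, which is the property the asynchronous version also needs.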
Just updated my comment to add take(1) to cancel the repeat. Sorry that I forgot it.
Yea, @zsxwing's solution looks close to what I'd do.
Hmm, it looks like it's just caching the value, but I actually need to re-run the command. So this is my current impl:
@Override
public <D extends Document<?>> Observable<D> upsert(final D document, PersistTo persistTo, ReplicateTo replicateTo) {
    return upsert(document).flatMap(new Func1<D, Observable<? extends D>>() {
        @Override
        public Observable<? extends D> call(final D original) {
            return core
                .<ObserveResponse>send(new ObserveRequest(document.id(), document.cas(), true, (short) 0, bucket))
                .repeat()
                .skipWhile(new Func1<ObserveResponse, Boolean>() {
                    @Override
                    public Boolean call(ObserveResponse observeResponse) {
                        System.err.println(observeResponse.hashCode());
                        return true;
                    }
                }).map(new Func1<ObserveResponse, D>() {
                    @Override
                    public D call(ObserveResponse observeResponse) {
                        return original;
                    }
                });
        }
    });
}
But the response has the same hashcode and it never changes. This send method really needs to be called on every new try.
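One way to picture the symptom, as a plain-Java analogy rather than RxJava itself: if the command runs once and only its result is kept, repeating just replays that result, whereas wrapping the call so it is invoked on every attempt re-runs the command. The `send()` method and the counter below are hypothetical stand-ins, and this sketch assumes the observed behavior comes from the response being produced once and then replayed.

```java
import java.util.function.Supplier;

final class DeferAnalogy {

    static int calls = 0;   // counts how often the "command" really ran

    /** Hypothetical stand-in for the real send call: each invocation does new work. */
    public static int send() {
        return calls++;
    }

    /** Eager: send() runs once up front; every attempt sees the same stored value. */
    public static int[] repeatEager(int attempts) {
        int cached = send();
        int[] seen = new int[attempts];
        for (int i = 0; i < attempts; i++) {
            seen[i] = cached;
        }
        return seen;
    }

    /** Deferred: the Supplier wraps the call, so every attempt re-runs send(). */
    public static int[] repeatDeferred(int attempts) {
        Supplier<Integer> deferred = DeferAnalogy::send;
        int[] seen = new int[attempts];
        for (int i = 0; i < attempts; i++) {
            seen[i] = deferred.get();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(repeatEager(3)));
        System.out.println(java.util.Arrays.toString(repeatDeferred(3)));
    }
}
```

In the eager case the value never changes between attempts, which matches the unchanging hashcode described above; in the deferred case each attempt observes a fresh result.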
@daschl in addition, you forgot to use take(1).
Not sure what's going on, actually; the code below works as I expect. Not sure why your send is returning the same value.
upsert(1)
calling send() with 0
calling send() with 1
calling send() with 2
calling send() with 3
calling send() with 4
calling send() with 5
1
object MainScala {
  def main(args: Array[String]): Unit = {
    def upsert(i: Int) = {
      println(s"upsert($i)")
      Observable.items(i)
    }
    var i = 0;
    def send() = Observable[Int](observer => {
      println(s"calling send() with $i")
      observer.onNext(i)
      i += 1
      observer.onCompleted()
    })
    def test(n: Int) = n < 5
    val zs = upsert(1).flatMap(x =>
      send().repeat.dropWhile(n => test(n))
        .take(1).map(_ => x)
    )
    zs.subscribe(z => println(z))
    readLine()
  }
}
@headinthebox, @zsxwing thanks much :) My thought then was to maybe use the scheduler to schedule into an observable and then use this code, so that N responses get emitted from one observable.
Not sure if this is the right way :)
Ha! I think this works:
What do you think, guys?
private <D extends Document<?>> Observable<D> observe(Observable<D> original, PersistTo persistTo, ReplicateTo replicateTo) {
    return original.flatMap(new Func1<D, Observable<? extends D>>() {
        @Override
        public Observable<? extends D> call(final D original) {
            // simulate with persistto master for now
            return Observable.defer(new Func0<Observable<ObserveResponse>>() {
                @Override
                public Observable<ObserveResponse> call() {
                    return core.send(new ObserveRequest(original.id(), original.cas(), true, (short) 0, bucket));
                }
            }).repeat()
            .skipWhile(new Func1<ObserveResponse, Boolean>() {
                @Override
                public Boolean call(ObserveResponse observeResponse) {
                    // also simulate with persistto master for now
                    System.err.println(observeResponse.observeStatus());
                    return observeResponse.observeStatus() != ObserveResponse.ObserveStatus.FOUND_PERSISTED;
                }
            }).take(1).map(new Func1<ObserveResponse, D>() {
                @Override
                public D call(ObserveResponse observeResponse) {
                    return original;
                }
            });
        }
    });
}
Yup, that looks good.
object MainScala {
  def main(args: Array[String]): Unit = {
    def upsert(i: Int) = {
      println(s"upsert($i)")
      Observable.items(i)
    }
    var i = 0;
    def send() = {
      val result = AsyncSubject[Int]();
      Observable[Int](observer => {
        println(s"calling send() with $i")
        observer.onNext(i)
        i += 1
        observer.onCompleted()
      }).subscribe(result)
      result
    }
    def test(n: Int) = n < 5
    val zs = upsert(1).flatMap(x =>
      Observable.defer{ send() }.repeat.dropWhile(n => test(n))
        .take(1).map(_ => x)
    )
    zs.subscribe(z => println(z))
    readLine()
  }
}
Looks like you can use repeat: