Skip to content

Instantly share code, notes, and snippets.

@daschl
Last active November 22, 2016 12:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daschl/45b23b8d26bc3bcfd59abaf5cdd60d33 to your computer and use it in GitHub Desktop.
Save daschl/45b23b8d26bc3bcfd59abaf5cdd60d33 to your computer and use it in GitHub Desktop.
/*
* Copyright (c) 2016 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.couchbase.client.java.util;
import com.couchbase.client.deps.io.netty.util.ReferenceCounted;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.observers.Subscribers;
import rx.subjects.AsyncSubject;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Defers the execution of a {@link rx.subjects.Subject} and in addition watches for early unsubscription and
* cleans up buffers if the content is {@link ReferenceCounted}.
*
* Implementation Details:
*
* This implementation is very similar to {@link Observable#defer(Func0)} in that it takes a hot observable like
* a subject and defers the execution of it until someone subscribes. The problem with vanilla defer is that if
* an early unsubscribe happens (like a downstream timeout firing) the message from the hot observable is not
* properly consumed anymore which can lead to buffer leaks if it contains a pooled resource.
*
* To mitigate this, another subscription is added to the hot observable which checks, at the time of item emission,
* that the subscription is still present. If it is not the buffers are proactively cleared out, making sure that
* no trace/leak is left behind.
*
* ♬ Wir hom so vü zum tuan
* ♬ Wir hudln und schurdln ummanond
* ♬ Müssen uns sputen des dauert vü zu lang
* ♬ Da hüft ka hupen so kummst a ned schneller dran
* ♬ Und a ka Fluchen lametier ned gemmas on
* -- from Skero - "Hudeln"
*
* @author Michael Nitschinger
* @since 2.3.6
*/
public class OnSubscribeDeferAndWatch<T> implements Observable.OnSubscribe<T> {
/**
* Defer a hot observable and clean its buffers if needed on early unsubscribe. It currently only works if you
* are deferring a {@link AsyncSubject}.
*
* @param observableFactory the factory of the hot observable.
* @return a deferred observable which handles cleanup of resources on early unsubscribe.
*/
public static <T> Observable<T> deferAndWatch(Func0<? extends Observable<? extends T>> observableFactory) {
return Observable.create(new OnSubscribeDeferAndWatch<T>(observableFactory));
}
private final Func0<? extends Observable<? extends T>> observableFactory;
private OnSubscribeDeferAndWatch(Func0<? extends Observable<? extends T>> observableFactory) {
this.observableFactory = observableFactory;
}
@Override
public void call(Subscriber<? super T> s) {
// Defer execution of the hot observable.
Observable<? extends T> o;
try {
o = observableFactory.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, s);
return;
}
// For now, make sure we only support AsyncSubjects to make issues explicit down the road.
if (!(o instanceof AsyncSubject)) {
Exceptions.throwOrReport(
new IllegalStateException("Only AsyncSubjects are allowed with DeferAndClean (is "
+ o.getClass().getSimpleName() + ")"), s);
return;
}
// Hook up the consumer subscription and store it in a reference
final AtomicReference<Subscription> sr = new AtomicReference<Subscription>();
final AtomicBoolean emitted = new AtomicBoolean(false);
sr.set(o.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
emitted.set(true);
}
}).unsafeSubscribe(Subscribers.wrap(s)));
// Add the additional subscription to the hot observable which once an item is emitted
// will check if the original subscription is still present and if not it will release
// the buffer.
o.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
// ignored on purpose
}
@Override
public void onError(Throwable e) {
// ignored on purpose
}
@Override
public void onNext(T t) {
if (t != null && !emitted.get() && sr.get().isUnsubscribed() && t instanceof ReferenceCounted) {
ReferenceCounted rc = (ReferenceCounted) t;
if (rc.refCnt() > 0) {
rc.release();
}
}
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment