Skip to content

Instantly share code, notes, and snippets.

@technoir42
Last active May 18, 2017 15:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save technoir42/5de78179b4a22d9006f2616f2188699c to your computer and use it in GitHub Desktop.
Save technoir42/5de78179b4a22d9006f2616f2188699c to your computer and use it in GitHub Desktop.
import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.support.annotation.Nullable;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Completable;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
/**
 * Service that runs {@link Completable}s sequentially and stops itself once the most recent
 * start request has been handled.
 *
 * <p>Every intent delivered to {@link #onStartCommand(Intent, int, int)} is queued together with
 * its start id. Queued requests are handed, one at a time and in arrival order, to
 * {@link #onHandleIntent(Intent)} on a single background worker obtained from
 * {@link Schedulers#newThread()}. After each request terminates the service calls
 * {@link #stopSelf(int)} with that request's start id, so the service is only stopped when no
 * newer start request has arrived in the meantime.
 *
 * <p>Destroying the service unsubscribes from the queue; requests that have not been handled
 * yet are dropped.
 */
public abstract class RxIntentService extends Service {
    // Bounded to a single element: the only subscriber is attached synchronously in onCreate(),
    // so nothing ever needs to be replayed and already-delivered requests must not be retained.
    private final ReplaySubject<StartRequest> subject = ReplaySubject.createWithSize(1);
    private Subscription subscription;

    /**
     * This service does not support binding.
     *
     * @return always {@code null}
     */
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Sets up the processing pipeline: buffer incoming requests, hop to a background worker and
     * handle them strictly one at a time.
     */
    @Override
    public void onCreate() {
        super.onCreate();
        subscription = subject
                // Requests are pushed from the main thread regardless of downstream demand.
                .onBackpressureBuffer()
                // observeOn (not subscribeOn) is what moves the handling off the caller's thread:
                // a subject emits on whichever thread calls onNext().
                .observeOn(Schedulers.newThread())
                // maxConcurrency = 1 keeps the handling sequential.
                .flatMapCompletable(this::handle, false, 1)
                .subscribe();
    }

    /**
     * Cancels the pipeline, including the request currently being handled, if any.
     */
    @Override
    public void onDestroy() {
        if (subscription != null) {
            subscription.unsubscribe();
        }
        super.onDestroy();
    }

    /**
     * Queues the intent for handling.
     *
     * @return {@link #START_NOT_STICKY}
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        subject.onNext(new StartRequest(intent, startId));
        return START_NOT_STICKY;
    }

    /**
     * Creates the work to perform for a single start request.
     *
     * <p>Errors signalled by the returned {@link Completable}, exceptions thrown by this method
     * and a {@code null} return value are all treated as completion of the request; they do not
     * stop later requests from being handled. Implementations that care about failures must
     * handle them inside the returned {@link Completable}.
     *
     * @param intent the intent that was passed to {@link #onStartCommand(Intent, int, int)}
     * @return the work to run for this intent
     */
    protected abstract Completable onHandleIntent(Intent intent);

    /**
     * Builds the Completable for one queued request and releases the request's start id once it
     * has terminated.
     */
    private Completable handle(StartRequest request) {
        // defer() turns a synchronous throw or a null return from onHandleIntent() into an
        // onError signal, so onErrorComplete() can absorb it instead of killing the pipeline.
        return Completable.defer(() -> onHandleIntent(request.intent))
                .onErrorComplete()
                // stopSelf(startId) is a no-op when a newer start request exists, which avoids
                // stopping the service while another intent is waiting in the queue.
                .doAfterTerminate(() -> stopSelf(request.startId));
    }

    /** An intent paired with the start id it was delivered with. */
    private static final class StartRequest {
        final Intent intent;
        final int startId;

        StartRequest(Intent intent, int startId) {
            this.intent = intent;
            this.startId = startId;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment