Skip to content

Instantly share code, notes, and snippets.

Last active Dec 18, 2017
What would you like to do?
Rx Wrapper around Bluetooth Low Energy scan process
import android.bluetooth.BluetoothAdapter;
import android.bluetooth.BluetoothDevice;
import android.text.TextUtils;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
Rx Wrapper around Bluetooth Low Energy scan process
public class RxScanner {
final BluetoothAdapter adapter;
private LeCallback callback;
public RxScanner(BluetoothAdapter adapter) {
this.adapter = adapter;
Scan observable, subscribe to start scan, unsubscribe to finish
public Observable<DeviceEntry> observe() {
return Observable.create(new Observable.OnSubscribe<DeviceEntry>() {
public void call(final Subscriber<? super DeviceEntry> subscriber) {
callback = new LeCallback(subscriber);
.doOnUnsubscribe(new Action0() {
public void call() {
private static class LeCallback implements BluetoothAdapter.LeScanCallback {
final Subscriber<? super DeviceEntry> subscriber;
private LeCallback(Subscriber<? super DeviceEntry> subscriber) {
this.subscriber = subscriber;
public void onLeScan(final BluetoothDevice device, int rssi, final byte[] scanRecord) {
if (isCompleted) {
subscriber.onNext(new DeviceEntry(device, scanRecord));
private volatile boolean isCompleted = false;
public void onComplete() {
isCompleted = true;

This comment has been minimized.

Copy link
Owner Author

@ousenko ousenko commented Dec 18, 2017

Don't use this in production, it's rather a top-level sketch of how we could implement scanning in Rx way

The code above has following problems:

  • RxJava 1, obviously
  • in RxJava 1 you would be better off creating that kind of Observable via Observable#fromEmitter
  • not thread safe: Observable#onCall might operate on different thread, and it wants to update reference to RxScanner#callback
  • not thread safe: bluetooth adapter is not guaranteed to invoke callbacks on the same thread where adapter#startLeScan is called
  • anything else?

These problems could easily be solved by using RxJava 1's Observable#fromEmitter or RxJava 2's Flowable#create (remember the backpressure, onLeScan is crazy in some implementations), and by using locks / synchronization

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment