Skip to content

Instantly share code, notes, and snippets.

@ousenko
Last active December 18, 2017 07:45
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ousenko/cd8dd0495b5c26045e90 to your computer and use it in GitHub Desktop.
Save ousenko/cd8dd0495b5c26045e90 to your computer and use it in GitHub Desktop.
Rx Wrapper around Bluetooth Low Energy scan process
import android.bluetooth.BluetoothAdapter;
import android.bluetooth.BluetoothDevice;
import android.text.TextUtils;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
/**
Rx Wrapper around Bluetooth Low Energy scan process
*/
public class RxScanner {
final BluetoothAdapter adapter;
private LeCallback callback;
public RxScanner(BluetoothAdapter adapter) {
this.adapter = adapter;
}
/**
Scan observable, subscribe to start scan, unsubscribe to finish
*/
public Observable<DeviceEntry> observe() {
return Observable.create(new Observable.OnSubscribe<DeviceEntry>() {
@Override
public void call(final Subscriber<? super DeviceEntry> subscriber) {
callback = new LeCallback(subscriber);
adapter.startLeScan(callback);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
adapter.stopLeScan(callback);
callback.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
private static class LeCallback implements BluetoothAdapter.LeScanCallback {
final Subscriber<? super DeviceEntry> subscriber;
private LeCallback(Subscriber<? super DeviceEntry> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onLeScan(final BluetoothDevice device, int rssi, final byte[] scanRecord) {
if (isCompleted) {
return;
}
subscriber.onNext(new DeviceEntry(device, scanRecord));
}
private volatile boolean isCompleted = false;
public void onComplete() {
isCompleted = true;
subscriber.onCompleted();
}
}
}
@ousenko
Copy link
Author

ousenko commented Dec 18, 2017

Don't use this in production, it's rather a top-level sketch of how we could implement scanning in Rx way

The code above has following problems:

  • RxJava 1, obviously
  • in RxJava 1 you would be better off creating that kind of Observable via Observable#fromEmitter
  • not thread safe: Observable#onCall might operate on different thread, and it wants to update reference to RxScanner#callback
  • not thread safe: bluetooth adapter is not guaranteed to invoke callbacks on the same thread where adapter#startLeScan is called
  • anything else?

These problems could easily be solved by using RxJava 1's Observable#fromEmitter or RxJava 2's Flowable#create (remember the backpressure, onLeScan is crazy in some implementations), and by using locks / synchronization

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment