Skip to content

Instantly share code, notes, and snippets.

@NomenSvyat
Last active November 3, 2016 18:02
Show Gist options
  • Save NomenSvyat/93b2d05175c911124b7632f4d79dfc6d to your computer and use it in GitHub Desktop.
Save NomenSvyat/93b2d05175c911124b7632f4d79dfc6d to your computer and use it in GitHub Desktop.
Work with SocketIO callbacks as rx observables. (Need to be redone, because of no info about event it listens to, so it's hard to log events)
public class RxSocketListener<T> implements Ack, Emitter.Listener {
private final Type type;
private final Gson gson;
@NonNull
private LinkedList<T> resultBuffer = new LinkedList<>();
@NonNull
private PublishSubject<T> subject = PublishSubject.create();
public RxSocketListener(Type type, Gson gson) {
this.type = type;
this.gson = gson;
}
@Override
public void call(Object... args) {
if (args == null || args.length == 0) {
onError(new IllegalArgumentException("Argument is null or empty"));
return;
}
if (!(args[0] instanceof String)) {
onError(new IllegalArgumentException("Argument is not a string"));
return;
}
String arg = (String) args[0];
Timber.d("Socket event : %s", arg);
T result;
try {
result = gson.fromJson(arg, type);
} catch (JsonSyntaxException e) {
if (type.equals(arg.getClass())) {
result = (T) arg;
} else {
onError(e);
return;
}
}
emitResult(result);
}
private synchronized void emitResult(T result) {
if (!subject.hasObservers()) {
resultBuffer.addLast(result);
return;
}
emitFromBuffer();
doEmitResult(result);
}
private void emitFromBuffer() {
while (resultBuffer.size() > 0) {
doEmitResult(resultBuffer.pop());
}
}
private synchronized void doEmitResult(T result) {
if (result != null && subject.hasObservers()) {
subject.onNext(result);
}
}
private void onError(Exception exception) {
Timber.e(exception, "Exception on socket Ack callback");
if (subject.hasObservers()) {
subject.onError(exception);
}
}
public synchronized Observable<T> observeResult() {
return subject.asObservable()
.doOnSubscribe(this::emitFromBuffer);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment