Last active
November 3, 2016 18:02
-
-
Save NomenSvyat/93b2d05175c911124b7632f4d79dfc6d to your computer and use it in GitHub Desktop.
Work with SocketIO callbacks as rx observables. (Need to be redone, because of no info about event it listens to, so it's hard to log events)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RxSocketListener<T> implements Ack, Emitter.Listener { | |
private final Type type; | |
private final Gson gson; | |
@NonNull | |
private LinkedList<T> resultBuffer = new LinkedList<>(); | |
@NonNull | |
private PublishSubject<T> subject = PublishSubject.create(); | |
public RxSocketListener(Type type, Gson gson) { | |
this.type = type; | |
this.gson = gson; | |
} | |
@Override | |
public void call(Object... args) { | |
if (args == null || args.length == 0) { | |
onError(new IllegalArgumentException("Argument is null or empty")); | |
return; | |
} | |
if (!(args[0] instanceof String)) { | |
onError(new IllegalArgumentException("Argument is not a string")); | |
return; | |
} | |
String arg = (String) args[0]; | |
Timber.d("Socket event : %s", arg); | |
T result; | |
try { | |
result = gson.fromJson(arg, type); | |
} catch (JsonSyntaxException e) { | |
if (type.equals(arg.getClass())) { | |
result = (T) arg; | |
} else { | |
onError(e); | |
return; | |
} | |
} | |
emitResult(result); | |
} | |
private synchronized void emitResult(T result) { | |
if (!subject.hasObservers()) { | |
resultBuffer.addLast(result); | |
return; | |
} | |
emitFromBuffer(); | |
doEmitResult(result); | |
} | |
private void emitFromBuffer() { | |
while (resultBuffer.size() > 0) { | |
doEmitResult(resultBuffer.pop()); | |
} | |
} | |
private synchronized void doEmitResult(T result) { | |
if (result != null && subject.hasObservers()) { | |
subject.onNext(result); | |
} | |
} | |
private void onError(Exception exception) { | |
Timber.e(exception, "Exception on socket Ack callback"); | |
if (subject.hasObservers()) { | |
subject.onError(exception); | |
} | |
} | |
public synchronized Observable<T> observeResult() { | |
return subject.asObservable() | |
.doOnSubscribe(this::emitFromBuffer); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment