Skip to content

Instantly share code, notes, and snippets.

@isqad
Last active June 11, 2019 10:51
Show Gist options
  • Save isqad/157d1b2148e61f010ec13777c3a481b1 to your computer and use it in GitHub Desktop.
Save isqad/157d1b2148e61f010ec13777c3a481b1 to your computer and use it in GitHub Desktop.
package ru.funnyhourse.troinichok.ws.messages;
public abstract class Event {
public Event() {}
}
package ru.funnyhourse.troinichok.ws.messages;
import okhttp3.WebSocket;
import okio.ByteString;
public class EventByteStringMessage extends EventConnected {
private final ByteString message;
public EventByteStringMessage(WebSocket webSocket, ByteString message) {
super(webSocket);
this.message = message;
}
public ByteString getMessage() {
return message;
}
}
package ru.funnyhourse.troinichok.ws.messages;
import okhttp3.WebSocket;
public class EventConnected extends Event {
private final WebSocket sender;
public EventConnected(WebSocket sender) {
this.sender = sender;
}
public WebSocket getSender() {
return sender;
}
}
package ru.funnyhourse.troinichok.ws.messages;
public class EventDisconnected extends Event {
private Throwable exception;
public EventDisconnected(Throwable exception) {
super();
this.exception = exception;
}
public Throwable exception() {
return exception;
}
}
package ru.funnyhourse.troinichok.ws.messages;
import okhttp3.WebSocket;
public class EventStringMessage extends EventConnected {
private final String message;
public EventStringMessage(WebSocket sender, String message) {
super(sender);
this.message = message;
}
public String getMessage() {
return message;
}
}
package ru.funnyhourse.troinichok.ws;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.disposables.Disposable;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import ru.funnyhourse.troinichok.model.data.server.UserApi;
import ru.funnyhourse.troinichok.ws.messages.Event;
import ru.funnyhourse.troinichok.ws.messages.EventByteStringMessage;
import ru.funnyhourse.troinichok.ws.messages.EventConnected;
import ru.funnyhourse.troinichok.ws.messages.EventDisconnected;
import ru.funnyhourse.troinichok.ws.messages.EventStringMessage;
public class WsManager {
private String wsUrl;
private OkHttpClient mOkHttpClient;
private Request mRequest;
private final static int NORMAL_CLOSE = 1000;
private final static String NORMAL_CLOSE_TIP = "normal close";
public WsManager(Builder builder) {
wsUrl = builder.wsUrl;
mOkHttpClient = builder.mOkHttpClient;
}
public Flowable<Event> observe() {
return Flowable.create(new FlowableOnSubscribe<Event>() {
@Override
public void subscribe(FlowableEmitter<Event> e) {
WebSocket ws = getClient().newWebSocket(getmRequest(), new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
e.onNext(new EventConnected(webSocket));
}
@Override
public void onMessage(WebSocket webSocket, String text) {
e.onNext(new EventStringMessage(webSocket, text));
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
e.onNext(new EventByteStringMessage(webSocket, bytes));
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
final ServerRequestedCloseException exception = new ServerRequestedCloseException(code, reason);
e.onNext(new EventDisconnected(exception));
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
final ServerRequestedCloseException exception = new ServerRequestedCloseException(code, reason);
e.onNext(new EventDisconnected(exception));
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
if (response != null) {
final ServerHttpError exception = new ServerHttpError(response);
e.onNext(new EventDisconnected(exception));
//e.onError(exception);
} else {
e.onNext(new EventDisconnected(t));
//e.onError(t);
}
}
});
e.setDisposable(new Disposable() {
private boolean isDisposed = false;
@Override
public void dispose() {
if (mOkHttpClient != null) {
mOkHttpClient.dispatcher().cancelAll();
}
isDisposed = ws.close(NORMAL_CLOSE, NORMAL_CLOSE_TIP);
e.onComplete();
}
@Override
public boolean isDisposed() {
return isDisposed;
}
});
}
}, BackpressureStrategy.BUFFER);
}
private OkHttpClient getClient() {
if (mOkHttpClient == null) {
mOkHttpClient = new OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.build();
}
mOkHttpClient.dispatcher().cancelAll();
return mOkHttpClient;
}
private Request getmRequest() {
if (mRequest == null) {
mRequest = new Request.Builder()
.url(wsUrl)
.addHeader("Host", UserApi.API_HOST)
.build();
}
return mRequest;
}
public static final class Builder {
private String wsUrl;
private OkHttpClient mOkHttpClient;
public Builder wsUrl(String val) {
wsUrl = val;
return this;
}
public Builder client(OkHttpClient val) {
mOkHttpClient = val;
return this;
}
public WsManager build() {
return new WsManager(this);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment