Skip to content

Instantly share code, notes, and snippets.

@AliYusuf95
Last active March 22, 2024 09:42
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 9 You must be signed in to fork a gist
  • Save AliYusuf95/557af8be5f360c95fdf029795291eddb to your computer and use it in GitHub Desktop.
Save AliYusuf95/557af8be5f360c95fdf029795291eddb to your computer and use it in GitHub Desktop.
Websocket implementation using OkHttp3 with {event->data} message format to make your life easier.
public interface OnEventListener {
/**
* Invoked when new message received from websocket with {event, data} structure
*
* @param data Data string received
*/
void onMessage(String data);
}
public interface OnEventResponseListener {
/**
* Invoked when new message received from websocket with {event, data} structure
*
* @param event message event
* @param data data string received
*/
void onMessage(String event, String data);
}
public interface OnMessageListener {
/**
* Invoked when new message received from websocket
*
* @param data Data string received
*/
void onMessage(String data);
}
import okhttp3.Response;
public abstract class OnStateChangeListener {
/**
* Invoked when a web socket has been accepted by the remote peer and may begin transmitting
* messages.
*/
public void onOpen(Response response) {
}
/**
* Invoked when both peers have indicated that no more messages will be transmitted and the
* connection has been successfully released. No further calls to this listener will be made.
*/
public void onClosed(int code, String reason) {
}
/**
* Invoked when a web socket has been closed due to an error reading from or writing to the
* network. Both outgoing and incoming messages may have been lost.
*/
public void onFailure(Throwable t) {
}
/**
* Invoked when a web socket has been closed due to an error and reconnection attempt is started.
*/
public void onReconnect(int attemptsCount, long attemptDelay) {
}
/**
* Invoked when new socket connection status changed.
*
* @param status new socket status
*/
public void onChange(SocketState status) {
}
}
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import android.util.Log;
import org.json.JSONException;
import org.json.JSONObject;
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.internal.ws.RealWebSocket;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.ByteString;
/**
* Websocket class based on OkHttp3 with {event->data} message format to make your life easier.
*
* @author Ali Yusuf
* @since 3/13/17
*/
public class Socket {
private final static String TAG = Socket.class.getSimpleName();
private final static String CLOSE_REASON = "End of session";
private final static int MAX_COLLISION = 7;
public final static String EVENT_OPEN = "open";
public final static String EVENT_RECONNECT_ATTEMPT = "reconnecting";
public final static String EVENT_CLOSED = "closed";
/**
* Main socket states
*/
public enum State {
CLOSED, CLOSING, CONNECT_ERROR, RECONNECT_ATTEMPT, RECONNECTING, OPENING, OPEN
}
private static HttpLoggingInterceptor logging =
new HttpLoggingInterceptor()
.setLevel(BuildConfig.DEBUG ? HttpLoggingInterceptor.Level.HEADERS : HttpLoggingInterceptor.Level.NONE);
private static OkHttpClient.Builder httpClient =
new OkHttpClient.Builder()
.addInterceptor(logging);
public static class Builder {
private Request.Builder request;
private Builder(Request.Builder request) {
this.request = request;
}
public static Builder with(@NonNull String url) {
// Silently replace web socket URLs with HTTP URLs.
if (!url.regionMatches(true, 0, "ws:", 0, 3) && !url.regionMatches(true, 0, "wss:", 0, 4))
throw new IllegalArgumentException("web socket url must start with ws or wss, passed url is " + url);
return new Builder(new Request.Builder().url(url));
}
public Builder setPingInterval(long interval, @NonNull TimeUnit unit){
httpClient.pingInterval(interval, unit);
return this;
}
public Builder addHeader(@NonNull String name, @NonNull String value) {
request.addHeader(name, value);
return this;
}
public Socket build() {
return new Socket(request.build());
}
}
/**
* Websocket state
*/
private static State state;
/**
* Websocket main request
*/
private static Request request;
/**
* Websocket connection
*/
private static RealWebSocket realWebSocket;
/**
* Reconnection post delayed handler
*/
private static Handler delayedReconnection;
/**
* Websocket events listeners
*/
private static Map<String,OnEventListener> eventListener;
/**
* Websocket events new message listeners
*/
private static Map<String,OnEventResponseListener> eventResponseListener;
/**
* Message list tobe send onEvent open {@link State#OPEN} connection state
*/
private static Map<String,String> onOpenMessageQueue = new HashMap<>();
/**
* Websocket state change listener
*/
private static OnStateChangeListener onChangeStateListener;
/**
* Websocket new message listener
*/
private static OnMessageListener messageListener;
/**
* Number of reconnection attempts
*/
private static int reconnectionAttempts;
private static boolean skipOnFailure;
private Socket(Request request) {
Socket.request = request;
state = State.CLOSED;
eventListener = new HashMap<>();
eventResponseListener = new HashMap<>();
delayedReconnection = new Handler(Looper.getMainLooper());
skipOnFailure = false;
}
/**
* Start socket connection if i's not already started
*/
public Socket connect() {
if (httpClient == null) {
throw new IllegalStateException("Make sure to use Socket.Builder before using Socket#connect.");
}
if (realWebSocket == null) {
realWebSocket = (RealWebSocket) httpClient.build().newWebSocket(request, webSocketListener);
changeState(State.OPENING);
} else if (state == State.CLOSED) {
realWebSocket.connect(httpClient.build());
changeState(State.OPENING);
}
return this;
}
/**
* Set listener which fired every time message received with contained data.
*
* @param listener message on arrive listener
*/
public Socket onEvent(@NonNull String event, @NonNull OnEventListener listener){
eventListener.put(event,listener);
return this;
}
/**
* Set listener which fired every time message received with contained data.
*
* @param listener message on arrive listener
*/
public Socket onEventResponse(@NonNull String event, @NonNull OnEventResponseListener listener){
eventResponseListener.put(event,listener);
return this;
}
/**
* Send message in {event->data} format
*
* @param event event name that you want sent message to
* @param data message data in JSON format
* @return true if the message send/on socket send quest; false otherwise
*/
public boolean send(@NonNull String event, @NonNull String data){
try {
JSONObject text = new JSONObject();
text.put("event", event);
text.put("data", new JSONObject(data));
Log.v(TAG,"Try to send data "+text.toString());
return realWebSocket.send(text.toString());
} catch (JSONException e) {
Log.e(TAG,"Try to send data with wrong JSON format, data: "+data);
}
return false;
}
/**
* Set state listener which fired every time {@link Socket#state} changed.
*
* @param listener state change listener
*/
public Socket setOnChangeStateListener(@NonNull OnStateChangeListener listener) {
onChangeStateListener = listener;
return this;
}
/**
* Message listener will be called in any message received even if it's not
* in a {event -> data} format.
*
* @param listener message listener
*/
public Socket setMessageListener(@NonNull OnMessageListener listener) {
messageListener = listener;
return this;
}
public void removeEventListener(@NonNull String event) {
eventListener.remove(event);
onOpenMessageQueue.remove(event);
}
/**
* Clear all socket listeners in one line
*/
public void clearListeners() {
eventListener.clear();
messageListener = null;
onChangeStateListener = null;
}
/**
* Send normal close request to the host
*/
public void close() {
if (realWebSocket != null) {
realWebSocket.close(1000, CLOSE_REASON);
}
}
/**
* Send close request to the host
*/
public void close(int code, @NonNull String reason) {
if (realWebSocket != null) {
realWebSocket.close(code, reason);
}
}
/**
* Terminate the socket connection permanently
*/
public void terminate() {
skipOnFailure = true; // skip onFailure callback
if (realWebSocket != null) {
realWebSocket.cancel(); // close connection
realWebSocket = null; // clear socket object
}
}
/**
* Add message in a queue if the socket not open and send them
* if the socket opened
*
* @param event event name that you want sent message to
* @param data message data in JSON format
*/
public void sendOnOpen(@NonNull String event, @NonNull String data) {
if (state != State.OPEN)
onOpenMessageQueue.put(event,data);
else
send(event,data);
}
/**
* Retrieve current socket connection state {@link State}
*/
public State getState() {
return state;
}
/**
* Change current state and call listener method with new state
* {@link OnStateChangeListener#onChange(Socket, State)}
* @param newState new state
*/
private void changeState(State newState) {
state = newState;
if (onChangeStateListener != null) {
onChangeStateListener.onChange(Socket.this, state);
}
}
/**
* Try to reconnect to the websocket after delay time using <i>Exponential backoff</i> method.
* @see <a href="https://en.wikipedia.org/wiki/Exponential_backoff"></a>
*/
private void reconnect() {
if (state != State.CONNECT_ERROR) // connection not closed !!
return;
changeState(State.RECONNECT_ATTEMPT);
if (realWebSocket != null) {
// Cancel websocket connection
realWebSocket.cancel();
// Clear websocket object
realWebSocket = null;
}
if (eventListener.get(EVENT_RECONNECT_ATTEMPT) != null) {
eventListener.get(EVENT_RECONNECT_ATTEMPT).onMessage(Socket.this, EVENT_RECONNECT_ATTEMPT);
}
// Calculate delay time
int collision = reconnectionAttempts > MAX_COLLISION ? MAX_COLLISION : reconnectionAttempts;
long delayTime = Math.round((Math.pow(2, collision)-1)/2) * 1000;
// Remove any pending posts of callbacks
delayedReconnection.removeCallbacksAndMessages(null);
// Start new post delay
delayedReconnection.postDelayed(new Runnable() {
@Override
public void run() {
changeState(State.RECONNECTING);
reconnectionAttempts++; // Increment connections attempts
connect(); // Establish new connection
}
}, delayTime);
}
private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
Log.v(TAG,"Socket has been opened successfully.");
// reset connections attempts counter
reconnectionAttempts = 0;
// fire open event listener
if (eventListener.get(EVENT_OPEN) != null) {
eventListener.get(EVENT_OPEN).onMessage(Socket.this, EVENT_OPEN);
}
// Send data in queue
for (String event : onOpenMessageQueue.keySet()) {
send(event, onOpenMessageQueue.get(event));
}
// clear queue
onOpenMessageQueue.clear();
changeState(State.OPEN);
}
/**
* Accept only Json data with format:
* <b> {"event":"event name","data":{some data ...}} </b>
*/
@Override
public void onMessage(WebSocket webSocket, String text) {
// print received message in log
Log.v(TAG, "New Message received "+text);
// call message listener
if (messageListener != null)
messageListener.onMessage(Socket.this, text);
try {
// Parse message text
JSONObject response = new JSONObject(text);
String event = response.getString("event");
JSONObject data = response.getJSONObject("data");
// call event listener with received data
if (eventResponseListener.get(event) != null) {
eventResponseListener.get(event).onMessage(Socket.this, event, data);
}
// call event listener
if (eventListener.get(event) != null) {
eventListener.get(event).onMessage(Socket.this, event);
}
} catch (JSONException e) {
// Message text not in JSON format or don't have {event}|{data} object
Log.e(TAG, "Unknown message format.");
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
// TODO: some action
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
Log.v(TAG,"Close request from server with reason '"+reason+"'");
changeState(State.CLOSING);
webSocket.close(1000,reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
Log.v(TAG,"Socket connection closed with reason '"+reason+"'");
changeState(State.CLOSED);
if (eventListener.get(EVENT_CLOSED) != null) {
eventListener.get(EVENT_CLOSED).onMessage(Socket.this, EVENT_CLOSED);
}
}
/**
* This method call if:
* - Fail to verify websocket GET request => Throwable {@link ProtocolException}
* - Can't establish websocket connection after upgrade GET request => response null, Throwable {@link Exception}
* - First GET request had been failed => response null, Throwable {@link java.io.IOException}
* - Fail to send Ping => response null, Throwable {@link java.io.IOException}
* - Fail to send data frame => response null, Throwable {@link java.io.IOException}
* - Fail to read data frame => response null, Throwable {@link java.io.IOException}
*/
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
if (!skipOnFailure) {
skipOnFailure = false; // reset flag
Log.v(TAG, "Socket connection fail, try to reconnect. (" + reconnectionAttempts + ")");
changeState(State.CONNECT_ERROR);
reconnect();
}
}
};
public abstract static class OnMessageListener {
public abstract void onMessage (String data);
/**
* Method called from socket to execute listener implemented in
* {@link #onMessage(String)} on main thread
*
* @param socket Socket that receive the message
* @param data Data string received
*/
private void onMessage (Socket socket, final String data) {
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
onMessage(data);
}
});
}
}
public abstract static class OnEventListener {
public abstract void onMessage (String event);
private void onMessage (Socket socket, final String event) {
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
onMessage(event);
}
});
}
}
public abstract static class OnEventResponseListener extends OnEventListener {
/**
* Method need to override in listener usage
*/
public abstract void onMessage (String event, String data);
/**
* Just override the inherited method
*/
@Override
public void onMessage(String event) {}
/**
* Method called from socket to execute listener implemented in
* {@link #onMessage(String, String)} on main thread
*
* @param socket Socket that receive the message
* @param event Message received event
* @param data Data received in the message
*/
private void onMessage (Socket socket, final String event, final JSONObject data) {
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
onMessage(event, data.toString());
onMessage(event);
}
});
}
}
public abstract static class OnStateChangeListener {
/**
* Method need to override in listener usage
*/
public abstract void onChange (State status);
/**
* Method called from socket to execute listener implemented in
* {@link #onChange(State)} on main thread
*
* @param socket Socket that receive the message
* @param status new status
*/
private void onChange (Socket socket, final State status){
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
onChange(status);
}
});
}
}
}
import android.support.annotation.NonNull;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.logging.HttpLoggingInterceptor;
/**
* Builder class to build websocket connection
*/
public class SocketBuilder {
private Request.Builder request;
private HttpLoggingInterceptor logging =
new HttpLoggingInterceptor()
.setLevel(HttpLoggingInterceptor.Level.HEADERS);
private OkHttpClient.Builder httpClient =
new OkHttpClient.Builder()
.addInterceptor(logging);
private SocketBuilder(Request.Builder request) {
this.request = request;
}
public static SocketBuilder with(@NonNull String url) {
// Silently replace web socket URLs with HTTP URLs.
if (!url.regionMatches(true, 0, "ws:", 0, 3) && !url.regionMatches(true, 0, "wss:", 0, 4))
throw new IllegalArgumentException("web socket url must start with ws or wss, passed url is " + url);
return new SocketBuilder(new Request.Builder().url(url));
}
public SocketBuilder setPingInterval(long interval, @NonNull TimeUnit unit){
httpClient.pingInterval(interval, unit);
return this;
}
public SocketBuilder addHeader(@NonNull String name, @NonNull String value) {
request.addHeader(name, value);
return this;
}
public Socket build() {
return Socket.init(httpClient, request.build());
}
}
import okhttp3.Response;
final class SocketEvents {
final public static class MessageEvent {
String message;
MessageEvent(String message) {
this.message = message;
}
}
final public static class BaseMessageEvent {
String name;
BaseMessageEvent(String name) {
this.name = name;
}
}
final public static class ResponseMessageEvent {
String name;
String data;
ResponseMessageEvent(String name, String data) {
this.name = name;
this.data = data;
}
}
final public static class OpenStatusEvent {
Response response;
OpenStatusEvent(Response response) {
this.response = response;
}
}
final public static class CloseStatusEvent {
int code;
String reason;
CloseStatusEvent(int code, String reason) {
this.code = code;
this.reason = reason;
}
}
final public static class FailureStatusEvent {
Throwable throwable;
FailureStatusEvent(Throwable throwable) {
this.throwable = throwable;
}
}
final public static class ReconnectStatusEvent {
int attemptsCount;
long attemptDelay;
ReconnectStatusEvent(int attemptsCount, long attemptDelay) {
this.attemptsCount = attemptsCount;
this.attemptDelay = attemptDelay;
}
}
final public static class ChangeStatusEvent {
SocketState status;
ChangeStatusEvent(SocketState status) {
this.status = status;
}
}
}
import android.support.annotation.NonNull;
/**
* Class to add listeners to specific activity/fragment
*/
public class SocketListenersBinder {
private Object mLifecycle;
private Socket mSocket;
SocketListenersBinder(Object lifecycle, Socket socket) {
mLifecycle = lifecycle;
mSocket = socket;
}
/**
* Set state listener which fired every time {@link Socket#mState} changed.
*
* @param listener state change listener
*/
public SocketListenersBinder addOnChangeStateListener(@NonNull OnStateChangeListener listener) {
// OpenStatusEvent
mSocket.addEventSubscription(mLifecycle, OpenStatusEvent.class,
e -> listener.onOpen(e.response), listener);
// CloseStatusEvent
mSocket.addEventSubscription(mLifecycle, CloseStatusEvent.class,
e -> listener.onClosed(e.code, e.reason), listener);
// FailureStatusEvent
mSocket.addEventSubscription(mLifecycle, FailureStatusEvent.class,
e -> listener.onFailure(e.throwable), listener);
// ReconnectStatusEvent
mSocket.addEventSubscription(mLifecycle, ReconnectStatusEvent.class,
e -> listener.onReconnect(e.attemptsCount, e.attemptDelay), listener);
// ChangeStatusEvent
mSocket.addEventSubscription(mLifecycle, ChangeStatusEvent.class,
e -> listener.onChange(e.status), listener);
return this;
}
/**
* Message listener will be called in any message received even if it's not
* in a {event -> data} format.
*
* @param listener message listener
*/
public SocketListenersBinder addOnMessageListener(@NonNull OnMessageListener listener) {
mSocket.addEventSubscription(mLifecycle, MessageEvent.class, e -> listener.onMessage(e.message), listener);
return this;
}
/**
* Set listener which fired every time message received with contained data.
*
* @param listener message on arrive listener
*/
public SocketListenersBinder addOnEventListener(@NonNull String event, @NonNull OnEventListener listener) {
mSocket.addEventSubscription(mLifecycle, BaseMessageEvent.class, e -> {
if (!event.equals(e.name)) return; // skip if not same event name
listener.onMessage(e.name);
}, listener);
return this;
}
/**
* Set listener which fired every time message received with contained data.
*
* @param listener message on arrive listener
*/
public SocketListenersBinder setOnEventResponseListener(@NonNull String event, @NonNull OnEventResponseListener listener) {
mSocket.addEventSubscription(mLifecycle, ResponseMessageEvent.class, e -> {
if (!event.equals(e.name)) return; // skip if not same event name
listener.onMessage(e.name, e.data);
}, listener);
return this;
}
/**
* Remove listener from being receive new calls.
*
* @param listener message on arrive listener
*/
public void removeListener(@NonNull OnStateChangeListener listener) {
// remove listeners
mSocket.removeEventSubscriptionListener(mLifecycle, OpenStatusEvent.class, listener);
mSocket.removeEventSubscriptionListener(mLifecycle, CloseStatusEvent.class, listener);
mSocket.removeEventSubscriptionListener(mLifecycle, FailureStatusEvent.class, listener);
mSocket.removeEventSubscriptionListener(mLifecycle, ReconnectStatusEvent.class, listener);
mSocket.removeEventSubscriptionListener(mLifecycle, ChangeStatusEvent.class, listener);
}
/**
* Remove listener from being receive new calls.
*
* @param listener listener to be deleted
*/
public void removeListener(@NonNull OnMessageListener listener) {
mSocket.removeEventSubscriptionListener(mLifecycle, MessageEvent.class, listener);
}
/**
* Remove listener from being receive new calls.
*
* @param listener listener to be deleted
*/
public void removeListener(@NonNull OnEventListener listener) {
mSocket.removeEventSubscriptionListener(mLifecycle, BaseMessageEvent.class, listener);
}
/**
* Remove listener from being receive new calls.
*
* @param listener listener to be deleted
*/
public void removeListener(@NonNull OnEventResponseListener listener) {
mSocket.removeEventSubscriptionListener(mLifecycle, ResponseMessageEvent.class, listener);
}
}
/*
* For logging I use `com.orhanobut:logger` Logger
*/
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import org.json.JSONException;
import org.json.JSONObject;
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.internal.ws.RealWebSocket;
import okio.ByteString;
/**
* Websocket class based on OkHttp3 with {event->data} message format to make your life easier.
*
* @author Ali Yusuf
*/
public class Socket {
private final static String TAG = Socket.class.getSimpleName();
private final static String CLOSE_REASON = "End of session";
private final static int MAX_COLLISION = 7;
private static Socket mInstance = null;
/**
* Websocket state
*/
private SocketState mState;
/**
* Websocket main request
*/
private Request mRequest;
/**
* Websocket http client
*/
private OkHttpClient.Builder mHttpClient;
/**
* Websocket connection
*/
private RealWebSocket mRealWebSocket;
/**
* Reconnection post delayed handler
*/
private Handler mHandler;
/**
* Stores number of reconnecting attempts
*/
private int reconnectionAttempts;
/**
* Indicate if it's termination to stop reconnecting
*/
private boolean isForceTermination;
/**
* Socket event bus
*/
private PublishSubject<Object> eventBus = PublishSubject.create();
/**
* Map that's help to keep track with hole lifecycle,
* used to cancel all lifecycle subscriptions.
*
* lifecycle -> [events] map
*/
private Map<Object, CompositeDisposable> sSubscriptionsMap = new HashMap<>();
/**
* Map that's help to keep track with lifecycle subscriptions with corresponding
* event and listener. Used to cancel particular subscription or reset it.
*
* lifecycle -> {event -> {listener -> subscription}} map
*/
private Map<Object, Map<Class, Map<Object, Disposable>>> sListenerBinderMap = new HashMap<>();
Socket() {}
@NonNull
public static Socket getInstance() {
if(mInstance == null) {
throw new AssertionError("Make sure to use SocketBuilder before using Socket#getInstance.");
}
return mInstance;
}
static Socket init(OkHttpClient.Builder httpClient, Request request) {
mInstance = new Socket();
mInstance.mHttpClient = httpClient;
mInstance.mRequest = request;
mInstance.mState = SocketState.CLOSED;
mInstance.mHandler = new Handler(Looper.getMainLooper());
mInstance.isForceTermination = false;
return mInstance;
}
/**
* Start socket connection if it's not already started
*/
public void connect() {
if (mInstance.mHttpClient == null || mInstance.mRequest == null) {
throw new IllegalStateException("Make sure to use SocketBuilder before using Socket#connect.");
}
if (mRealWebSocket == null || mState == SocketState.CLOSED) {
mRealWebSocket = (RealWebSocket) mHttpClient.build().newWebSocket(mRequest, webSocketListener);
changeState(SocketState.OPENING);
}
}
/**
* Send message in {event->data} format
*
* @param event event name that you want sent message to
* @param data message data object
* @return true if the message send/on socket send quest; false otherwise
*/
public boolean send(@NonNull String event, @NonNull Object data){
return send(event, data.toString());
}
/**
* Send message in {event->data} format
*
* @param event event name that you want sent message to
* @param data message data in JSON format
* @return true if the message send/on socket send quest; false otherwise
*/
public boolean send(@NonNull String event, @NonNull String data){
if (mRealWebSocket != null && mState == SocketState.OPEN) {
try {
JSONObject text = new JSONObject();
text.put("event", event);
text.put("data", new JSONObject(data));
Logger.t(TAG).v("Try to send data: \n%s", text.toString());
return mRealWebSocket.send(text.toString());
} catch (JSONException e) {
Logger.t(TAG).e(e, "Try to send data with wrong JSON format");
}
}
return false;
}
/**
* Set global listener which fired every time message received with contained data.
*
* @param listener message on arrive listener
*/
public Socket addOnEventListener(@NonNull String event, @NonNull OnEventListener listener){
with(this).addOnEventListener(event, listener);
return this;
}
/**
* Set global listener which fired every time message received with contained data.
*
* @param listener message on arrive listener
*/
public Socket addOnEventResponseListener(@NonNull String event, @NonNull OnEventResponseListener listener){
with(this).setOnEventResponseListener(event, listener);
return this;
}
/**
* Set global state listener which fired every time {@link Socket#mState} changed.
*
* @param listener state change listener
*/
public Socket addOnChangeStateListener(@NonNull OnStateChangeListener listener) {
with(this).addOnChangeStateListener(listener);
return this;
}
/**
* Set global message listener which will be called in any message received even if it's not
* in a {event -> data} format.
*
* @param listener message listener
*/
public Socket addMessageListener(@NonNull OnMessageListener listener) {
with(this).addOnMessageListener(listener);
return this;
}
/**
* Send normal close request to the host
*/
public void close() {
if (mRealWebSocket != null) {
mRealWebSocket.close(1000, CLOSE_REASON);
}
}
/**
* Send close request to the host
*/
public void close(int code, @NonNull String reason) {
if (mRealWebSocket != null) {
mRealWebSocket.close(code, reason);
}
}
/**
* Terminate the socket connection permanently
*/
public void terminate() {
isForceTermination = true; // skip onFailure callback
if (mRealWebSocket != null) {
mRealWebSocket.cancel(); // close connection
mRealWebSocket = null; // clear socket object
}
changeState(SocketState.CLOSED);
postEvent(new SocketEvents.CloseStatusEvent(1006, ""));
}
/**
* Retrieve current socket connection state {@link SocketState}
*/
public SocketState getState() {
return mState;
}
/**
* Change current state and call listener method with new state
* {@link OnStateChangeListener#onChange(SocketState)}
* @param newState new state
*/
private void changeState(SocketState newState) {
mState = newState;
postEvent(new SocketEvents.ChangeStatusEvent(newState));
}
/**
* Try to reconnect to the websocket after delay time using <i>Exponential backoff</i> method.
* @see <a href="https://en.wikipedia.org/wiki/Exponential_backoff"></a>
*/
private void reconnect() {
if (mState != SocketState.CONNECT_ERROR) // connection not closed !!
return;
changeState(SocketState.RECONNECT_ATTEMPT);
if (mRealWebSocket != null) {
// Cancel websocket connection
mRealWebSocket.cancel();
// Clear websocket object
mRealWebSocket = null;
}
// Calculate delay time
int collision = reconnectionAttempts > MAX_COLLISION ? MAX_COLLISION : reconnectionAttempts;
long delayTime = Math.round((Math.pow(2, collision)-1)/2) * 1000;
postEvent(new SocketEvents.ReconnectStatusEvent(reconnectionAttempts + 1, delayTime));
// Remove any pending posts of callbacks
mHandler.removeCallbacksAndMessages(null);
// Start new post delay
mHandler.postDelayed(() -> {
changeState(SocketState.RECONNECTING);
reconnectionAttempts++; // Increment connections attempts
connect(); // Establish new connection
}, delayTime);
}
private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
Logger.t(TAG).v("Socket has been opened successfully.");
// reset connections attempts counter
reconnectionAttempts = 0;
// fire open event listener
changeState(SocketState.OPEN);
postEvent(new SocketEvents.OpenStatusEvent(response));
}
/**
* Accept only Json data with format:
* <b> {"event":"event name","data":{some data ...}} </b>
*/
@Override
public void onMessage(WebSocket webSocket, String text) {
// print received message in log
Logger.t(TAG).v( "New Message received \n%s", text);
// call message listener
postEvent(new SocketEvents.BaseMessageEvent(text));
try {
// Parse message text
JSONObject response = new JSONObject(text);
String event = response.getString("event");
JSONObject data = response.getJSONObject("data");
// call event listener with received data
postEvent(new SocketEvents.ResponseMessageEvent(event, data.toString()));
// call event listener
postEvent(new SocketEvents.BaseMessageEvent(event));
} catch (JSONException e) {
// Message text not in JSON format or don't have {event}|{data} object
Logger.t(TAG).e(e,"Unknown message format.");
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
// TODO: some action
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
Logger.t(TAG).v("Close request from server with reason '%s'", reason);
changeState(SocketState.CLOSING);
webSocket.close(1000,reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
Logger.t(TAG).v("Close request from server with reason '%s'", reason);
changeState(SocketState.CLOSED);
postEvent(new SocketEvents.CloseStatusEvent(code, reason));
}
/**
* This method call if:
* - Fail to verify websocket GET request => Throwable {@link ProtocolException}
* - Can't establish websocket connection after upgrade GET request => response null, Throwable {@link Exception}
* - First GET request had been failed => response null, Throwable {@link java.io.IOException}
* - Fail to send Ping => response null, Throwable {@link java.io.IOException}
* - Fail to send data frame => response null, Throwable {@link java.io.IOException}
* - Fail to read data frame => response null, Throwable {@link java.io.IOException}
*/
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
if (!isForceTermination) {
isForceTermination = false; // reset flag
Logger.t(TAG).v( "Socket connection fail, try to reconnect. (" + reconnectionAttempts + ")");
changeState(SocketState.CONNECT_ERROR);
reconnect();
}
postEvent(new SocketEvents.FailureStatusEvent(t));
}
};
/**
* State subscription this lifecycle to socket events and listen for updates on that event.
*
* Note: Make sure to call {@link Socket#unsubscribe(Object)} to avoid memory leaks.
*/
public SocketListenersBinder with(Object lifecycle){
return new SocketListenersBinder(lifecycle, this);
}
/**
* Post an event for all subscribers of that event.
*/
private void postEvent(@NonNull Object event) {
if (eventBus.hasObservers()){
eventBus.onNext(event);
}
}
/**
* Unregisters this object from the listeners bus, removing all subscriptions.
* This should be called when the object is going to go out of memory.
*/
public void unsubscribe(Object lifecycle){
CompositeDisposable compositeSubscription = sSubscriptionsMap.remove(lifecycle);
if (compositeSubscription != null) {
compositeSubscription.dispose();
// clear lifecycle subscriptions of event
sListenerBinderMap.remove(lifecycle);
}
}
/**
* Get the CompositeDisposable or create it if it's not already in memory.
*/
@NonNull
private CompositeDisposable getCompositeSubscription(@NonNull Object object) {
CompositeDisposable compositeSubscription = sSubscriptionsMap.get(object);
if (compositeSubscription == null) {
compositeSubscription = new CompositeDisposable();
sSubscriptionsMap.put(object, compositeSubscription);
}
return compositeSubscription;
}
/**
* Get the event -> disposable map of the specific lifecycle.
*/
private Map<Class, Map<Object, Disposable>> getListenerBinderMap(Object lifecycle) {
Map<Class, Map<Object, Disposable>> disposableMap = sListenerBinderMap.get(lifecycle);
if (disposableMap == null) {
disposableMap = new HashMap<>();
sListenerBinderMap.put(lifecycle, disposableMap);
}
return disposableMap;
}
/**
* Add event subscription to the specified lifecycle and listen for updates on that event,
* each listener subscription must be unique one each lifecycle, event.
* Old subscription of same listener {@code listener} will be disposed if exist.
*/
<T> void addEventSubscription(Object lifecycle, Class<T> eventClass, @NonNull Consumer<T> consumer, Object listener) {
Map<Object, Disposable> disposableMap = getListenerBinderMap(lifecycle).get(eventClass);
if (disposableMap == null) {
disposableMap = new HashMap<>();
} else {
// remove old subscription if exist
removeEventSubscriptionListener(lifecycle, eventClass, listener);
}
// add event subscription to the bus event
Disposable disposable = eventBus.filter(o -> (o != null)) // Filter out null objects, better safe than sorry
.filter(eventClass::isInstance)
.cast(eventClass) // Cast it for easier usage
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
getCompositeSubscription(lifecycle).add(disposable);
// update lifecycle subscriptions
disposableMap.put(listener, disposable);
getListenerBinderMap(lifecycle).put(eventClass, disposableMap);
}
/**
* Remove all event subscriptions of the specified lifecycle.
*/
<T> void removeEventSubscriptions(Object lifecycle, Class<T> eventClass) {
// clear lifecycle subscriptions of event
Map<Object, Disposable> disposableMap = sListenerBinderMap.get(lifecycle).remove(eventClass);
if (disposableMap != null) {
for (Disposable disposable : disposableMap.values()) {
if (disposable != null) {
getCompositeSubscription(lifecycle).remove(disposable); // stop subscription
}
}
}
}
/**
* Remove event listener subscription of the specified lifecycle.
*/
<T> void removeEventSubscriptionListener(Object lifecycle, Class<T> eventClass, Object listener) {
// get subscription
Map<Object, Disposable> disposableMap = sListenerBinderMap.get(lifecycle).get(eventClass);
if (disposableMap != null) {
Disposable disposable = disposableMap.remove(listener);
if (disposable != null) {
getCompositeSubscription(lifecycle).remove(disposable); // stop subscription
}
}
}
}
/**
* Main socket connection states
*/
public enum SocketState {
CLOSED, CLOSING, CONNECT_ERROR, RECONNECT_ATTEMPT, RECONNECTING, OPENING, OPEN
}
@mcMineyC
Copy link

mcMineyC commented Jul 2, 2023

@AliYusuf95 THANK YOU SO MUCH BRO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! You have no idea the amount of time this saved

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment