Skip to content

Instantly share code, notes, and snippets.

@AliYusuf95
Last active March 22, 2024 09:42
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 9 You must be signed in to fork a gist
  • Save AliYusuf95/557af8be5f360c95fdf029795291eddb to your computer and use it in GitHub Desktop.
Save AliYusuf95/557af8be5f360c95fdf029795291eddb to your computer and use it in GitHub Desktop.
Websocket implementation using OkHttp3 with {event->data} message format to make your life easier.
/**
 * Callback for websocket messages in the {event, data} format, filtered by event name.
 */
@FunctionalInterface
public interface OnEventListener {
    /**
     * Invoked when a message carrying the event this listener was registered for is received.
     *
     * @param data name of the event that was received. Despite the parameter name, the
     *             listeners binder passes the event name here, not the message payload;
     *             use {@code OnEventResponseListener} when the payload is needed.
     */
    void onMessage(String data);
}
/**
 * Callback for websocket messages in the {event, data} format that also delivers the payload.
 */
@FunctionalInterface
public interface OnEventResponseListener {
    /**
     * Invoked when a message with the {event, data} structure is received.
     *
     * @param event name of the event carried by the message
     * @param data  payload of the message, as a string
     */
    void onMessage(String event, String data);
}
/**
 * Callback for every raw text message received from the websocket.
 */
@FunctionalInterface
public interface OnMessageListener {
    /**
     * Invoked when a new message is received from the websocket.
     *
     * @param data the received message text
     */
    void onMessage(String data);
}
import okhttp3.Response;
public abstract class OnStateChangeListener {

    /**
     * Called once the remote peer has accepted the web socket, so messages may be transmitted.
     *
     * @param response the response passed along with the open notification
     */
    public void onOpen(Response response) {
        // no-op by default
    }

    /**
     * Called after both peers have signalled that no more messages will be sent and the
     * connection has been released. No further calls to this listener will be made.
     *
     * @param code   close code
     * @param reason close reason
     */
    public void onClosed(int code, String reason) {
        // no-op by default
    }

    /**
     * Called when the web socket has been closed because of an error while reading from or
     * writing to the network. Outgoing and incoming messages may have been lost.
     *
     * @param t the error that caused the failure
     */
    public void onFailure(Throwable t) {
        // no-op by default
    }

    /**
     * Called when the web socket was closed by an error and a reconnection attempt is started.
     *
     * @param attemptsCount number of the reconnection attempt
     * @param attemptDelay  delay applied before the attempt
     */
    public void onReconnect(int attemptsCount, long attemptDelay) {
        // no-op by default
    }

    /**
     * Called whenever the socket connection status changes.
     *
     * @param status the new socket status
     */
    public void onChange(SocketState status) {
        // no-op by default
    }
}
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import android.util.Log;
import org.json.JSONException;
import org.json.JSONObject;
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.internal.ws.RealWebSocket;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.ByteString;
/**
* Websocket class based on OkHttp3 with {event->data} message format to make your life easier.
*
* @author Ali Yusuf
* @since 3/13/17
*/
public class Socket {
private final static String TAG = Socket.class.getSimpleName();
private final static String CLOSE_REASON = "End of session";
private final static int MAX_COLLISION = 7;
public final static String EVENT_OPEN = "open";
public final static String EVENT_RECONNECT_ATTEMPT = "reconnecting";
public final static String EVENT_CLOSED = "closed";
/**
 * Connection states a socket can be in.
 */
public enum State {
    CLOSED,
    CLOSING,
    CONNECT_ERROR,
    RECONNECT_ATTEMPT,
    RECONNECTING,
    OPENING,
    OPEN
}
private static HttpLoggingInterceptor logging =
new HttpLoggingInterceptor()
.setLevel(BuildConfig.DEBUG ? HttpLoggingInterceptor.Level.HEADERS : HttpLoggingInterceptor.Level.NONE);
private static OkHttpClient.Builder httpClient =
new OkHttpClient.Builder()
.addInterceptor(logging);
/**
 * Fluent builder that collects the request settings for a {@link Socket}.
 */
public static class Builder {
    private Request.Builder request;

    private Builder(Request.Builder request) {
        this.request = request;
    }

    /**
     * Starts a builder for the given URL.
     *
     * @param url web socket URL; must begin with {@code ws:} or {@code wss:} (case-insensitive)
     * @throws IllegalArgumentException if the URL uses any other scheme
     */
    public static Builder with(@NonNull String url) {
        boolean plainScheme = url.regionMatches(true, 0, "ws:", 0, 3);
        boolean secureScheme = url.regionMatches(true, 0, "wss:", 0, 4);
        if (plainScheme || secureScheme) {
            return new Builder(new Request.Builder().url(url));
        }
        throw new IllegalArgumentException("web socket url must start with ws or wss, passed url is " + url);
    }

    /**
     * Sets the ping interval on the static HTTP client builder of the enclosing class.
     */
    public Builder setPingInterval(long interval, @NonNull TimeUnit unit){
        httpClient.pingInterval(interval, unit);
        return this;
    }

    /**
     * Adds a header to the connection request.
     */
    public Builder addHeader(@NonNull String name, @NonNull String value) {
        request.addHeader(name, value);
        return this;
    }

    /**
     * Creates the socket from the request assembled so far.
     */
    public Socket build() {
        Request built = request.build();
        return new Socket(built);
    }
}
/**
* Websocket state
*/
private static State state;
/**
* Websocket main request
*/
private static Request request;
/**
* Websocket connection
*/
private static RealWebSocket realWebSocket;
/**
* Reconnection post delayed handler
*/
private static Handler delayedReconnection;
/**
* Websocket events listeners
*/
private static Map<String,OnEventListener> eventListener;
/**
* Websocket events new message listeners
*/
private static Map<String,OnEventResponseListener> eventResponseListener;
/**
* Message list tobe send onEvent open {@link State#OPEN} connection state
*/
private static Map<String,String> onOpenMessageQueue = new HashMap<>();
/**
* Websocket state change listener
*/
private static OnStateChangeListener onChangeStateListener;
/**
* Websocket new message listener
*/
private static OnMessageListener messageListener;
/**
* Number of reconnection attempts
*/
private static int reconnectionAttempts;
private static boolean skipOnFailure;
/**
 * Creates a socket for the given request and resets the connection bookkeeping.
 * Note that every field assigned here is static, so the values are shared class-wide.
 */
private Socket(Request request) {
    // Listener registries start out empty.
    eventListener = new HashMap<>();
    eventResponseListener = new HashMap<>();
    // Delayed reconnection attempts are posted to the main looper.
    delayedReconnection = new Handler(Looper.getMainLooper());
    skipOnFailure = false;
    state = State.CLOSED;
    Socket.request = request;
}
/**
 * Opens the socket connection unless one is already in progress or open.
 * A new web socket is created when none exists; an existing one is reconnected
 * only while the state is {@link State#CLOSED}.
 *
 * @return this socket, for chaining
 */
public Socket connect() {
    if (httpClient == null) {
        throw new IllegalStateException("Make sure to use Socket.Builder before using Socket#connect.");
    }
    boolean hasSocket = realWebSocket != null;
    if (hasSocket && state != State.CLOSED) {
        // Already connecting/connected: nothing to do.
        return this;
    }
    if (hasSocket) {
        realWebSocket.connect(httpClient.build());
    } else {
        realWebSocket = (RealWebSocket) httpClient.build().newWebSocket(request, webSocketListener);
    }
    changeState(State.OPENING);
    return this;
}
/**
 * Registers the listener for the given event name, replacing any listener
 * previously registered under that name.
 *
 * @param event    event name to listen for
 * @param listener listener notified when the event arrives
 * @return this socket, for chaining
 */
public Socket onEvent(@NonNull String event, @NonNull OnEventListener listener) {
    eventListener.put(event, listener);
    return this;
}
/**
 * Registers the response listener for the given event name, replacing any
 * response listener previously registered under that name.
 *
 * @param event    event name to listen for
 * @param listener listener notified with the event and its data
 * @return this socket, for chaining
 */
public Socket onEventResponse(@NonNull String event, @NonNull OnEventResponseListener listener) {
    eventResponseListener.put(event, listener);
    return this;
}
/**
 * Sends a message in {event->data} format.
 *
 * @param event event name that you want to send the message to
 * @param data  message data in JSON format
 * @return true if the message was handed to the web socket for sending; false if there is
 *         no web socket (not connected yet, terminated, or waiting to reconnect), the data
 *         is not valid JSON, or the web socket refused the message
 */
public boolean send(@NonNull String event, @NonNull String data) {
    // The web socket reference is cleared by terminate() and reconnect(), and is not
    // set before connect(); report failure instead of crashing with a NullPointerException.
    if (realWebSocket == null) {
        Log.e(TAG, "Try to send data while the socket is not connected, event: " + event);
        return false;
    }
    try {
        JSONObject text = new JSONObject();
        text.put("event", event);
        text.put("data", new JSONObject(data));
        String payload = text.toString();
        Log.v(TAG, "Try to send data " + payload);
        return realWebSocket.send(payload);
    } catch (JSONException e) {
        // Keep the cause in the log so the malformed part of the payload can be located.
        Log.e(TAG, "Try to send data with wrong JSON format, data: " + data, e);
    }
    return false;
}
/**
 * Installs the listener that is notified on every change of {@link Socket#state}.
 * Any previously installed state listener is replaced.
 *
 * @param listener state change listener
 * @return this socket, for chaining
 */
public Socket setOnChangeStateListener(@NonNull OnStateChangeListener listener) {
    onChangeStateListener = listener;
    return this;
}
/**
 * Installs the listener that receives every incoming text message, including
 * messages that are not in the {event -> data} format. Any previously installed
 * message listener is replaced.
 *
 * @param listener message listener
 * @return this socket, for chaining
 */
public Socket setMessageListener(@NonNull OnMessageListener listener) {
    messageListener = listener;
    return this;
}
/**
 * Removes the event listener registered for the given event name and drops any
 * message still queued under that event name for sending on open.
 *
 * @param event event name whose listener and queued message are removed
 */
public void removeEventListener(@NonNull String event) {
    onOpenMessageQueue.remove(event);
    eventListener.remove(event);
}
/**
 * Clears all socket listeners in one call: event listeners, event response
 * listeners, the message listener and the state change listener.
 */
public void clearListeners() {
    eventListener.clear();
    // Response listeners are listeners too; leaving them registered would keep
    // delivering messages after the caller asked for everything to be cleared.
    eventResponseListener.clear();
    messageListener = null;
    onChangeStateListener = null;
}
/**
 * Sends a normal close request (code 1000) to the host.
 * Does nothing when there is no web socket.
 */
public void close() {
    if (realWebSocket == null) {
        return;
    }
    realWebSocket.close(1000, CLOSE_REASON);
}
/**
 * Sends a close request with the given code and reason to the host.
 * Does nothing when there is no web socket.
 *
 * @param code   close code to send
 * @param reason close reason to send
 */
public void close(int code, @NonNull String reason) {
    if (realWebSocket == null) {
        return;
    }
    realWebSocket.close(code, reason);
}
/**
 * Terminates the socket connection permanently: cancels the web socket, drops the
 * reference to it, and cancels any reconnection attempt that is still waiting to run.
 */
public void terminate() {
    skipOnFailure = true; // skip onFailure callback
    // A reconnect scheduled before this call would otherwise still fire later and
    // silently re-open the connection that was just terminated.
    if (delayedReconnection != null) {
        delayedReconnection.removeCallbacksAndMessages(null);
    }
    if (realWebSocket != null) {
        realWebSocket.cancel(); // close connection
        realWebSocket = null; // clear socket object
    }
}
/**
 * Sends the message right away when the socket is open; otherwise stores it in the
 * on-open queue. The queue is keyed by event name, so a later message for the same
 * event replaces an earlier queued one.
 *
 * @param event event name that you want to send the message to
 * @param data  message data in JSON format
 */
public void sendOnOpen(@NonNull String event, @NonNull String data) {
    if (state == State.OPEN) {
        send(event, data);
        return;
    }
    onOpenMessageQueue.put(event, data);
}
/**
 * Returns the current socket connection state.
 *
 * @return the current {@link State}
 */
public State getState() {
    return state;
}
/**
 * Stores the new state and, when a state change listener is installed,
 * notifies it with the stored state.
 *
 * @param newState new state
 */
private void changeState(State newState) {
    state = newState;
    if (onChangeStateListener == null) {
        return;
    }
    onChangeStateListener.onChange(Socket.this, state);
}
/**
 * Schedules a reconnection after a delay computed with <i>exponential backoff</i>.
 * Only acts when the current state is {@link State#CONNECT_ERROR}.
 *
 * @see <a href="https://en.wikipedia.org/wiki/Exponential_backoff"></a>
 */
private void reconnect() {
    if (state != State.CONNECT_ERROR) {
        // connection not closed !!
        return;
    }
    changeState(State.RECONNECT_ATTEMPT);

    // Cancel the current web socket and drop the reference to it.
    if (realWebSocket != null) {
        realWebSocket.cancel();
        realWebSocket = null;
    }

    // Notify the listener registered for the reconnect-attempt event, if any.
    OnEventListener attemptListener = eventListener.get(EVENT_RECONNECT_ATTEMPT);
    if (attemptListener != null) {
        attemptListener.onMessage(Socket.this, EVENT_RECONNECT_ATTEMPT);
    }

    // Delay grows with the attempt count, which is capped at MAX_COLLISION.
    int collision = Math.min(reconnectionAttempts, MAX_COLLISION);
    long delayTime = Math.round((Math.pow(2, collision) - 1) / 2) * 1000;

    Runnable retry = new Runnable() {
        @Override
        public void run() {
            changeState(State.RECONNECTING);
            reconnectionAttempts++; // count this attempt
            connect(); // establish new connection
        }
    };
    // Only one pending attempt at a time: drop anything posted earlier.
    delayedReconnection.removeCallbacksAndMessages(null);
    delayedReconnection.postDelayed(retry, delayTime);
}
/**
 * Listener handed to OkHttp; translates web socket callbacks into state changes,
 * listener notifications and reconnection attempts.
 */
private WebSocketListener webSocketListener = new WebSocketListener() {
    /**
     * Resets the attempts counter, notifies the "open" event listener, flushes the
     * on-open message queue and finally switches the state to {@link State#OPEN}.
     */
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        Log.v(TAG,"Socket has been opened successfully.");
        // reset connections attempts counter
        reconnectionAttempts = 0;
        // fire open event listener
        OnEventListener openListener = eventListener.get(EVENT_OPEN);
        if (openListener != null) {
            openListener.onMessage(Socket.this, EVENT_OPEN);
        }
        // Send data in queue (entry iteration avoids a second lookup per event)
        for (Map.Entry<String, String> queued : onOpenMessageQueue.entrySet()) {
            send(queued.getKey(), queued.getValue());
        }
        // clear queue
        onOpenMessageQueue.clear();
        changeState(State.OPEN);
    }

    /**
     * Forwards every text message to the message listener, then dispatches it to the
     * event listeners when it has the format:
     * <b> {"event":"event name","data":{some data ...}} </b>
     */
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        // print received message in log
        Log.v(TAG, "New Message received "+text);
        // call message listener
        if (messageListener != null) {
            messageListener.onMessage(Socket.this, text);
        }
        try {
            // Parse message text
            JSONObject response = new JSONObject(text);
            String event = response.getString("event");
            JSONObject data = response.getJSONObject("data");
            // call event listener with received data
            OnEventResponseListener responseListener = eventResponseListener.get(event);
            if (responseListener != null) {
                responseListener.onMessage(Socket.this, event, data);
            }
            // call event listener
            OnEventListener listener = eventListener.get(event);
            if (listener != null) {
                listener.onMessage(Socket.this, event);
            }
        } catch (JSONException e) {
            // Message text not in JSON format or don't have {event}|{data} object
            Log.e(TAG, "Unknown message format.");
        }
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        // TODO: some action
    }

    /**
     * Acknowledges a close request by switching to {@link State#CLOSING} and closing
     * the web socket with code 1000 and the received reason.
     */
    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        Log.v(TAG,"Close request from server with reason '"+reason+"'");
        changeState(State.CLOSING);
        webSocket.close(1000,reason);
    }

    /**
     * Switches to {@link State#CLOSED} and notifies the "closed" event listener.
     */
    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        Log.v(TAG,"Socket connection closed with reason '"+reason+"'");
        changeState(State.CLOSED);
        OnEventListener closedListener = eventListener.get(EVENT_CLOSED);
        if (closedListener != null) {
            closedListener.onMessage(Socket.this, EVENT_CLOSED);
        }
    }

    /**
     * This method call if:
     * - Fail to verify websocket GET request => Throwable {@link ProtocolException}
     * - Can't establish websocket connection after upgrade GET request => response null, Throwable {@link Exception}
     * - First GET request had been failed => response null, Throwable {@link java.io.IOException}
     * - Fail to send Ping => response null, Throwable {@link java.io.IOException}
     * - Fail to send data frame => response null, Throwable {@link java.io.IOException}
     * - Fail to read data frame => response null, Throwable {@link java.io.IOException}
     *
     * A failure that follows {@link Socket#terminate()} is skipped once; every other
     * failure moves the socket to {@link State#CONNECT_ERROR} and starts a reconnect.
     */
    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        if (skipOnFailure) {
            // Consume the flag here. Resetting it only inside the "not skipping" branch
            // never cleared it, which disabled reconnection for good after one terminate().
            skipOnFailure = false;
            return;
        }
        Log.v(TAG, "Socket connection fail, try to reconnect. (" + reconnectionAttempts + ")");
        changeState(State.CONNECT_ERROR);
        reconnect();
    }
};
/**
 * Listener for every text message received by the socket.
 */
public abstract static class OnMessageListener {
    /**
     * Implemented by users; receives the message text.
     */
    public abstract void onMessage (String data);

    /**
     * Called by the socket; posts a task to the main looper that runs
     * {@link #onMessage(String)} with the received data.
     *
     * @param socket Socket that received the message
     * @param data   Data string received
     */
    private void onMessage (Socket socket, final String data) {
        Runnable deliver = new Runnable() {
            @Override
            public void run() {
                OnMessageListener.this.onMessage(data);
            }
        };
        new Handler(Looper.getMainLooper()).post(deliver);
    }
}
/**
 * Listener for a named event.
 */
public abstract static class OnEventListener {
    /**
     * Implemented by users; receives the event name.
     */
    public abstract void onMessage (String event);

    /**
     * Called by the socket; posts a task to the main looper that runs
     * {@link #onMessage(String)} with the event name.
     */
    private void onMessage (Socket socket, final String event) {
        Runnable deliver = new Runnable() {
            @Override
            public void run() {
                OnEventListener.this.onMessage(event);
            }
        };
        new Handler(Looper.getMainLooper()).post(deliver);
    }
}
/**
 * Listener for a named event that also receives the event data.
 */
public abstract static class OnEventResponseListener extends OnEventListener {
    /**
     * Implemented by users; receives the event name and its data as a string.
     */
    public abstract void onMessage (String event, String data);

    /**
     * Empty implementation of the inherited single-argument callback.
     */
    @Override
    public void onMessage(String event) {}

    /**
     * Called by the socket; posts a task to the main looper that runs
     * {@link #onMessage(String, String)} followed by {@link #onMessage(String)}.
     *
     * @param socket Socket that received the message
     * @param event  Message received event
     * @param data   Data received in the message
     */
    private void onMessage (Socket socket, final String event, final JSONObject data) {
        Runnable deliver = new Runnable() {
            @Override
            public void run() {
                // The JSON is serialized inside the posted task, as before.
                OnEventResponseListener.this.onMessage(event, data.toString());
                OnEventResponseListener.this.onMessage(event);
            }
        };
        new Handler(Looper.getMainLooper()).post(deliver);
    }
}
/**
 * Listener for socket state changes.
 */
public abstract static class OnStateChangeListener {
    /**
     * Implemented by users; receives the new state.
     */
    public abstract void onChange (State status);

    /**
     * Called by the socket; posts a task to the main looper that runs
     * {@link #onChange(State)} with the new status.
     *
     * @param socket Socket whose state changed
     * @param status new status
     */
    private void onChange (Socket socket, final State status){
        Runnable deliver = new Runnable() {
            @Override
            public void run() {
                OnStateChangeListener.this.onChange(status);
            }
        };
        new Handler(Looper.getMainLooper()).post(deliver);
    }
}
}
import android.support.annotation.NonNull;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.logging.HttpLoggingInterceptor;
/**
 * Fluent builder that collects the request and HTTP client settings of a websocket
 * connection and hands them to {@code Socket.init}.
 */
public class SocketBuilder {
    private Request.Builder request;
    // Logs at HEADERS level for every client created by this builder.
    private HttpLoggingInterceptor logging =
            new HttpLoggingInterceptor()
                    .setLevel(HttpLoggingInterceptor.Level.HEADERS);
    private OkHttpClient.Builder httpClient =
            new OkHttpClient.Builder()
                    .addInterceptor(logging);

    private SocketBuilder(Request.Builder request) {
        this.request = request;
    }

    /**
     * Starts a builder for the given URL.
     *
     * @param url web socket URL; must begin with {@code ws:} or {@code wss:} (case-insensitive)
     * @throws IllegalArgumentException if the URL uses any other scheme
     */
    public static SocketBuilder with(@NonNull String url) {
        boolean plainScheme = url.regionMatches(true, 0, "ws:", 0, 3);
        boolean secureScheme = url.regionMatches(true, 0, "wss:", 0, 4);
        if (plainScheme || secureScheme) {
            return new SocketBuilder(new Request.Builder().url(url));
        }
        throw new IllegalArgumentException("web socket url must start with ws or wss, passed url is " + url);
    }

    /**
     * Sets the ping interval on this builder's HTTP client.
     */
    public SocketBuilder setPingInterval(long interval, @NonNull TimeUnit unit){
        httpClient.pingInterval(interval, unit);
        return this;
    }

    /**
     * Adds a header to the connection request.
     */
    public SocketBuilder addHeader(@NonNull String name, @NonNull String value) {
        request.addHeader(name, value);
        return this;
    }

    /**
     * Initializes the socket with the HTTP client builder and the built request.
     */
    public Socket build() {
        return Socket.init(httpClient, request.build());
    }
}
import okhttp3.Response;
/**
 * Holder for the event payload types posted on the socket event bus.
 * Payload fields are final so an event cannot be altered after it was posted.
 */
final class SocketEvents {

    /** A text message received from the websocket. */
    public static final class MessageEvent {
        final String message;

        MessageEvent(String message) {
            this.message = message;
        }
    }

    /** A named event without payload. */
    public static final class BaseMessageEvent {
        final String name;

        BaseMessageEvent(String name) {
            this.name = name;
        }
    }

    /** A named event together with its data string. */
    public static final class ResponseMessageEvent {
        final String name;
        final String data;

        ResponseMessageEvent(String name, String data) {
            this.name = name;
            this.data = data;
        }
    }

    /** The socket was opened; carries the response passed to the open callback. */
    public static final class OpenStatusEvent {
        final Response response;

        OpenStatusEvent(Response response) {
            this.response = response;
        }
    }

    /** The socket was closed; carries the close code and reason. */
    public static final class CloseStatusEvent {
        final int code;
        final String reason;

        CloseStatusEvent(int code, String reason) {
            this.code = code;
            this.reason = reason;
        }
    }

    /** The socket failed; carries the reported error. */
    public static final class FailureStatusEvent {
        final Throwable throwable;

        FailureStatusEvent(Throwable throwable) {
            this.throwable = throwable;
        }
    }

    /** A reconnection attempt was started; carries the attempt number and its delay. */
    public static final class ReconnectStatusEvent {
        final int attemptsCount;
        final long attemptDelay;

        ReconnectStatusEvent(int attemptsCount, long attemptDelay) {
            this.attemptsCount = attemptsCount;
            this.attemptDelay = attemptDelay;
        }
    }

    /** The socket state changed; carries the new status. */
    public static final class ChangeStatusEvent {
        final SocketState status;

        ChangeStatusEvent(SocketState status) {
            this.status = status;
        }
    }
}
import android.support.annotation.NonNull;
/**
 * Registers and removes socket listeners on behalf of one lifecycle object
 * (for example an activity or fragment).
 */
public class SocketListenersBinder {
    private Object mLifecycle;
    private Socket mSocket;

    SocketListenersBinder(Object lifecycle, Socket socket) {
        mLifecycle = lifecycle;
        mSocket = socket;
    }

    /**
     * Subscribes the listener to the open, close, failure, reconnect and
     * state-change events of the socket.
     *
     * @param listener state change listener
     * @return this binder, for chaining
     */
    public SocketListenersBinder addOnChangeStateListener(@NonNull OnStateChangeListener listener) {
        // socket opened
        mSocket.addEventSubscription(mLifecycle, OpenStatusEvent.class, e -> {
            listener.onOpen(e.response);
        }, listener);
        // socket closed
        mSocket.addEventSubscription(mLifecycle, CloseStatusEvent.class, e -> {
            listener.onClosed(e.code, e.reason);
        }, listener);
        // socket failed
        mSocket.addEventSubscription(mLifecycle, FailureStatusEvent.class, e -> {
            listener.onFailure(e.throwable);
        }, listener);
        // reconnection attempt started
        mSocket.addEventSubscription(mLifecycle, ReconnectStatusEvent.class, e -> {
            listener.onReconnect(e.attemptsCount, e.attemptDelay);
        }, listener);
        // state changed
        mSocket.addEventSubscription(mLifecycle, ChangeStatusEvent.class, e -> {
            listener.onChange(e.status);
        }, listener);
        return this;
    }

    /**
     * Subscribes the listener to message events; it is called for any received
     * message, even one that is not in the {event -> data} format.
     *
     * @param listener message listener
     * @return this binder, for chaining
     */
    public SocketListenersBinder addOnMessageListener(@NonNull OnMessageListener listener) {
        mSocket.addEventSubscription(mLifecycle, MessageEvent.class, e -> {
            listener.onMessage(e.message);
        }, listener);
        return this;
    }

    /**
     * Subscribes the listener to named events; it is called with the event name
     * only when that name equals {@code event}.
     *
     * @param event    event name to listen for
     * @param listener listener notified when the event arrives
     * @return this binder, for chaining
     */
    public SocketListenersBinder addOnEventListener(@NonNull String event, @NonNull OnEventListener listener) {
        mSocket.addEventSubscription(mLifecycle, BaseMessageEvent.class, e -> {
            if (event.equals(e.name)) { // only the requested event name
                listener.onMessage(e.name);
            }
        }, listener);
        return this;
    }

    /**
     * Subscribes the listener to named events with data; it is called with the event
     * name and data only when that name equals {@code event}.
     *
     * @param event    event name to listen for
     * @param listener listener notified with the event and its data
     * @return this binder, for chaining
     */
    public SocketListenersBinder setOnEventResponseListener(@NonNull String event, @NonNull OnEventResponseListener listener) {
        mSocket.addEventSubscription(mLifecycle, ResponseMessageEvent.class, e -> {
            if (event.equals(e.name)) { // only the requested event name
                listener.onMessage(e.name, e.data);
            }
        }, listener);
        return this;
    }

    /**
     * Removes the state listener from all five status event subscriptions.
     *
     * @param listener listener to be removed
     */
    public void removeListener(@NonNull OnStateChangeListener listener) {
        final Socket socket = mSocket;
        final Object lifecycle = mLifecycle;
        socket.removeEventSubscriptionListener(lifecycle, OpenStatusEvent.class, listener);
        socket.removeEventSubscriptionListener(lifecycle, CloseStatusEvent.class, listener);
        socket.removeEventSubscriptionListener(lifecycle, FailureStatusEvent.class, listener);
        socket.removeEventSubscriptionListener(lifecycle, ReconnectStatusEvent.class, listener);
        socket.removeEventSubscriptionListener(lifecycle, ChangeStatusEvent.class, listener);
    }

    /**
     * Removes the message listener subscription.
     *
     * @param listener listener to be removed
     */
    public void removeListener(@NonNull OnMessageListener listener) {
        mSocket.removeEventSubscriptionListener(mLifecycle, MessageEvent.class, listener);
    }

    /**
     * Removes the event listener subscription.
     *
     * @param listener listener to be removed
     */
    public void removeListener(@NonNull OnEventListener listener) {
        mSocket.removeEventSubscriptionListener(mLifecycle, BaseMessageEvent.class, listener);
    }

    /**
     * Removes the event response listener subscription.
     *
     * @param listener listener to be removed
     */
    public void removeListener(@NonNull OnEventResponseListener listener) {
        mSocket.removeEventSubscriptionListener(mLifecycle, ResponseMessageEvent.class, listener);
    }
}
/*
* For logging I use `com.orhanobut:logger` Logger
*/
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import org.json.JSONException;
import org.json.JSONObject;
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.internal.ws.RealWebSocket;
import okio.ByteString;
/**
* Websocket class based on OkHttp3 with {event->data} message format to make your life easier.
*
* @author Ali Yusuf
*/
public class Socket {
private final static String TAG = Socket.class.getSimpleName();
private final static String CLOSE_REASON = "End of session";
private final static int MAX_COLLISION = 7;
private static Socket mInstance = null;
/**
* Websocket state
*/
private SocketState mState;
/**
* Websocket main request
*/
private Request mRequest;
/**
* Websocket http client
*/
private OkHttpClient.Builder mHttpClient;
/**
* Websocket connection
*/
private RealWebSocket mRealWebSocket;
/**
* Reconnection post delayed handler
*/
private Handler mHandler;
/**
* Stores number of reconnecting attempts
*/
private int reconnectionAttempts;
/**
* Indicate if it's termination to stop reconnecting
*/
private boolean isForceTermination;
/**
* Socket event bus
*/
private PublishSubject<Object> eventBus = PublishSubject.create();
/**
* Map that's help to keep track with hole lifecycle,
* used to cancel all lifecycle subscriptions.
*
* lifecycle -> [events] map
*/
private Map<Object, CompositeDisposable> sSubscriptionsMap = new HashMap<>();
/**
* Map that's help to keep track with lifecycle subscriptions with corresponding
* event and listener. Used to cancel particular subscription or reset it.
*
* lifecycle -> {event -> {listener -> subscription}} map
*/
private Map<Object, Map<Class, Map<Object, Disposable>>> sListenerBinderMap = new HashMap<>();
Socket() {}
/**
 * Returns the socket created by the last call to {@code init}.
 *
 * @throws AssertionError if no socket has been initialized yet
 */
@NonNull
public static Socket getInstance() {
    if (mInstance != null) {
        return mInstance;
    }
    throw new AssertionError("Make sure to use SocketBuilder before using Socket#getInstance.");
}
/**
 * Creates a new socket, stores it as the shared instance and configures it with the
 * given HTTP client builder and request. The socket starts in the CLOSED state.
 */
static Socket init(OkHttpClient.Builder httpClient, Request request) {
    final Socket socket = new Socket();
    mInstance = socket;
    socket.mHttpClient = httpClient;
    socket.mRequest = request;
    socket.mState = SocketState.CLOSED;
    // Delayed reconnections are posted to the main looper.
    socket.mHandler = new Handler(Looper.getMainLooper());
    socket.isForceTermination = false;
    return socket;
}
/**
 * Opens the socket connection when there is no web socket yet or the state is CLOSED;
 * otherwise does nothing.
 *
 * @throws IllegalStateException if the shared instance has no HTTP client or request
 */
public void connect() {
    boolean configured = mInstance.mHttpClient != null && mInstance.mRequest != null;
    if (!configured) {
        throw new IllegalStateException("Make sure to use SocketBuilder before using Socket#connect.");
    }
    if (mRealWebSocket != null && mState != SocketState.CLOSED) {
        // A connection already exists and is not closed.
        return;
    }
    mRealWebSocket = (RealWebSocket) mHttpClient.build().newWebSocket(mRequest, webSocketListener);
    changeState(SocketState.OPENING);
}
/**
 * Sends a message in {event->data} format, using the string form of the data object.
 *
 * @param event event name that you want to send the message to
 * @param data  message data object; its {@code toString()} must be JSON
 * @return true if the message was handed to the web socket for sending; false otherwise
 */
public boolean send(@NonNull String event, @NonNull Object data){
    String json = data.toString();
    return send(event, json);
}
/**
 * Sends a message in {event->data} format. Nothing is sent unless a web socket
 * exists and the state is OPEN.
 *
 * @param event event name that you want to send the message to
 * @param data  message data in JSON format
 * @return true if the message was handed to the web socket for sending; false otherwise
 */
public boolean send(@NonNull String event, @NonNull String data){
    if (mRealWebSocket == null || mState != SocketState.OPEN) {
        return false;
    }
    try {
        JSONObject envelope = new JSONObject();
        envelope.put("event", event);
        envelope.put("data", new JSONObject(data));
        String payload = envelope.toString();
        Logger.t(TAG).v("Try to send data: \n%s", payload);
        return mRealWebSocket.send(payload);
    } catch (JSONException e) {
        Logger.t(TAG).e(e, "Try to send data with wrong JSON format");
        return false;
    }
}
/**
 * Adds a global event listener, bound with this socket itself as the lifecycle object.
 *
 * @param event    event name to listen for
 * @param listener listener notified when the event arrives
 * @return this socket, for chaining
 */
public Socket addOnEventListener(@NonNull String event, @NonNull OnEventListener listener){
    SocketListenersBinder binder = with(this);
    binder.addOnEventListener(event, listener);
    return this;
}
/**
 * Adds a global event response listener, bound with this socket itself as the
 * lifecycle object.
 *
 * @param event    event name to listen for
 * @param listener listener notified with the event and its data
 * @return this socket, for chaining
 */
public Socket addOnEventResponseListener(@NonNull String event, @NonNull OnEventResponseListener listener){
    SocketListenersBinder binder = with(this);
    binder.setOnEventResponseListener(event, listener);
    return this;
}
/**
 * Adds a global state listener, bound with this socket itself as the lifecycle object.
 * It is notified every time {@link Socket#mState} changes.
 *
 * @param listener state change listener
 * @return this socket, for chaining
 */
public Socket addOnChangeStateListener(@NonNull OnStateChangeListener listener) {
    SocketListenersBinder binder = with(this);
    binder.addOnChangeStateListener(listener);
    return this;
}
/**
 * Adds a global message listener, bound with this socket itself as the lifecycle
 * object. It is called for any received message, even one that is not in the
 * {event -> data} format.
 *
 * @param listener message listener
 * @return this socket, for chaining
 */
public Socket addMessageListener(@NonNull OnMessageListener listener) {
    SocketListenersBinder binder = with(this);
    binder.addOnMessageListener(listener);
    return this;
}
/**
 * Sends a normal close request (code 1000) to the host.
 * Does nothing when there is no web socket.
 */
public void close() {
    if (mRealWebSocket == null) {
        return;
    }
    mRealWebSocket.close(1000, CLOSE_REASON);
}
/**
 * Sends a close request with the given code and reason to the host.
 * Does nothing when there is no web socket.
 *
 * @param code   close code to send
 * @param reason close reason to send
 */
public void close(int code, @NonNull String reason) {
    if (mRealWebSocket == null) {
        return;
    }
    mRealWebSocket.close(code, reason);
}
/**
 * Terminates the socket connection permanently: cancels any reconnection attempt that
 * is still waiting to run, cancels the web socket, switches the state to CLOSED and
 * posts a close event with code 1006 and an empty reason.
 */
public void terminate() {
    isForceTermination = true; // skip onFailure callback
    // A reconnect scheduled before this call would otherwise still fire later and
    // silently re-open the connection that was just terminated.
    if (mHandler != null) {
        mHandler.removeCallbacksAndMessages(null);
    }
    if (mRealWebSocket != null) {
        mRealWebSocket.cancel(); // close connection
        mRealWebSocket = null; // clear socket object
    }
    changeState(SocketState.CLOSED);
    postEvent(new SocketEvents.CloseStatusEvent(1006, ""));
}
/**
 * Returns the current socket connection state.
 *
 * @return the current {@link SocketState}
 */
public SocketState getState() {
    return mState;
}
/**
 * Stores the new state and posts a change-status event carrying it, which is how
 * {@link OnStateChangeListener#onChange(SocketState)} subscribers get notified.
 *
 * @param newState new state
 */
private void changeState(SocketState newState) {
    mState = newState;
    SocketEvents.ChangeStatusEvent change = new SocketEvents.ChangeStatusEvent(newState);
    postEvent(change);
}
/**
 * Schedules a reconnection after a delay computed with <i>exponential backoff</i>.
 * Only acts when the current state is CONNECT_ERROR.
 *
 * @see <a href="https://en.wikipedia.org/wiki/Exponential_backoff"></a>
 */
private void reconnect() {
    if (mState != SocketState.CONNECT_ERROR) {
        // connection not closed !!
        return;
    }
    changeState(SocketState.RECONNECT_ATTEMPT);

    // Cancel the current web socket and drop the reference to it.
    if (mRealWebSocket != null) {
        mRealWebSocket.cancel();
        mRealWebSocket = null;
    }

    // Delay grows with the attempt count, which is capped at MAX_COLLISION.
    int collision = Math.min(reconnectionAttempts, MAX_COLLISION);
    long delayTime = Math.round((Math.pow(2, collision) - 1) / 2) * 1000;
    postEvent(new SocketEvents.ReconnectStatusEvent(reconnectionAttempts + 1, delayTime));

    Runnable retry = () -> {
        changeState(SocketState.RECONNECTING);
        reconnectionAttempts++; // count this attempt
        connect(); // establish new connection
    };
    // Only one pending attempt at a time: drop anything posted earlier.
    mHandler.removeCallbacksAndMessages(null);
    mHandler.postDelayed(retry, delayTime);
}
/**
 * Listener handed to OkHttp; translates web socket callbacks into state changes,
 * event-bus posts and reconnection attempts.
 */
private WebSocketListener webSocketListener = new WebSocketListener() {
    /**
     * Resets the attempts counter, switches the state to OPEN and posts the open event.
     */
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        Logger.t(TAG).v("Socket has been opened successfully.");
        // reset connections attempts counter
        reconnectionAttempts = 0;
        // fire open event listener
        changeState(SocketState.OPEN);
        postEvent(new SocketEvents.OpenStatusEvent(response));
    }

    /**
     * Posts every text message as a raw message event, then posts the response and
     * base events when the text has the format:
     * <b> {"event":"event name","data":{some data ...}} </b>
     */
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        // print received message in log
        Logger.t(TAG).v( "New Message received \n%s", text);
        // call message listener: message listeners subscribe to MessageEvent, so the raw
        // text must be posted as that type (posting it as BaseMessageEvent never reached them)
        postEvent(new SocketEvents.MessageEvent(text));
        try {
            // Parse message text
            JSONObject response = new JSONObject(text);
            String event = response.getString("event");
            JSONObject data = response.getJSONObject("data");
            // call event listener with received data
            postEvent(new SocketEvents.ResponseMessageEvent(event, data.toString()));
            // call event listener
            postEvent(new SocketEvents.BaseMessageEvent(event));
        } catch (JSONException e) {
            // Message text not in JSON format or don't have {event}|{data} object
            Logger.t(TAG).e(e,"Unknown message format.");
        }
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        // TODO: some action
    }

    /**
     * Acknowledges a close request by switching to CLOSING and closing the web socket
     * with code 1000 and the received reason.
     */
    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        Logger.t(TAG).v("Close request from server with reason '%s'", reason);
        changeState(SocketState.CLOSING);
        webSocket.close(1000,reason);
    }

    /**
     * Switches to CLOSED and posts the close event with the received code and reason.
     */
    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        Logger.t(TAG).v("Socket connection closed with reason '%s'", reason);
        changeState(SocketState.CLOSED);
        postEvent(new SocketEvents.CloseStatusEvent(code, reason));
    }

    /**
     * This method call if:
     * - Fail to verify websocket GET request => Throwable {@link ProtocolException}
     * - Can't establish websocket connection after upgrade GET request => response null, Throwable {@link Exception}
     * - First GET request had been failed => response null, Throwable {@link java.io.IOException}
     * - Fail to send Ping => response null, Throwable {@link java.io.IOException}
     * - Fail to send data frame => response null, Throwable {@link java.io.IOException}
     * - Fail to read data frame => response null, Throwable {@link java.io.IOException}
     *
     * A failure that follows a forced termination does not trigger a reconnect and
     * consumes the termination flag; every other failure moves the socket to
     * CONNECT_ERROR and starts a reconnect. The failure event is posted in both cases.
     */
    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        if (isForceTermination) {
            // Consume the flag here. Resetting it only inside the "not terminated" branch
            // never cleared it, which disabled reconnection for good after one terminate().
            isForceTermination = false;
        } else {
            Logger.t(TAG).v( "Socket connection fail, try to reconnect. (" + reconnectionAttempts + ")");
            changeState(SocketState.CONNECT_ERROR);
            reconnect();
        }
        postEvent(new SocketEvents.FailureStatusEvent(t));
    }
};
/**
 * Creates a binder that registers socket listeners on behalf of the given lifecycle object.
 *
 * Note: Make sure to call {@link Socket#unsubscribe(Object)} to avoid memory leaks.
 *
 * @param lifecycle object the subscriptions are grouped under
 * @return a new binder for this socket and lifecycle
 */
public SocketListenersBinder with(Object lifecycle){
    SocketListenersBinder binder = new SocketListenersBinder(lifecycle, this);
    return binder;
}
/**
 * Publishes an event on the event bus; the event is dropped when the bus has no observers.
 *
 * @param event the event object to publish
 */
private void postEvent(@NonNull Object event) {
    if (!eventBus.hasObservers()) {
        return; // nobody is listening
    }
    eventBus.onNext(event);
}
/**
 * Unregisters the given lifecycle object from the listeners bus and disposes all of its
 * subscriptions. Call this before the object goes away. Does nothing when the object has
 * no registered subscriptions.
 *
 * @param lifecycle the object whose subscriptions should be removed
 */
public void unsubscribe(Object lifecycle){
    final CompositeDisposable subscriptions = sSubscriptionsMap.remove(lifecycle);
    if (subscriptions == null) {
        return;
    }
    subscriptions.dispose();
    // drop the per-event listener bookkeeping of this lifecycle as well
    sListenerBinderMap.remove(lifecycle);
}
/**
 * Returns the CompositeDisposable registered for the given object, creating and
 * registering a new one when none exists yet.
 *
 * @param object the lifecycle object used as the lookup key
 * @return the existing or newly created CompositeDisposable
 */
@NonNull
private CompositeDisposable getCompositeSubscription(@NonNull Object object) {
    final CompositeDisposable existing = sSubscriptionsMap.get(object);
    if (existing != null) {
        return existing;
    }
    final CompositeDisposable created = new CompositeDisposable();
    sSubscriptionsMap.put(object, created);
    return created;
}
/**
 * Returns the event class -> (listener -> disposable) map of the given lifecycle, creating
 * and registering an empty one when the lifecycle has none yet.
 *
 * @param lifecycle the lifecycle object used as the lookup key
 * @return the existing or newly created map
 */
private Map<Class, Map<Object, Disposable>> getListenerBinderMap(Object lifecycle) {
    final Map<Class, Map<Object, Disposable>> existing = sListenerBinderMap.get(lifecycle);
    if (existing != null) {
        return existing;
    }
    final Map<Class, Map<Object, Disposable>> created = new HashMap<>();
    sListenerBinderMap.put(lifecycle, created);
    return created;
}
/**
 * Subscribes {@code consumer} to events of type {@code eventClass} on behalf of the given
 * lifecycle. A listener can hold only one subscription per lifecycle and event class: an
 * earlier subscription registered for the same {@code listener} is removed first.
 *
 * @param lifecycle  the object the subscription is grouped under
 * @param eventClass the event type to listen for
 * @param consumer   callback receiving the matching events (observed on the Android main thread)
 * @param listener   key identifying this subscription within the lifecycle and event class
 */
<T> void addEventSubscription(Object lifecycle, Class<T> eventClass, @NonNull Consumer<T> consumer, Object listener) {
    final Map<Class, Map<Object, Disposable>> byEvent = getListenerBinderMap(lifecycle);
    Map<Object, Disposable> byListener = byEvent.get(eventClass);
    if (byListener != null) {
        // the listener may already be subscribed to this event: drop the old subscription
        removeEventSubscriptionListener(lifecycle, eventClass, listener);
    } else {
        byListener = new HashMap<>();
    }

    // subscribe to the bus, keeping only non-null events of the requested type
    final Disposable subscription = eventBus
            .filter(o -> (o != null))
            .filter(eventClass::isInstance)
            .cast(eventClass)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
    getCompositeSubscription(lifecycle).add(subscription);

    // record the subscription in the lifecycle bookkeeping
    byListener.put(listener, subscription);
    byEvent.put(eventClass, byListener);
}
/**
 * Removes every subscription the given lifecycle holds for the given event class.
 * Does nothing when the lifecycle has no registered subscriptions or none for that event.
 *
 * @param lifecycle  the object the subscriptions are grouped under
 * @param eventClass the event type whose subscriptions should be removed
 */
<T> void removeEventSubscriptions(Object lifecycle, Class<T> eventClass) {
    Map<Class, Map<Object, Disposable>> eventMap = sListenerBinderMap.get(lifecycle);
    if (eventMap == null) {
        // lifecycle never subscribed (or was already unsubscribed): nothing to remove
        return;
    }
    // clear lifecycle subscriptions of event
    Map<Object, Disposable> disposableMap = eventMap.remove(eventClass);
    if (disposableMap == null) {
        return;
    }
    for (Disposable disposable : disposableMap.values()) {
        if (disposable != null) {
            getCompositeSubscription(lifecycle).remove(disposable); // stop subscription
        }
    }
}
/**
 * Removes the subscription registered for one listener under the given lifecycle and
 * event class. Does nothing when the lifecycle, the event class or the listener has no
 * registered subscription.
 *
 * @param lifecycle  the object the subscription is grouped under
 * @param eventClass the event type the listener was subscribed to
 * @param listener   key identifying the subscription to remove
 */
<T> void removeEventSubscriptionListener(Object lifecycle, Class<T> eventClass, Object listener) {
    Map<Class, Map<Object, Disposable>> eventMap = sListenerBinderMap.get(lifecycle);
    if (eventMap == null) {
        // lifecycle never subscribed (or was already unsubscribed): nothing to remove
        return;
    }
    // get subscription
    Map<Object, Disposable> disposableMap = eventMap.get(eventClass);
    if (disposableMap == null) {
        return;
    }
    Disposable disposable = disposableMap.remove(listener);
    if (disposable != null) {
        getCompositeSubscription(lifecycle).remove(disposable); // stop subscription
    }
}
}
/**
 * Connection states a socket can be in.
 * The declaration order of the constants is part of the enum contract and is kept as is.
 */
public enum SocketState {
    CLOSED,
    CLOSING,
    CONNECT_ERROR,
    RECONNECT_ATTEMPT,
    RECONNECTING,
    OPENING,
    OPEN
}
@AliYusuf95
Copy link
Author

AliYusuf95 commented Mar 29, 2017

Usage example:

Socket socket = Socket.Builder.with(WEBSOCKET_BASE_URL).build();
socket.connect();
socket.onEvent(Socket.EVENT_OPEN, socketOpenListener);
socket.onEvent(Socket.EVENT_RECONNECT_ATTEMPT, .....);
socket.onEvent(Socket.EVENT_CLOSED, .....);
socket.onEventResponse("Some event", socketPairListener);
socket.send("Some event", "{\"some data\":\"in JSON format\"}");
socket.sendOnOpen("Some event", "{\"some data\":\"in JSON format\"}");

@BehnamMaboodi
Copy link

Thanks for this useful class.
Please make such a class for the server side too.

@Volpestyle
Copy link

Thank you for this!

@saugataray1234
Copy link

Hi Ali,
Thanks for sharing the valuable code. But here i'm facing one problem. Using this class, after creating the web socket connection, if i close the app forcefully then EVENT_CLOSED is not getting triggered. here i need one listener which will be invoked automatically as soon as the app being closed forcefully. could you please tell me the solution?

Thanks once again for your support.

i'm eagerly waiting for your answer.

@AliYusuf95
Copy link
Author

Hi Ali,
Thanks for sharing the valuable code. But here i'm facing one problem. Using this class, after creating the web socket connection, if i close the app forcefully then EVENT_CLOSED is not getting triggered. here i need one listener which will be invoked automatically as soon as the app being closed forcefully. could you please tell me the solution?

Thanks once again for your support.

i'm eagerly waiting for your answer.

Actually, I have made a lot of changes to this code using RxJava; I'll update the code soon.

@AliYusuf95
Copy link
Author

Check out the new file SocketRx.java.
The new implementation includes these files:

Socket Event listeners
OnEventListener.java
OnEventResponseListener.java
OnMessageListener.java
OnStateChangeListener.java

Socket connection builder
SocketBuilder.java

Events Objects Initiated by the Socket
SocketEvents.java

Bind socket events with lifecycle e.g. Activity
SocketListenersBinder.java

Socket connection status enum
SocketState.java

Usage Example

/*
 * Application level connection
 */
public class MyApplication extends Application {
    /**
     * Builds the websocket and registers the application-level state and event listeners.
     */
    @Override
    public void onCreate() {
        super.onCreate();

        // build the websocket with a 5 second ping interval
        Socket socket = SocketBuilder.with(WEBSOCKET_URL)
                .setPingInterval(5, TimeUnit.SECONDS)
                .build();

        // reacts to connection state changes
        OnStateChangeListener stateListener = new OnStateChangeListener() {
            @Override
            public void onChange(SocketState status) {
                switch (status) {
                    case OPEN:
                        // new OnlineEvent();
                        break;
                    case CLOSING:
                    case CLOSED:
                    case RECONNECTING:
                    case RECONNECT_ATTEMPT:
                    case CONNECT_ERROR:
                        // new OfflineEvent();
                        break;
                }
            }

            @Override
            public void onClosed(int code, String reason) {
                // the socket should always stay connected:
                // even when it gets closed, open the connection again
                socket.connect();
            }
        };

        socket.addOnChangeStateListener(stateListener)
                .addOnEventListener(Constants.SocketEvents.DISCONNECT, data -> {
                    // Custom Event
                    // new DisconnectEvent();
                })
                .addOnEventListener(Constants.SocketEvents.GO_ONLINE, data -> {
                    // Custom Event
                    // new GoOnlineEvent();
                });
    }
}


/*
 * Use the connection with BaseActivity
 */
public abstract class BaseActivity extends AppCompatActivity {

    /**
     * Makes sure the shared socket connection is established when the activity is created.
     */
    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        // Android requires lifecycle callbacks to call through to the super implementation
        super.onCreate(savedInstanceState);

        // make sure socket connection is established
        Socket.getInstance().connect();
    }

    /**
     * Starts listening to websocket events on behalf of this activity.
     */
    @Override
    protected void onStart() {
        super.onStart();
        // listen to webSocket events
        Socket.getInstance().with(BaseActivity.this).setOnEventResponseListener("Some Event", (event, data) -> {
            Data mData = parseJsonData(data);
            Socket.getInstance().send("Send Event", "{ Some Json Data}");
        });
    }


    /**
     * Removes every socket subscription registered for this activity.
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        // IMPORTANT
        // Stop listening to socket events for this activity
        Socket.getInstance().unsubscribe(this);
    }

}

@mcMineyC
Copy link

mcMineyC commented Jul 2, 2023

@AliYusuf95 THANK YOU SO MUCH BRO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! You have no idea the amount of time this saved

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment