Skip to content

Instantly share code, notes, and snippets.

@TekExplorer
Last active June 4, 2024 03:44
Show Gist options
  • Save TekExplorer/090c9e37c47e74e462e07e56ecedc782 to your computer and use it in GitHub Desktop.
Save TekExplorer/090c9e37c47e74e462e07e56ecedc782 to your computer and use it in GitHub Desktop.
Example websocket handler
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
/// Builds the shared [YourWebsocket] and waits for its first connection.
///
/// [YourWebsocket.dispose] is registered with the provider, so the socket is
/// torn down together with the provider.
final _wsProvider = FutureProvider<YourWebsocket>((ref) async {
  final socket = YourWebsocket(Uri(/** ... */));

  // Cleanup is registered before connecting, so it is in place even while
  // the connection attempt is still pending.
  ref.onDispose(socket.dispose);

  await socket.connect();
  return socket;
});
/// Exposes the text messages of the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards everything emitted by
/// [YourWebsocket.stream].
final streamProvider = StreamProvider<String>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  final messages = socket.stream;
  yield* messages;
});
/// Exposes the connection state of the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards everything emitted by
/// [YourWebsocket.connectionStateStream].
final streamConnectionStateProvider =
    StreamProvider<ConnectionState>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  final states = socket.connectionStateStream;
  yield* states;
});
/// Connection states reported by [YourWebsocket].
enum ConnectionState {
/// The channel is treated as live; heartbeat pings are only sent in this
/// state.
connected,
/// A connection attempt has been started and has not finished yet.
connecting,
/// There is no connection. This is the initial state, and the only state
/// from which [YourWebsocket.connect] proceeds.
disconnected,
}
/// A reconnecting WebSocket client with a `'ping'`/`'pong'` keep-alive.
///
/// Text frames from the server are forwarded to [stream] and connection state
/// changes are published on [connectionStateStream]. While the instance is
/// alive it sends `'ping'` every [_heartBeatDuration] and replaces the
/// connection when the channel errors, closes, or no `'pong'` arrives within
/// [_pongCountdownDuration].
class YourWebsocket {
  YourWebsocket(this._url);

  final Uri _url;

  static const _heartBeatDuration = Duration(seconds: 10);
  static const _pongCountdownDuration = Duration(minutes: 1);

  /// Text messages received from the server, excluding `'pong'` replies.
  ///
  /// This is a single-subscription stream.
  Stream<String> get stream => _streamController.stream;
  final _streamController = StreamController<String>();

  /// Emits the new [ConnectionState] every time it changes.
  ///
  /// This is a single-subscription stream.
  Stream<ConnectionState> get connectionStateStream =>
      _connectionStateController.stream;
  final _connectionStateController = StreamController<ConnectionState>();

  /// The channel of the current connection (attempt), if any.
  ///
  /// Nullable rather than `late` so that [dispose] is safe before [connect].
  WebSocketChannel? _websocketChannel;

  /// Subscription to the current channel; cancelled before the channel is
  /// replaced so that stale callbacks cannot overwrite the state.
  StreamSubscription<dynamic>? _channelSubscription;

  /// Whether [dispose] has been called.
  bool _disposed = false;

  // A BehaviorSubject would be cleaner, as you can access
  // its last known value for use in the checks below.
  ConnectionState __connectionState = ConnectionState.disconnected;
  ConnectionState get _connectionState => __connectionState;
  set _connectionState(ConnectionState value) {
    // Only publish real transitions, and never add to a closed controller.
    if (value == __connectionState) return;
    __connectionState = value;
    if (!_connectionStateController.isClosed) {
      _connectionStateController.add(value);
    }
  }

  /// Opens the connection unless one is already open or being opened.
  ///
  /// Completes once the channel is ready and being listened to. If the
  /// attempt fails, the state is reset to [ConnectionState.disconnected] (so
  /// the heartbeat retries on its next tick) and the error is rethrown to the
  /// caller. Does nothing after [dispose].
  Future<void> connect() async {
    if (_disposed || _connectionState != ConnectionState.disconnected) return;

    // Beat immediately to guarantee reconnection if the connection fails.
    _heartBeat();

    debugPrint('Connecting...');
    _connectionState = ConnectionState.connecting;

    try {
      final channel = WebSocketChannel.connect(_url);
      _websocketChannel = channel;
      await channel.ready;

      // dispose() may have run while we were waiting; it already closed the
      // channel, so there is nothing left to do.
      if (_disposed) return;

      // READY: the handshake completed, so pings may start flowing.
      _connectionState = ConnectionState.connected;
      // Disconnect if we don't get a 'pong' for a while.
      _pong();
      _listenTo(channel);
    } on Object catch (error) {
      _websocketChannel = null;
      // A failure caused by disposing mid-connect is not worth reporting.
      if (_disposed) return;
      debugPrint('WebSocket connection attempt failed: $error');
      // Without this reset the state would stay `connecting` forever and the
      // heartbeat would never retry.
      _connectionState = ConnectionState.disconnected;
      rethrow;
    }
  }

  /// Forwards [channel]'s text frames to [stream] and tracks its lifetime.
  void _listenTo(WebSocketChannel channel) {
    _channelSubscription = channel.stream.listen(
      (event) {
        if (event is! String) return;
        if (event == 'pong') {
          _pong();
          return; // no reason to let listeners know.
        }
        if (!_streamController.isClosed) _streamController.add(event);
      },
      cancelOnError: true,
      onError: (Object error, StackTrace stackTrace) {
        // maybe handle errors
        debugPrint('Lost WebSocket connection because of an error');
        _connectionState = ConnectionState.disconnected;
      },
      onDone: () {
        debugPrint('Lost WebSocket connection because it was closed');
        _connectionState = ConnectionState.disconnected;
      },
    );
  }

  /// Nukes and reconnects if a 'pong' was not received in a while.
  Timer? _pongAwaiterTimer;

  /// Starts a countdown that will nuke the websocket.
  ///
  /// The countdown gets pushed back every time this is called.
  void _pong() {
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = Timer(_pongCountdownDuration, _onPongTimeout);
  }

  /// Treats a missing 'pong' as a dead connection and replaces it.
  void _onPongTimeout() {
    debugPrint('No pong received in time; resetting the WebSocket connection');
    // The state must be `disconnected` first, otherwise connect() would
    // return early and nothing would ever reconnect.
    _connectionState = ConnectionState.disconnected;
    _onDisconnected();
  }

  /// Sends a ping every while.
  Timer? _heartBeatTimer;

  /// Every little while, sends a 'ping' if connected, or cleans up and
  /// reconnects if disconnected.
  ///
  /// Running this again resets the timer.
  void _heartBeat() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = Timer.periodic(_heartBeatDuration, (_) {
      if (_connectionState == ConnectionState.connected) {
        _websocketChannel?.sink.add('ping');
      } else if (_connectionState == ConnectionState.disconnected) {
        _onDisconnected();
      }
    });
  }

  /// Cancels the subscription to the current channel and closes it.
  void _closeChannel() {
    _channelSubscription?.cancel();
    _channelSubscription = null;
    _websocketChannel?.sink.close();
    _websocketChannel = null;
  }

  /// Clears the timers and the current channel.
  ///
  /// If the whole websocket was closed (i.e. [dispose] was called) we stop;
  /// otherwise we try to reconnect.
  void _onDisconnected() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = null;
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = null;
    _closeChannel();
    if (_disposed) return; // we're done here.
    unawaited(_reconnect());
  }

  /// Runs [connect] for the internal retry path.
  Future<void> _reconnect() async {
    try {
      await connect();
    } on Object {
      // connect() has already logged the failure and reset the state; the
      // heartbeat started by that attempt retries on its next tick. Catching
      // here keeps the failure from becoming an unhandled async error.
    }
  }

  /// Closes the websocket, the client-facing streams, and clears the timers.
  ///
  /// Safe to call before [connect] and more than once.
  void dispose() {
    if (_disposed) return;
    _disposed = true;
    _onDisconnected();
    _connectionState = ConnectionState.disconnected;
    _streamController.close();
    _connectionStateController.close();
  }
}
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:rxdart/rxdart.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
/// Builds the shared [YourWebsocket] and waits for its first connection.
///
/// [YourWebsocket.dispose] is registered with the provider, so the socket is
/// torn down together with the provider.
final _wsProvider = FutureProvider<YourWebsocket>((ref) async {
  final socket = YourWebsocket(Uri(/** ... */));

  // Cleanup is registered before connecting, so it is in place even while
  // the connection attempt is still pending.
  ref.onDispose(socket.dispose);

  await socket.connect();
  return socket;
});
/// Exposes the text messages of the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards everything emitted by
/// [YourWebsocket.stream].
final streamProvider = StreamProvider<String>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  final messages = socket.stream;
  yield* messages;
});
/// Exposes the connection state of the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards everything emitted by
/// [YourWebsocket.connectionStateStream].
final streamConnectionStateProvider =
    StreamProvider<ConnectionState>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  final states = socket.connectionStateStream;
  yield* states;
});
/// Connection states reported by [YourWebsocket].
enum ConnectionState {
/// The channel is treated as live; heartbeat pings are only sent in this
/// state.
connected,
/// A connection attempt has been started and has not finished yet.
connecting,
/// There is no connection. [YourWebsocket.connect] only proceeds from this
/// state.
disconnected,
}
/// A reconnecting WebSocket client with a `'ping'`/`'pong'` keep-alive.
///
/// Text frames from the server are forwarded to [stream] and the connection
/// state is published on [connectionStateStream]. While the instance is alive
/// it sends `'ping'` every [_heartBeatDuration] and replaces the connection
/// when the channel errors, closes, or no `'pong'` arrives within
/// [_pongCountdownDuration].
class YourWebsocket {
  YourWebsocket(this._url);

  final Uri _url;

  static const _heartBeatDuration = Duration(seconds: 10);
  static const _pongCountdownDuration = Duration(minutes: 1);

  /// Text messages received from the server, excluding `'pong'` replies.
  ///
  /// This is a single-subscription stream.
  Stream<String> get stream => _streamController.stream;
  final _streamController = StreamController<String>();

  /// The current [ConnectionState], followed by every later change.
  ValueStream<ConnectionState> get connectionStateStream =>
      _connectionStateController.stream;

  // Seeded so that `.value` is always readable: an unseeded subject has no
  // value until the first `add`, which broke the very first connect() guard.
  final _connectionStateController =
      BehaviorSubject<ConnectionState>.seeded(ConnectionState.disconnected);

  /// The channel of the current connection (attempt), if any.
  ///
  /// Nullable rather than `late` so that [dispose] is safe before [connect].
  WebSocketChannel? _websocketChannel;

  /// Subscription to the current channel; cancelled before the channel is
  /// replaced so that stale callbacks cannot overwrite the state.
  StreamSubscription<dynamic>? _channelSubscription;

  /// Whether [dispose] has been called.
  bool _disposed = false;

  /// The last published connection state.
  ConnectionState get _connectionState => _connectionStateController.value;

  /// Publishes [value] if it is a real transition and the subject is open.
  void _setConnectionState(ConnectionState value) {
    if (_connectionStateController.isClosed) return;
    if (_connectionStateController.value == value) return;
    _connectionStateController.add(value);
  }

  /// Opens the connection unless one is already open or being opened.
  ///
  /// Completes once the channel is ready and being listened to. If the
  /// attempt fails, the state is reset to [ConnectionState.disconnected] (so
  /// the heartbeat retries on its next tick) and the error is rethrown to the
  /// caller. Does nothing after [dispose].
  Future<void> connect() async {
    if (_disposed || _connectionState != ConnectionState.disconnected) return;

    // Beat immediately to guarantee reconnection if the connection fails.
    _heartBeat();

    debugPrint('Connecting...');
    _setConnectionState(ConnectionState.connecting);

    try {
      final channel = WebSocketChannel.connect(_url);
      _websocketChannel = channel;
      await channel.ready;

      // dispose() may have run while we were waiting; it already closed the
      // channel, so there is nothing left to do.
      if (_disposed) return;

      // READY: the handshake completed, so pings may start flowing.
      _setConnectionState(ConnectionState.connected);
      // Disconnect if we don't get a 'pong' for a while.
      _pong();
      _listenTo(channel);
    } on Object catch (error) {
      _websocketChannel = null;
      // A failure caused by disposing mid-connect is not worth reporting.
      if (_disposed) return;
      debugPrint('WebSocket connection attempt failed: $error');
      // Without this reset the state would stay `connecting` forever and the
      // heartbeat would never retry.
      _setConnectionState(ConnectionState.disconnected);
      rethrow;
    }
  }

  /// Forwards [channel]'s text frames to [stream] and tracks its lifetime.
  void _listenTo(WebSocketChannel channel) {
    _channelSubscription = channel.stream.listen(
      (event) {
        if (event is! String) return;
        if (event == 'pong') {
          _pong();
          return; // no reason to let listeners know.
        }
        if (!_streamController.isClosed) _streamController.add(event);
      },
      cancelOnError: true,
      onError: (Object error, StackTrace stackTrace) {
        // maybe handle errors
        debugPrint('Lost WebSocket connection because of an error');
        _setConnectionState(ConnectionState.disconnected);
      },
      onDone: () {
        debugPrint('Lost WebSocket connection because it was closed');
        _setConnectionState(ConnectionState.disconnected);
      },
    );
  }

  /// Nukes and reconnects if a 'pong' was not received in a while.
  Timer? _pongAwaiterTimer;

  /// Starts a countdown that will nuke the websocket.
  ///
  /// The countdown gets pushed back every time this is called.
  void _pong() {
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = Timer(_pongCountdownDuration, _onPongTimeout);
  }

  /// Treats a missing 'pong' as a dead connection and replaces it.
  void _onPongTimeout() {
    debugPrint('No pong received in time; resetting the WebSocket connection');
    // The state must be `disconnected` first, otherwise connect() would
    // return early and nothing would ever reconnect.
    _setConnectionState(ConnectionState.disconnected);
    _onDisconnected();
  }

  /// Sends a ping every while.
  Timer? _heartBeatTimer;

  /// Every little while, sends a 'ping' if connected, or cleans up and
  /// reconnects if disconnected.
  ///
  /// Running this again resets the timer.
  void _heartBeat() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = Timer.periodic(_heartBeatDuration, (_) {
      final state = _connectionState;
      if (state == ConnectionState.connected) {
        _websocketChannel?.sink.add('ping');
      } else if (state == ConnectionState.disconnected) {
        _onDisconnected();
      }
    });
  }

  /// Cancels the subscription to the current channel and closes it.
  void _closeChannel() {
    _channelSubscription?.cancel();
    _channelSubscription = null;
    _websocketChannel?.sink.close();
    _websocketChannel = null;
  }

  /// Clears the timers and the current channel.
  ///
  /// If the whole websocket was closed (i.e. [dispose] was called) we stop;
  /// otherwise we try to reconnect.
  void _onDisconnected() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = null;
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = null;
    _closeChannel();
    if (_disposed) return; // we're done here.
    unawaited(_reconnect());
  }

  /// Runs [connect] for the internal retry path.
  Future<void> _reconnect() async {
    try {
      await connect();
    } on Object {
      // connect() has already logged the failure and reset the state; the
      // heartbeat started by that attempt retries on its next tick. Catching
      // here keeps the failure from becoming an unhandled async error.
    }
  }

  /// Closes the websocket, the client-facing streams, and clears the timers.
  ///
  /// Safe to call before [connect] and more than once.
  void dispose() {
    if (_disposed) return;
    _disposed = true;
    _onDisconnected();
    _setConnectionState(ConnectionState.disconnected);
    _streamController.close();
    _connectionStateController.close();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment