Last active
June 4, 2024 03:44
-
-
Save TekExplorer/090c9e37c47e74e462e07e56ecedc782 to your computer and use it in GitHub Desktop.
Example websocket handler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'package:flutter/foundation.dart'; | |
import 'package:flutter_riverpod/flutter_riverpod.dart'; | |
import 'package:web_socket_channel/web_socket_channel.dart'; | |
/// Creates, connects and owns the [YourWebsocket] shared by the public
/// providers in this file.
///
/// The future completes once [YourWebsocket.connect] has returned.
final _wsProvider = FutureProvider<YourWebsocket>((ref) async {
  final socket = YourWebsocket(Uri(/** ... */));

  // Cleanup is registered before connecting, so the socket is disposed with
  // the provider even when the connection attempt below throws.
  ref.onDispose(socket.dispose);

  await socket.connect();
  return socket;
});
/// Streams the text messages delivered by the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards [YourWebsocket.stream].
final streamProvider = StreamProvider<String>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  yield* socket.stream;
});
/// Streams the [ConnectionState] changes reported by the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards
/// [YourWebsocket.connectionStateStream].
final streamConnectionStateProvider =
    StreamProvider<ConnectionState>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  yield* socket.connectionStateStream;
});
/// The lifecycle states reported by [YourWebsocket].
enum ConnectionState {
  /// The socket is considered live.
  connected,

  /// A connection attempt has been started but the socket is not yet
  /// considered live.
  connecting,

  /// No connection is open; this is also the state before the first
  /// connection attempt.
  disconnected,
}
/// A text-frame websocket client that keeps itself connected.
///
/// After [connect], a periodic heartbeat sends `'ping'` while connected and
/// starts a new connection attempt while disconnected. If no `'pong'` arrives
/// within [_pongCountdownDuration], the current channel is closed and a fresh
/// connection is opened. Call [dispose] to stop everything.
class YourWebsocket {
  /// Creates a client for the given url; nothing is opened until [connect].
  YourWebsocket(this._url);

  final Uri _url;

  /// Interval between heartbeats (ping or reconnection check).
  static const _heartBeatDuration = Duration(seconds: 10);

  /// Maximum silence between `'pong'` replies before the socket is recycled.
  static const _pongCountdownDuration = Duration(minutes: 1);

  /// Text messages received from the server, excluding `'pong'` replies.
  ///
  /// This is a single-subscription stream.
  Stream<String> get stream => _streamController.stream;
  final _streamController = StreamController<String>();

  /// Connection state changes; consecutive duplicates are not emitted.
  ///
  /// This is a single-subscription stream.
  Stream<ConnectionState> get connectionStateStream =>
      _connectionStateController.stream;
  final _connectionStateController = StreamController<ConnectionState>();

  /// Channel of the current connection (attempt), or `null` when there is
  /// none. Nullable rather than `late` so [dispose] is safe at any time.
  WebSocketChannel? _websocketChannel;

  /// Subscription to the current channel's incoming events.
  StreamSubscription<dynamic>? _channelSubscription;

  // The last known state is mirrored in a field because a plain
  // StreamController cannot be asked for its latest value.
  ConnectionState __connectionState = ConnectionState.disconnected;
  ConnectionState get _connectionState => __connectionState;
  set _connectionState(ConnectionState value) {
    if (value == __connectionState) return; // only publish real changes
    __connectionState = value;
    if (!_connectionStateController.isClosed) {
      _connectionStateController.add(value);
    }
  }

  /// Opens the websocket unless a connection already exists, is in progress,
  /// or [dispose] has been called.
  ///
  /// Completes once the connection is established. If the attempt fails, the
  /// state returns to [ConnectionState.disconnected] (so the heartbeat will
  /// retry) and the error is rethrown to the caller.
  Future<void> connect() async {
    if (_streamController.isClosed) return; // disposed
    if (_connectionState != ConnectionState.disconnected) return;

    // Start beating first so a failed attempt is retried on the next beat.
    _heartBeat();

    debugPrint('Connecting...');
    final channel = WebSocketChannel.connect(_url);
    _websocketChannel = channel;
    _connectionState = ConnectionState.connecting;

    try {
      await channel.ready;
    } catch (_) {
      // Without this reset the state would stay `connecting` forever and the
      // heartbeat would never try again.
      _websocketChannel = null;
      _connectionState = ConnectionState.disconnected;
      rethrow;
    }

    // dispose() ran while we were waiting; it already closed the channel.
    if (_streamController.isClosed) return;

    // READY: the handshake succeeded, so pings may flow from now on.
    _connectionState = ConnectionState.connected;

    // Recycle the connection if we don't get a 'pong' for a while.
    _pong();

    _channelSubscription = channel.stream.listen(
      (event) {
        if (event is! String) return;
        if (event == 'pong') {
          _pong();
          return; // no reason to let listeners know.
        }
        if (!_streamController.isClosed) _streamController.add(event);
      },
      cancelOnError: true,
      onError: (Object error, StackTrace stackTrace) {
        debugPrint('Lost WebSocket connection because of an error');
        _connectionState = ConnectionState.disconnected;
      },
      onDone: () {
        debugPrint('Lost WebSocket connection because it was closed');
        _connectionState = ConnectionState.disconnected;
      },
    );
  }

  /// Fires when no 'pong' was received for [_pongCountdownDuration].
  Timer? _pongAwaiterTimer;

  /// (Re)starts the countdown that recycles the websocket.
  ///
  /// The countdown is pushed back every time this is called.
  void _pong() {
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = Timer(_pongCountdownDuration, _onDisconnected);
  }

  /// Drives the periodic ping / reconnection check.
  Timer? _heartBeatTimer;

  /// (Re)starts the heartbeat.
  ///
  /// On every beat a 'ping' is sent while connected; while disconnected the
  /// connection is torn down and re-established via [_onDisconnected].
  void _heartBeat() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = Timer.periodic(_heartBeatDuration, (_) {
      if (_connectionState == ConnectionState.connected) {
        _websocketChannel?.sink.add('ping');
      }
      if (_connectionState == ConnectionState.disconnected) {
        _onDisconnected();
      }
    });
  }

  /// Cancels the channel subscription and closes the channel, if any.
  void _closeChannel() {
    unawaited(_channelSubscription?.cancel());
    _channelSubscription = null;
    unawaited(_websocketChannel?.sink.close());
    _websocketChannel = null;
  }

  /// Clears timers, drops the current channel and, unless [dispose] was
  /// called, starts a new connection attempt.
  void _onDisconnected() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = null;
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = null;

    // Force the state back to `disconnected`; otherwise connect() would
    // return early after a pong timeout and nothing would ever reconnect.
    _closeChannel();
    _connectionState = ConnectionState.disconnected;

    if (_streamController.isClosed) return; // we're done here.
    unawaited(_reconnect());
  }

  /// Runs [connect] for background reconnection, logging instead of leaking
  /// an unhandled asynchronous error; the heartbeat retries on failure.
  Future<void> _reconnect() async {
    try {
      await connect();
    } catch (error) {
      debugPrint('WebSocket reconnection failed: $error');
    }
  }

  /// Closes both client-facing streams and the websocket itself, and clears
  /// all timers. Safe to call before [connect] and more than once.
  void dispose() {
    // Closing the message stream first marks this instance as disposed, so
    // _onDisconnected() cleans up without reconnecting.
    unawaited(_streamController.close());
    _onDisconnected();
    unawaited(_connectionStateController.close());
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'package:flutter/foundation.dart'; | |
import 'package:flutter_riverpod/flutter_riverpod.dart'; | |
import 'package:rxdart/rxdart.dart'; | |
import 'package:web_socket_channel/web_socket_channel.dart'; | |
/// Creates, connects and owns the [YourWebsocket] shared by the public
/// providers in this file.
///
/// The future completes once [YourWebsocket.connect] has returned.
final _wsProvider = FutureProvider<YourWebsocket>((ref) async {
  final socket = YourWebsocket(Uri(/** ... */));

  // Cleanup is registered before connecting, so the socket is disposed with
  // the provider even when the connection attempt below throws.
  ref.onDispose(socket.dispose);

  await socket.connect();
  return socket;
});
/// Streams the text messages delivered by the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards [YourWebsocket.stream].
final streamProvider = StreamProvider<String>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  yield* socket.stream;
});
/// Streams the [ConnectionState] values reported by the shared websocket.
///
/// Waits for [_wsProvider] to resolve, then forwards
/// [YourWebsocket.connectionStateStream].
final streamConnectionStateProvider =
    StreamProvider<ConnectionState>((ref) async* {
  final socket = await ref.watch(_wsProvider.future);
  yield* socket.connectionStateStream;
});
/// The lifecycle states reported by [YourWebsocket].
enum ConnectionState {
  /// The socket is considered live.
  connected,

  /// A connection attempt has been started but the socket is not yet
  /// considered live.
  connecting,

  /// No connection is open.
  disconnected,
}
/// A text-frame websocket client that keeps itself connected.
///
/// After [connect], a periodic heartbeat sends `'ping'` while connected and
/// starts a new connection attempt while disconnected. If no `'pong'` arrives
/// within [_pongCountdownDuration], the current channel is closed and a fresh
/// connection is opened. Call [dispose] to stop everything.
class YourWebsocket {
  /// Creates a client for the given url; nothing is opened until [connect].
  YourWebsocket(this._url);

  final Uri _url;

  /// Interval between heartbeats (ping or reconnection check).
  static const _heartBeatDuration = Duration(seconds: 10);

  /// Maximum silence between `'pong'` replies before the socket is recycled.
  static const _pongCountdownDuration = Duration(minutes: 1);

  /// Text messages received from the server, excluding `'pong'` replies.
  ///
  /// This is a single-subscription stream.
  Stream<String> get stream => _streamController.stream;
  final _streamController = StreamController<String>();

  /// The current connection state followed by every later change.
  ValueStream<ConnectionState> get connectionStateStream =>
      _connectionStateController.stream;

  // Seeded so that `.value` is readable before the first state change; an
  // unseeded subject has no value, which made the guard in connect() fail
  // on the very first call.
  final _connectionStateController =
      BehaviorSubject<ConnectionState>.seeded(ConnectionState.disconnected);

  /// Channel of the current connection (attempt), or `null` when there is
  /// none. Nullable rather than `late` so [dispose] is safe at any time.
  WebSocketChannel? _websocketChannel;

  /// Subscription to the current channel's incoming events.
  StreamSubscription<dynamic>? _channelSubscription;

  /// The most recently published connection state.
  ConnectionState get _connectionState => _connectionStateController.value;

  /// Publishes [value] unless it equals the current state or the subject has
  /// already been closed by [dispose].
  void _setConnectionState(ConnectionState value) {
    if (_connectionStateController.isClosed) return;
    if (_connectionState == value) return; // only publish real changes
    _connectionStateController.add(value);
  }

  /// Opens the websocket unless a connection already exists, is in progress,
  /// or [dispose] has been called.
  ///
  /// Completes once the connection is established. If the attempt fails, the
  /// state returns to [ConnectionState.disconnected] (so the heartbeat will
  /// retry) and the error is rethrown to the caller.
  Future<void> connect() async {
    if (_streamController.isClosed) return; // disposed
    if (_connectionState != ConnectionState.disconnected) return;

    // Start beating first so a failed attempt is retried on the next beat.
    _heartBeat();

    debugPrint('Connecting...');
    final channel = WebSocketChannel.connect(_url);
    _websocketChannel = channel;
    _setConnectionState(ConnectionState.connecting);

    try {
      await channel.ready;
    } catch (_) {
      // Without this reset the state would stay `connecting` forever and the
      // heartbeat would never try again.
      _websocketChannel = null;
      _setConnectionState(ConnectionState.disconnected);
      rethrow;
    }

    // dispose() ran while we were waiting; it already closed the channel.
    if (_streamController.isClosed) return;

    // READY: the handshake succeeded, so pings may flow from now on.
    _setConnectionState(ConnectionState.connected);

    // Recycle the connection if we don't get a 'pong' for a while.
    _pong();

    _channelSubscription = channel.stream.listen(
      (event) {
        if (event is! String) return;
        if (event == 'pong') {
          _pong();
          return; // no reason to let listeners know.
        }
        if (!_streamController.isClosed) _streamController.add(event);
      },
      cancelOnError: true,
      onError: (Object error, StackTrace stackTrace) {
        debugPrint('Lost WebSocket connection because of an error');
        _setConnectionState(ConnectionState.disconnected);
      },
      onDone: () {
        debugPrint('Lost WebSocket connection because it was closed');
        _setConnectionState(ConnectionState.disconnected);
      },
    );
  }

  /// Fires when no 'pong' was received for [_pongCountdownDuration].
  Timer? _pongAwaiterTimer;

  /// (Re)starts the countdown that recycles the websocket.
  ///
  /// The countdown is pushed back every time this is called.
  void _pong() {
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = Timer(_pongCountdownDuration, _onDisconnected);
  }

  /// Drives the periodic ping / reconnection check.
  Timer? _heartBeatTimer;

  /// (Re)starts the heartbeat.
  ///
  /// On every beat a 'ping' is sent while connected; while disconnected the
  /// connection is torn down and re-established via [_onDisconnected].
  void _heartBeat() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = Timer.periodic(_heartBeatDuration, (_) {
      if (_connectionState == ConnectionState.connected) {
        _websocketChannel?.sink.add('ping');
      }
      if (_connectionState == ConnectionState.disconnected) {
        _onDisconnected();
      }
    });
  }

  /// Cancels the channel subscription and closes the channel, if any.
  void _closeChannel() {
    unawaited(_channelSubscription?.cancel());
    _channelSubscription = null;
    unawaited(_websocketChannel?.sink.close());
    _websocketChannel = null;
  }

  /// Clears timers, drops the current channel and, unless [dispose] was
  /// called, starts a new connection attempt.
  void _onDisconnected() {
    _heartBeatTimer?.cancel();
    _heartBeatTimer = null;
    _pongAwaiterTimer?.cancel();
    _pongAwaiterTimer = null;

    // Force the state back to `disconnected`; otherwise connect() would
    // return early after a pong timeout and nothing would ever reconnect.
    _closeChannel();
    _setConnectionState(ConnectionState.disconnected);

    if (_streamController.isClosed) return; // we're done here.
    unawaited(_reconnect());
  }

  /// Runs [connect] for background reconnection, logging instead of leaking
  /// an unhandled asynchronous error; the heartbeat retries on failure.
  Future<void> _reconnect() async {
    try {
      await connect();
    } catch (error) {
      debugPrint('WebSocket reconnection failed: $error');
    }
  }

  /// Closes both client-facing streams and the websocket itself, and clears
  /// all timers. Safe to call before [connect] and more than once.
  void dispose() {
    // Closing the message stream first marks this instance as disposed, so
    // _onDisconnected() cleans up without reconnecting.
    unawaited(_streamController.close());
    _onDisconnected();
    unawaited(_connectionStateController.close());
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment