Created
July 8, 2021 07:01
-
-
Save comatory/7f716573db0d39a22b823aaa467f5b6e to your computer and use it in GitHub Desktop.
Dart / Flutter "Bad state: Stream has already been listened to" without broadcast stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* NOTE: This approach demonstrates how to recreate streams when | |
your listeners are being recreated. | |
It is useful when you cannot or do not want to use broadcast | |
streams. Downside to broadcast streams is that it is not | |
guaranteed that your listener will receive values emitted | |
by the stream before it was registered. | |
*/ | |
import 'dart:async'; | |
import 'dart:math'; | |
// [StreamService] manages state of your streams. Each listener | |
// must have id which is used in [_streamControllers] map to | |
// look up relevant stream controller. | |
class StreamService { | |
final Map<String, StreamController<int>?> _streamControllers = {}; | |
Stream<int> getNamedStream(String id) { | |
final controller = _getController(id); | |
return controller.stream; | |
} | |
// Will get existing stream controller by [id] or create a new | |
// one if it does not exist | |
StreamController<int> _getController(String id) { | |
final controller = _streamControllers[id] ?? _createController(); | |
_streamControllers[id] = controller; | |
return controller; | |
} | |
void push(String id) { | |
final controller = _getController(id); | |
final rand = Random(); | |
final value = rand.nextInt(1000); | |
controller.add(value); | |
} | |
// This method can be called by listener so | |
// memory leaks are avoided. This is a cleanup | |
// method that will make sure the stream controller | |
// is removed safely | |
void disposeController(String id) { | |
final controller = _streamControllers[id]; | |
if (controller == null) { | |
throw Exception('Controller $id is not registered.'); | |
} | |
controller.close(); | |
_streamControllers.remove(id); | |
print('Removed controller $id'); | |
} | |
// This method should be called when you want to remove | |
// all controllers. It should be called before the instance | |
// of this class is garbage collected / removed from memory. | |
void dispose() { | |
_streamControllers.forEach((id, controller) { | |
controller?.close(); | |
print('Removed controller $id during dispose phase'); | |
}); | |
_streamControllers.clear(); | |
} | |
StreamController<int> _createController() { | |
return StreamController<int>(); | |
} | |
} | |
class ManagedListener { | |
ManagedListener({ | |
required this.id, | |
required StreamService streamService, | |
}) { | |
_streamService = streamService; | |
} | |
final String id; | |
late StreamService _streamService; | |
StreamSubscription<int>? _subscription; | |
void register() { | |
_subscription = _streamService.getNamedStream(id).listen(_handleStreamChange); | |
} | |
void dispose() { | |
_subscription?.cancel(); | |
_streamService.disposeController(id); | |
} | |
void _handleStreamChange(int n) { | |
print('[$id]: streamed $n'); | |
} | |
} | |
void main(List<String> arguments) async { | |
final streamService = StreamService(); | |
final listener1Id = 'id_1'; | |
final listener2Id = 'id_2'; | |
final listener1 = ManagedListener(id: listener1Id, streamService: streamService); | |
listener1.register(); | |
streamService.push(listener1Id); | |
streamService.push(listener1Id); | |
streamService.push(listener1Id); | |
await Future.delayed(const Duration(seconds: 1)); | |
final listener2 = ManagedListener(id: listener2Id, streamService: streamService); | |
listener2.register(); | |
streamService.push(listener2Id); | |
streamService.push(listener2Id); | |
await Future.delayed(const Duration(seconds: 1)); | |
listener1.dispose(); | |
listener2.dispose(); | |
streamService.dispose(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment