Skip to content

Instantly share code, notes, and snippets.

@LiamKarlMitchell
Created March 4, 2023 11:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LiamKarlMitchell/b1c724896978683c2a4b439eaaf279d6 to your computer and use it in GitHub Desktop.
Save LiamKarlMitchell/b1c724896978683c2a4b439eaaf279d6 to your computer and use it in GitHub Desktop.
Flutter Bloc Delayed Event Emit - Restartable
// This example will use Restartable event transformer and cancel/ignore previous events that are emitted only accepting the latest event.
// Can kind of see this in console and added a random color on build of text.
// This code is distributed under the MIT License.
// Copyright (c) 2018 Felix Angelov.
// You can find the original at https://github.com/felangel/bloc.
import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
// import 'package:bloc_concurrency/bloc_concurrency.dart'; (Unsupported import on DartPad) so I'll just copy it below.
import 'dart:math';
// Copied code for restartable EventTransformer as can't import bloc_concurrency on DartPad.
// import 'package:stream_transform/stream_transform.dart'; (Unsupported import on DartPad)
// Copied switchMap extension.
// https://github.com/dart-lang/stream_transform/blob/master/lib/src/switch.dart
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
// Copied asyncExpanded.
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//import 'dart:async';
// import 'switch.dart';
/// Alternatives to [asyncExpand].
///
/// The built in [asyncExpand] will not overlap the inner streams and every
/// event will be sent to the callback individually.
///
/// - [concurrentAsyncExpand] allow overlap and merges inner streams without
/// ordering guarantees.
extension AsyncExpand<T> on Stream<T> {
/// Like [asyncExpand] but the [convert] callback may be called for an element
/// before the [Stream] emitted by the previous element has closed.
///
/// Events on the result stream will be emitted in the order they are emitted
/// by the sub streams, which may not match the order of this stream.
///
/// Errors from [convert], the source stream, or any of the sub streams are
/// forwarded to the result stream.
///
/// The result stream will not close until the source stream closes and all
/// sub streams have closed.
///
/// If the source stream is a broadcast stream, the result will be as well,
/// regardless of the types of streams created by [convert]. In this case,
/// some care should be taken:
/// - If [convert] returns a single subscription stream it may be listened to
/// and never canceled.
/// - For any period of time where there are no listeners on the result
/// stream, any sub streams from previously emitted events will be ignored,
/// regardless of whether they emit further events after a listener is added
/// back.
///
/// See also:
/// - [switchMap], which cancels subscriptions to the previous sub stream
/// instead of concurrently emitting events from all sub streams.
Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) {
final controller = isBroadcast
? StreamController<S>.broadcast(sync: true)
: StreamController<S>(sync: true);
controller.onListen = () {
final subscriptions = <StreamSubscription<dynamic>>[];
final outerSubscription = map(convert).listen((inner) {
if (isBroadcast && !inner.isBroadcast) {
inner = inner.asBroadcastStream();
}
final subscription =
inner.listen(controller.add, onError: controller.addError);
subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(subscription);
}, onError: controller.addError);
outerSubscription.onDone(() {
subscriptions.remove(outerSubscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(outerSubscription);
if (!isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
subscription.pause();
}
}
..onResume = () {
for (final subscription in subscriptions) {
subscription.resume();
}
};
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
var cancels = [for (var s in subscriptions) s.cancel()]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
}
}
// End of AsyncExpanded
/// A utility to take events from the most recent sub stream returned by a
/// callback.
extension Switch<T> on Stream<T> {
/// Maps events to a Stream and emits values from the most recently created
/// Stream.
///
/// When the source emits a value it will be converted to a [Stream] using
/// [convert] and the output will switch to emitting events from that result.
/// Like [asyncExpand] but the [Stream] emitted by a previous element
/// will be ignored as soon as the source stream emits a new event.
///
/// This means that the source stream is not paused until a sub stream
/// returned from the [convert] callback is done. Instead, the subscription
/// to the sub stream is canceled as soon as the source stream emits a new event.
///
/// Errors from [convert], the source stream, or any of the sub streams are
/// forwarded to the result stream.
///
/// The result stream will not close until the source stream closes and
/// the current sub stream have closed.
///
/// If the source stream is a broadcast stream, the result will be as well,
/// regardless of the types of streams created by [convert]. In this case,
/// some care should be taken:
///
/// * If [convert] returns a single subscription stream it may be listened to
/// and never canceled.
///
/// See also:
/// - [concurrentAsyncExpand], which emits events from all sub streams
/// concurrently instead of cancelling subscriptions to previous subs streams.
Stream<S> switchMap<S>(Stream<S> Function(T) convert) {
return map(convert).switchLatest();
}
}
/// A utility to take events from the most recent sub stream.
extension SwitchLatest<T> on Stream<Stream<T>> {
/// Emits values from the most recently emitted Stream.
///
/// When the source emits a stream, the output will switch to emitting events
/// from that stream.
///
/// Whether the source stream is a single-subscription stream or a
/// broadcast stream, the result stream will be the same kind of stream,
/// regardless of the types of streams emitted.
Stream<T> switchLatest() {
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
controller.onListen = () {
StreamSubscription<T>? innerSubscription;
var outerStreamDone = false;
void listenToInnerStream(Stream<T> innerStream) {
assert(innerSubscription == null);
var subscription = innerStream
.listen(controller.add, onError: controller.addError, onDone: () {
innerSubscription = null;
if (outerStreamDone) controller.close();
});
// If a pause happens during an innerSubscription.cancel,
// we still listen to the next stream when the cancel is done.
// Then we immediately pause it again here.
if (controller.isPaused) subscription.pause();
innerSubscription = subscription;
}
var addError = controller.addError;
final outerSubscription = listen(null, onError: addError, onDone: () {
outerStreamDone = true;
if (innerSubscription == null) controller.close();
});
outerSubscription.onData((innerStream) async {
var currentSubscription = innerSubscription;
if (currentSubscription == null) {
listenToInnerStream(innerStream);
return;
}
innerSubscription = null;
outerSubscription.pause();
try {
await currentSubscription.cancel();
} catch (error, stack) {
controller.addError(error, stack);
} finally {
if (!isBroadcast && !controller.hasListener) {
// Result single-subscription stream subscription was cancelled
// while waiting for previous innerStream cancel.
//
// Ensure that the last received stream is also listened to and
// cancelled, then do nothing further.
innerStream.listen(null).cancel().ignore();
} else {
outerSubscription.resume();
listenToInnerStream(innerStream);
}
}
});
if (!isBroadcast) {
controller
..onPause = () {
innerSubscription?.pause();
outerSubscription.pause();
}
..onResume = () {
innerSubscription?.resume();
outerSubscription.resume();
};
}
controller.onCancel = () {
var sub = innerSubscription;
var cancels = [
if (!outerStreamDone) outerSubscription.cancel(),
if (sub != null) sub.cancel(),
]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return Future.wait(cancels).then(_ignore);
};
};
return controller.stream;
}
}
/// Helper function to ignore future callback
void _ignore(_, [__]) {}
// End of switchMap extension.
/// Process only one event by cancelling any pending events and
/// processing the new event immediately.
///
/// Avoid using [restartable] if you expect an event to have
/// immediate results -- it should only be used with asynchronous APIs.
///
/// **Note**: there is no event handler overlap and any currently running tasks
/// will be aborted if a new event is added before a prior one completes.
EventTransformer<Event> restartable<Event>() {
return (events, mapper) => events.switchMap(mapper);
}
// End of Restartable.
var rng = Random();
void main() {
Bloc.observer = AppBlocObserver();
runApp(const App());
}
/// Custom [BlocObserver] that observes all bloc and cubit state changes.
class AppBlocObserver extends BlocObserver {
@override
void onChange(BlocBase bloc, Change change) {
super.onChange(bloc, change);
if (bloc is Cubit) print(change);
}
@override
void onTransition(Bloc bloc, Transition transition) {
super.onTransition(bloc, transition);
print(transition);
}
}
/// {@template app}
/// A [StatelessWidget] that:
/// * uses [bloc](https://pub.dev/packages/bloc) and
/// [flutter_bloc](https://pub.dev/packages/flutter_bloc)
/// to manage the state of a counter and the app theme.
/// {@endtemplate}
class App extends StatelessWidget {
/// {@macro app}
const App({Key? key}) : super(key: key);
@override
Widget build(BuildContext context) {
return BlocProvider(
create: (_) => ThemeCubit(),
child: const AppView(),
);
}
}
/// {@template app_view}
/// A [StatelessWidget] that:
/// * reacts to state changes in the [ThemeCubit]
/// and updates the theme of the [MaterialApp].
/// * renders the [CounterPage].
/// {@endtemplate}
class AppView extends StatelessWidget {
/// {@macro app_view}
const AppView({Key? key}) : super(key: key);
@override
Widget build(BuildContext context) {
return BlocBuilder<ThemeCubit, ThemeData>(
builder: (_, theme) {
return MaterialApp(
theme: theme,
home: const CounterPage(),
);
},
);
}
}
/// {@template counter_page}
/// A [StatelessWidget] that:
/// * provides a [CounterBloc] to the [CounterView].
/// {@endtemplate}
class CounterPage extends StatelessWidget {
/// {@macro counter_page}
const CounterPage({Key? key}) : super(key: key);
@override
Widget build(BuildContext context) {
return BlocProvider(
create: (_) => CounterBloc(),
child: const CounterView(),
);
}
}
/// {@template counter_view}
/// A [StatelessWidget] that:
/// * demonstrates how to consume and interact with a [CounterBloc].
/// {@endtemplate}
class CounterView extends StatelessWidget {
/// {@macro counter_view}
const CounterView({Key? key}) : super(key: key);
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: const Text('Counter')),
body: Center(
child: BlocBuilder<CounterBloc, int>(
builder: (context, count) {
return Text('$count', style: Theme.of(context).textTheme.displayLarge?.merge(TextStyle( color: Colors.primaries[rng.nextInt(Colors.primaries.length)] )));
},
),
),
floatingActionButton: Column(
crossAxisAlignment: CrossAxisAlignment.end,
mainAxisAlignment: MainAxisAlignment.end,
children: <Widget>[
FloatingActionButton(
child: const Icon(Icons.add),
onPressed: () => context.read<CounterBloc>().add(Increment()),
),
const SizedBox(height: 4),
FloatingActionButton(
child: const Icon(Icons.remove),
onPressed: () => context.read<CounterBloc>().add(Decrement()),
),
const SizedBox(height: 4),
FloatingActionButton(
child: const Icon(Icons.brightness_6),
onPressed: () => context.read<ThemeCubit>().toggleTheme(),
),
],
),
);
}
}
/// Event being processed by [CounterBloc].
abstract class CounterEvent {}
/// Notifies bloc to increment state.
class Increment extends CounterEvent {}
/// Notifies bloc to decrement state.
class Decrement extends CounterEvent {}
/// {@template counter_bloc}
/// A simple [Bloc] that manages an `int` as its state.
/// {@endtemplate}
class CounterBloc extends Bloc<CounterEvent, int> {
/// {@macro counter_bloc}
CounterBloc() : super(0) {
on<Increment>((event, emit) => Future.delayed(Duration(seconds: rng.nextInt(5)), () {
print('Time has passed.');
emit(state + 1);
}),
/// Specify a custom event transformer from `package:bloc_concurrency`
/// restartable - process only the latest event and cancel previous event handlers
transformer: restartable(),
);
on<Decrement>((event, emit) => Future.delayed(Duration(seconds: rng.nextInt(5)), () {
print('Time has passed.');
emit(state - 1);
}),
/// Specify a custom event transformer from `package:bloc_concurrency`
/// restartable - process only the latest event and cancel previous event handlers
transformer: restartable(),
);
}
}
/// {@template brightness_cubit}
/// A simple [Cubit] that manages the [ThemeData] as its state.
/// {@endtemplate}
class ThemeCubit extends Cubit<ThemeData> {
/// {@macro brightness_cubit}
ThemeCubit() : super(_lightTheme);
static final _lightTheme = ThemeData(
floatingActionButtonTheme: const FloatingActionButtonThemeData(
foregroundColor: Colors.white,
),
brightness: Brightness.light,
);
static final _darkTheme = ThemeData(
floatingActionButtonTheme: const FloatingActionButtonThemeData(
foregroundColor: Colors.black,
),
brightness: Brightness.dark,
);
/// Toggles the current brightness between light and dark.
void toggleTheme() {
emit(state.brightness == Brightness.dark ? _lightTheme : _darkTheme);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment