Skip to content

Instantly share code, notes, and snippets.

@victorers1
Last active December 11, 2022 18:14
Show Gist options
  • Save victorers1/712dd51add666a5d344d2d162be45816 to your computer and use it in GitHub Desktop.
Save victorers1/712dd51add666a5d344d2d162be45816 to your computer and use it in GitHub Desktop.
Studying Streams in Dart language
import 'dart:async';
/// Emits a [DateTime] on every tick of a periodic timer and then closes.
///
/// The timer starts as soon as the object is constructed. On each tick the
/// internal clock (initialised to the construction time) is advanced by the
/// tick interval and the new value is added to [stream]. Once the internal
/// clock is after "construction time + lifetime" the timer is cancelled and
/// the stream is closed, so `onDone` callbacks fire and `await for` loops over
/// [stream] terminate.
class DateTimeGenerator {
  /// Creates the generator and starts ticking immediately.
  ///
  /// [interval] is both the timer period and the amount added to the emitted
  /// date on every tick. [lifetime] is how far past the construction time the
  /// emitted date must get before the generator stops. The defaults reproduce
  /// the original behaviour (one event per second for roughly ten seconds).
  ///
  /// Throws an [ArgumentError] if [interval] is not positive, because the
  /// emitted date would then never pass the deadline.
  DateTimeGenerator({
    Duration interval = const Duration(seconds: 1),
    Duration lifetime = const Duration(seconds: 10),
  }) {
    if (interval <= Duration.zero) {
      throw ArgumentError.value(interval, 'interval', 'must be positive');
    }
    final deadline = DateTime.now().add(lifetime);
    Timer.periodic(interval, (timer) {
      _date = _date.add(interval);
      _controller.add(_date);
      if (_date.isAfter(deadline)) {
        timer.cancel();
        // Closing is what delivers the "done" event. Without it listeners
        // wait forever: `onDone` never runs and `await for` never returns.
        _controller.close();
      }
    });
  }

  /// Last emitted value; starts at the construction time.
  DateTime _date = DateTime.now();

  final _controller = StreamController<DateTime>();

  /// Single-subscription stream of the generated dates.
  Stream<DateTime> get stream => _controller.stream;
}
void main() async {
  // Only the stream is kept; the DateTimeGenerator instance itself is not
  // referenced again. Wrapping it as a broadcast stream allows it to be
  // listened to more than once below.
  final dates = DateTimeGenerator().stream.asBroadcastStream();

  void onDate(DateTime date) {
    print('Date listened: $date');
  }

  void onClosed() {
    print('Stream is done');
  }

  final listener = dates.listen(onDate, onDone: onClosed);

  // After cancel() neither onDate nor onClosed is invoked for this listener.
  await listener.cancel();

  // A second listen does not throw a StateError because `dates` is a
  // broadcast stream.
  //
  // `await for` only returns once the stream delivers its "done" event, i.e.
  // once the underlying StreamController is closed. If the source never
  // closes its controller, this loop never finishes and nothing below runs.
  await for (final date in dates) {
    print('Date awaited: $date');
  }

  // Reached only after the stream is done. At that point there is no element
  // left for `first` to return.
  print('test');
  final firstDate = await dates.first;
  print('First value: $firstDate');
}
/**
* Output:
Date awaited: 2022-12-01 18:14:29.822319
Date awaited: 2022-12-01 18:14:30.822319
Date awaited: 2022-12-01 18:14:31.822319
Date awaited: 2022-12-01 18:14:32.822319
Date awaited: 2022-12-01 18:14:33.822319
Date awaited: 2022-12-01 18:14:34.822319
Date awaited: 2022-12-01 18:14:35.822319
Date awaited: 2022-12-01 18:14:36.822319
Date awaited: 2022-12-01 18:14:37.822319
Date awaited: 2022-12-01 18:14:38.822319
Date awaited: 2022-12-01 18:14:39.822319
Exited
*/
/**
* Tried to implement these commands, but streams are closed after ".first" is called
print('Last element: ${await myStream.last}');
print(
'Is any element equals now: ${myStream.any((e) => e == DateTime.now())}',
);
print(
'Is every element distinct: ${myStream.distinct((previous, next) => previous == next)}',
);
print('Stream length: ${myStream.length}');
print('Stream converted to String');
myStream.cast<String>().forEach((e) => print(e.substring(10)));
*/
void main() {
  // A finite stream backed by an in-memory list.
  final numbers = Stream.fromIterable([1, 2, 3, 4, 5]);

  // Register the initial handlers. No event is delivered synchronously inside
  // listen(); delivery happens later, after main() has returned.
  final listener = numbers.listen(
    (value) {
      print('Data: $value');
    },
    onDone: () {
      print('Stream is done');
    },
  );

  // onData()/onDone() REPLACE the handlers given to listen(). Because no event
  // has been delivered yet, these replacements are the ones that actually run:
  // the data lines look the same, but the final line printed is
  // 'Stream is Done' (capital D), not 'Stream is done'.
  listener
    ..onData((value) {
      print('Data: $value');
    })
    ..onDone(() {
      print('Stream is Done');
    });
}
import 'dart:async';
/// Emits an ever-increasing integer (1, 2, 3, ...) on a periodic timer.
///
/// The timer starts as soon as the object is constructed and keeps running
/// until [dispose] is called. Values added while nobody is listening, or while
/// the listener is paused, are buffered by the underlying single-subscription
/// [StreamController].
class NumberCreator {
  /// Creates the counter and starts ticking immediately.
  ///
  /// [interval] is the time between two emitted values; it defaults to one
  /// second, which is the original behaviour.
  NumberCreator({Duration interval = const Duration(seconds: 1)}) {
    _timer = Timer.periodic(interval, (_) {
      _controller.add(_count); // emit the current value to the stream
      _count++;
    });
  }

  /// Kept so the timer can be stopped in [dispose].
  late final Timer _timer;

  /// Next value to emit.
  int _count = 1;

  final _controller = StreamController<int>();

  /// Single-subscription stream of the generated integers.
  Stream<int> get stream => _controller.stream;

  /// Stops the timer and closes [stream].
  ///
  /// Without this the periodic timer runs forever, which keeps the isolate
  /// alive even after every listener has cancelled. Calling it more than once
  /// is harmless.
  void dispose() {
    _timer.cancel();
    // The returned future is intentionally ignored: it only completes once a
    // listener has received the done event, which may never happen.
    _controller.close();
  }
}
void main() async {
  // Never-ending stream of integers.
  final myStream = NumberCreator().stream;

  // A single-subscription stream can have only one listener.
  final subscription = myStream.listen(
    (data) => print('Never executed'), // replaced by onData() below
    onDone: () => print('Stream is done'), // never executed: stream never ends
  );

  // onData() replaces the data handler that was passed to listen() above.
  subscription.onData((data) => print('Data: $data'));

  // A second listen would throw a StateError
  // (Bad state: Stream has already been listened to.)
  // final subscription2 = myStream.listen(
  //   (data) => print('Data: $data'),
  // );

  // Suspends main() but not the event loop: data keeps being printed.
  print('main() stuck for 4 secs');
  await Future<void>.delayed(const Duration(seconds: 4));

  // Pauses delivery until the resume signal completes, 2 seconds from now.
  // Values produced meanwhile are not lost; the recorded output shows every
  // number being printed once delivery resumes.
  subscription.pause(Future<void>.delayed(const Duration(seconds: 2)));

  print('main() stuck for 10 secs');
  await Future<void>.delayed(const Duration(seconds: 10));

  // cancel() returns a Future, so await it instead of dropping it. After
  // cancelling, this listener receives nothing more (onDone is not executed),
  // but nothing in this function stops the timer inside NumberCreator.
  await subscription.cancel();
  print('subscription canceled');
}
/**
* Output:
main() stuck for 4 secs
Data: 1
Data: 2
Data: 3
Data: 4
main() stuck for 10 secs
Data: 5
Data: 6
Data: 7
Data: 8
Data: 9
Data: 10
Data: 11
Data: 12
Data: 13
Data: 14
subscription canceled
*/
/// Emits the integers from 1 up to and including [to], in ascending order.
///
/// The stream is empty when [to] is less than 1.
Stream<int> countStream(int to) async* {
  var current = 1;
  while (current <= to) {
    yield current;
    current++;
  }
}
void main() async {
  // 1..100, minus the first 10 events, keeping only multiples of 3 and then
  // only the first two of those (12 and 15).
  final firstTwoMultiples = countStream(100)
      .skip(10)
      .where((value) => value % 3 == 0)
      .take(2);

  // expand() turns each event into several events. Here every x becomes
  // x, x + 0.25, x + 0.5 and x + 0.75 (e.g. 6 -> 6, 6.25, 6.5, 6.75).
  final quarterSteps = firstTwoMultiples.expand<double>(
    (int integer) => <double>[
      for (var i = 0; i < 4; i++) integer + (i * 0.25),
    ],
  );

  // A second expansion: every x becomes x, x + 0.11, x + 0.22 and x + 0.33.
  final fineSteps = quarterSteps.expand<double>(
    (double rational) => <double>[
      for (var i = 0; i < 4; i++) rational + (i * 0.11),
    ],
  );

  // "double" is a subtype of "num", so this cast changes nothing in practice.
  final asNumbers = fineSteps.cast<num>();

  // Format every value with two decimal places, then join the formatted
  // values into one String with one value per line.
  final Future<String> joined =
      asNumbers.map((value) => value.toStringAsFixed(2)).join('\n');

  print(await joined);
}
/**
* Output:
12.00
12.11
12.22
12.33
12.25
12.36
12.47
12.58
12.50
12.61
12.72
12.83
12.75
12.86
12.97
13.08
15.00
15.11
15.22
15.33
15.25
15.36
15.47
15.58
15.50
15.61
15.72
15.83
15.75
15.86
15.97
16.08
Exited
*/
/// Produces a stream that counts upward from 1 and stops after emitting [to].
///
/// Nothing is emitted if [to] is below 1.
Stream<int> countStream(int to) async* {
  var emitted = 0;
  while (emitted < to) {
    emitted += 1;
    yield emitted;
  }
}
void main() async {
  // Must be a broadcast stream: listen(), first and last each subscribe to it,
  // and a single-subscription stream only allows one subscription.
  final myStream = countStream(10).asBroadcastStream();

  // The subscription object is not needed afterwards, so it is not stored.
  myStream.listen((data) => print('Data: $data'));

  print('First value: ${await myStream.first}');

  // On a single-subscription stream this would throw
  // "Bad state: Stream has already been listened to.". Because the stream is
  // broadcast it works instead and completes with the final event once the
  // stream is done (the recorded run prints "Last value: 10").
  print('Last value: ${await myStream.last}');
}
/**
* Output:
Data: 1
First value: 1
Data: 2
Data: 3
Data: 4
Data: 5
Data: 6
Data: 7
Data: 8
Data: 9
Data: 10
Last value: 10
Exited
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment