Skip to content

Instantly share code, notes, and snippets.

@Mike278
Created June 1, 2021 02:44
Show Gist options
  • Save Mike278/f21c92e562428af26af58128d0209b00 to your computer and use it in GitHub Desktop.
Save Mike278/f21c92e562428af26af58128d0209b00 to your computer and use it in GitHub Desktop.
rxdart 0.27 upgrade
import 'dart:async';
import 'package:meta/meta.dart';
import 'package:moor/ffi.dart';
import 'package:moor/moor.dart';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';
class TestDb extends GeneratedDatabase {
TestDb() : super(SqlTypeSystem.withDefaults(), VmDatabase.memory());
@override final List<TableInfo> allTables = const [];
@override final int schemaVersion = 1;
}
Future<void> testCase({
@required Stream<int> Function() createOuter,
@required Stream<int> Function() createInner,
}) async {
final log = <int>[];
final timeout = Duration(milliseconds: 100);
final a = createOuter();
final b = a.switchMap((_) => createInner());
b.listen(log.add);
await b.first.then(log.add)
.timeout(timeout, onTimeout: () => fail('1st should complete'));
expect(log, [2, 2]);
b.listen(log.add);
await b.first.then(log.add)
.timeout(timeout, onTimeout: () => fail('2nd should complete'));
expect(log, [2, 2, 2, 2]);
}
void main() {
group('rxdart upgrade', () {
test("1: moor", () async {
final db = TestDb();
Stream<int> selectInt(int i) => db
.customSelect('select $i a')
.map((row) => row.read<int>('a'))
.watchSingle();
await testCase(
createOuter: () => selectInt(1),
createInner: () => selectInt(2),
);
});
test("2: rxdart", () async {
final outer = BehaviorSubject<int>();
final tc = testCase(
createOuter: () => outer,
createInner: () {
final inner = BehaviorSubject<int>();
Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2));
return inner;
}
);
await Future.delayed(Duration(milliseconds: 10));
outer.add(1);
await tc;
});
test("3: rxdart+_SyncProxyStream", () async {
final outer = BehaviorSubject<int>();
final tc = testCase(
createOuter: () => _SyncProxyStream(outer, () {}),
createInner: () {
final inner = BehaviorSubject<int>();
Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2));
return inner;
}
);
await Future.delayed(Duration(milliseconds: 10));
outer.add(1);
await tc;
});
test("4: rxdart+DoOnSubscribeStream", () async {
final outer = BehaviorSubject<int>();
final tc = testCase(
createOuter: () => DoOnSubscribeStream(outer, () {}),
createInner: () {
final inner = BehaviorSubject<int>();
Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2));
return inner;
}
);
await Future.delayed(Duration(milliseconds: 10));
outer.add(1);
await tc;
});
test("5: rxdart+_SyncProxyStream (other example)", () async {
final controller = StreamController<void>.broadcast();
final log = <int>[];
final stream = _SyncProxyStream(controller.stream, () {
log.add(log.length);
print('new subscription');
});
var switched = stream.switchMap((event) => Stream.value(event));
switched.listen(null);
switched.listen(null);
var mapped = stream.map((event) => event);
mapped.listen(null);
mapped.listen(null);
expect(log, [0, 1, 2, 3]);
});
test("6: rxdart+DoOnSubscribeStream (other example)", () async {
final controller = StreamController<void>.broadcast();
final log = <int>[];
final stream = DoOnSubscribeStream(controller.stream, () {
log.add(log.length);
print('new subscription');
});
var switched = stream.switchMap((event) => Stream.value(event));
switched.listen(null);
switched.listen(null);
var mapped = stream.map((event) => event);
mapped.listen(null);
mapped.listen(null);
expect(log, [0, 1, 2, 3]);
});
});
}
class _SyncProxyStream<T> extends Stream<T> {
final Stream<T> _inner;
final void Function() onSubscribe;
_SyncProxyStream(this._inner, this.onSubscribe);
@override
bool get isBroadcast => _inner.isBroadcast;
@override
StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
onSubscribe();
final inner = _inner.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
return _SyncProxyStreamSubscription(inner);
}
}
class _SyncProxyStreamSubscription<T> extends StreamSubscription<T> {
final StreamSubscription<T> _inner;
_SyncProxyStreamSubscription(this._inner);
@override
Future<E> asFuture<E>([E futureValue]) {
return _inner.asFuture(futureValue);
}
@override
Future<void> cancel() {
return _inner.cancel();
}
@override
bool get isPaused => _inner.isPaused;
@override
void onData(void Function(T data) handleData) {
_inner.onData(handleData);
}
@override
void onDone(void Function() handleDone) {
_inner.onDone(handleDone);
}
@override
void onError(Function handleError) {
_inner.onError(handleError);
}
@override
void pause([Future<void> resumeSignal]) {
_inner.pause(resumeSignal);
}
@override
void resume() {
_inner.resume();
}
}
class DoOnSubscribeStream<T> extends Stream<T> {
final Stream<T> inner;
final void Function() onSubscribe;
DoOnSubscribeStream(this.inner, this.onSubscribe);
@override
bool get isBroadcast => inner.isBroadcast;
@override
StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
onSubscribe();
return inner.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment