-
-
Save Mike278/f21c92e562428af26af58128d0209b00 to your computer and use it in GitHub Desktop.
rxdart 0.27 upgrade
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'package:meta/meta.dart'; | |
import 'package:moor/ffi.dart'; | |
import 'package:moor/moor.dart'; | |
import 'package:rxdart/rxdart.dart'; | |
import 'package:test/test.dart'; | |
class TestDb extends GeneratedDatabase { | |
TestDb() : super(SqlTypeSystem.withDefaults(), VmDatabase.memory()); | |
@override final List<TableInfo> allTables = const []; | |
@override final int schemaVersion = 1; | |
} | |
Future<void> testCase({ | |
@required Stream<int> Function() createOuter, | |
@required Stream<int> Function() createInner, | |
}) async { | |
final log = <int>[]; | |
final timeout = Duration(milliseconds: 100); | |
final a = createOuter(); | |
final b = a.switchMap((_) => createInner()); | |
b.listen(log.add); | |
await b.first.then(log.add) | |
.timeout(timeout, onTimeout: () => fail('1st should complete')); | |
expect(log, [2, 2]); | |
b.listen(log.add); | |
await b.first.then(log.add) | |
.timeout(timeout, onTimeout: () => fail('2nd should complete')); | |
expect(log, [2, 2, 2, 2]); | |
} | |
void main() { | |
group('rxdart upgrade', () { | |
test("1: moor", () async { | |
final db = TestDb(); | |
Stream<int> selectInt(int i) => db | |
.customSelect('select $i a') | |
.map((row) => row.read<int>('a')) | |
.watchSingle(); | |
await testCase( | |
createOuter: () => selectInt(1), | |
createInner: () => selectInt(2), | |
); | |
}); | |
test("2: rxdart", () async { | |
final outer = BehaviorSubject<int>(); | |
final tc = testCase( | |
createOuter: () => outer, | |
createInner: () { | |
final inner = BehaviorSubject<int>(); | |
Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2)); | |
return inner; | |
} | |
); | |
await Future.delayed(Duration(milliseconds: 10)); | |
outer.add(1); | |
await tc; | |
}); | |
test("3: rxdart+_SyncProxyStream", () async { | |
final outer = BehaviorSubject<int>(); | |
final tc = testCase( | |
createOuter: () => _SyncProxyStream(outer, () {}), | |
createInner: () { | |
final inner = BehaviorSubject<int>(); | |
Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2)); | |
return inner; | |
} | |
); | |
await Future.delayed(Duration(milliseconds: 10)); | |
outer.add(1); | |
await tc; | |
}); | |
test("4: rxdart+DoOnSubscribeStream", () async { | |
final outer = BehaviorSubject<int>(); | |
final tc = testCase( | |
createOuter: () => DoOnSubscribeStream(outer, () {}), | |
createInner: () { | |
final inner = BehaviorSubject<int>(); | |
Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2)); | |
return inner; | |
} | |
); | |
await Future.delayed(Duration(milliseconds: 10)); | |
outer.add(1); | |
await tc; | |
}); | |
test("5: rxdart+_SyncProxyStream (other example)", () async { | |
final controller = StreamController<void>.broadcast(); | |
final log = <int>[]; | |
final stream = _SyncProxyStream(controller.stream, () { | |
log.add(log.length); | |
print('new subscription'); | |
}); | |
var switched = stream.switchMap((event) => Stream.value(event)); | |
switched.listen(null); | |
switched.listen(null); | |
var mapped = stream.map((event) => event); | |
mapped.listen(null); | |
mapped.listen(null); | |
expect(log, [0, 1, 2, 3]); | |
}); | |
test("6: rxdart+DoOnSubscribeStream (other example)", () async { | |
final controller = StreamController<void>.broadcast(); | |
final log = <int>[]; | |
final stream = DoOnSubscribeStream(controller.stream, () { | |
log.add(log.length); | |
print('new subscription'); | |
}); | |
var switched = stream.switchMap((event) => Stream.value(event)); | |
switched.listen(null); | |
switched.listen(null); | |
var mapped = stream.map((event) => event); | |
mapped.listen(null); | |
mapped.listen(null); | |
expect(log, [0, 1, 2, 3]); | |
}); | |
}); | |
} | |
class _SyncProxyStream<T> extends Stream<T> { | |
final Stream<T> _inner; | |
final void Function() onSubscribe; | |
_SyncProxyStream(this._inner, this.onSubscribe); | |
@override | |
bool get isBroadcast => _inner.isBroadcast; | |
@override | |
StreamSubscription<T> listen(void Function(T event) onData, | |
{Function onError, void Function() onDone, bool cancelOnError}) { | |
onSubscribe(); | |
final inner = _inner.listen(onData, | |
onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
return _SyncProxyStreamSubscription(inner); | |
} | |
} | |
class _SyncProxyStreamSubscription<T> extends StreamSubscription<T> { | |
final StreamSubscription<T> _inner; | |
_SyncProxyStreamSubscription(this._inner); | |
@override | |
Future<E> asFuture<E>([E futureValue]) { | |
return _inner.asFuture(futureValue); | |
} | |
@override | |
Future<void> cancel() { | |
return _inner.cancel(); | |
} | |
@override | |
bool get isPaused => _inner.isPaused; | |
@override | |
void onData(void Function(T data) handleData) { | |
_inner.onData(handleData); | |
} | |
@override | |
void onDone(void Function() handleDone) { | |
_inner.onDone(handleDone); | |
} | |
@override | |
void onError(Function handleError) { | |
_inner.onError(handleError); | |
} | |
@override | |
void pause([Future<void> resumeSignal]) { | |
_inner.pause(resumeSignal); | |
} | |
@override | |
void resume() { | |
_inner.resume(); | |
} | |
} | |
class DoOnSubscribeStream<T> extends Stream<T> { | |
final Stream<T> inner; | |
final void Function() onSubscribe; | |
DoOnSubscribeStream(this.inner, this.onSubscribe); | |
@override | |
bool get isBroadcast => inner.isBroadcast; | |
@override | |
StreamSubscription<T> listen(void Function(T event) onData, | |
{Function onError, void Function() onDone, bool cancelOnError}) { | |
onSubscribe(); | |
return inner.listen(onData, | |
onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment