Skip to content

Instantly share code, notes, and snippets.

@alxhub
Created October 15, 2013 17:33
Show Gist options
  • Save alxhub/6995381 to your computer and use it in GitHub Desktop.
Save alxhub/6995381 to your computer and use it in GitHub Desktop.
Initial version of a StreamCoupler<T>
// Copyright 2013 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
class StreamCoupler<T> implements StreamConsumer<T> {
StreamController<T> _controller;
Completer<bool> _completer;
StreamSubscription<T> _subscription;
bool _paused = false;
final bool _sync;
final bool _propogateDone;
StreamCoupler({bool sync, bool broadcast, bool propogateDone}) :
this._sync = sync, this._propogateDone = propogateDone {
if (broadcast) {
_controller = new StreamController<T>.broadcast(sync: sync);
} else {
_controller = new StreamController<T>(sync: sync, onCancel: _handleCancel,
onPause: _handlePause, onResume: _handleResume);
}
}
Future addStream(Stream<T> stream) {
if (_subscription != null) {
_subscription.cancel();
_subscription = null;
_completer.complete(false);
if (_subscription != null) {
throw new StateError('addStream() called from synchronous Future of ' +
'previous addStream() during another addStream() operation');
}
}
_completer = new Completer<bool>(sync: _sync);
_subscription = stream.listen(_controller.add)
..onError(_controller.addError)
..onDone(_handleDone);
if (_paused) {
_subscription.pause();
}
}
Future close() {
_controller.close();
if (_subscription != null) {
_completer.complete(null);
}
return new Future.value(null);
}
void _handleCancel() {
if (_subscription != null) {
_subscription.cancel();
_subscription = null;
_completer.complete(true);
}
}
void _handlePause() {
_paused = true;
if (_subscription != null) {
_subscription.pause();
}
}
void _handleResume() {
_paused = false;
if (_subscription != null) {
_subscription.resume();
}
}
void _handleDone() {
if (_propogateDone) {
_controller.close();
}
_subscription = null;
// Cache a temp reference to completer, since it might be synchronous and
// might add another stream, causing [_completer] to get set.
var completer = _completer;
_completer = null;
completer.complete(true);
}
Stream<T> get output => _controller.stream;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment