@haarts
Last active October 28, 2019 13:51
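A stream transformer that drops events already present in a bounded set of recently seen values. The set holds up to `memory + ceil(memory / 5)` entries before the oldest are evicted. `Unique(memory: 1)` is close to `distinct()` but not identical: it remembers up to two values, so `a, b, a` yields `a, b`, whereas `distinct()` only compares against the previous event and yields `a, b, a`.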
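import 'dart:async';

/// Drops events that are already in a bounded set of recently seen values.
class Unique<T> extends StreamTransformerBase<T, T> {
  /// Approximate number of values to remember.
  final int memory;

  const Unique({this.memory = 10});

  @override
  Stream<T> bind(Stream<T> stream) => Stream<T>.eventTransformed(
      stream, (EventSink<T> sink) => UniqueSink<T>(sink, memory: memory));
}

class UniqueSink<T> implements EventSink<T> {
  final EventSink<T> _output;

  // A set literal is a LinkedHashSet, so iteration follows insertion order.
  final Set<T> _seen = {};
  final int memory;

  // Number of oldest entries evicted in one batch.
  final int head;

  UniqueSink(this._output, {this.memory = 10}) : head = (memory / 5).ceil();

  @override
  void add(T data) {
    // Forward the event only if we haven't seen it yet.
    if (!_seen.contains(data)) {
      _seen.add(data);
      _output.add(data);
    }
    // Evict the oldest entries. Lookups do not refresh an entry's position,
    // so this is FIFO eviction rather than a true LRU cache.
    if (_seen.length > memory + head) {
      _seen.removeAll(_seen.take(head).toList());
    }
  }

  @override
  void addError(Object error, [StackTrace? stackTrace]) =>
      _output.addError(error, stackTrace);

  @override
  void close() => _output.close();
}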
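
A minimal usage sketch, assuming a null-safe Dart SDK. It repeats a condensed copy of the two classes so the snippet runs on its own, then compares the transformer with the built-in `distinct()` on the same input. The `main` function and the sample events are illustrative and not part of the gist.

```dart
import 'dart:async';

// Condensed copy of the gist's transformer, repeated so this file is runnable.
class Unique<T> extends StreamTransformerBase<T, T> {
  final int memory;
  const Unique({this.memory = 10});

  @override
  Stream<T> bind(Stream<T> stream) => Stream<T>.eventTransformed(
      stream, (EventSink<T> sink) => UniqueSink<T>(sink, memory: memory));
}

class UniqueSink<T> implements EventSink<T> {
  final EventSink<T> _output;
  final Set<T> _seen = {};
  final int memory;
  final int head;

  UniqueSink(this._output, {this.memory = 10}) : head = (memory / 5).ceil();

  @override
  void add(T data) {
    if (!_seen.contains(data)) {
      _seen.add(data);
      _output.add(data);
    }
    // Evict the oldest entries once the set exceeds memory + head.
    if (_seen.length > memory + head) {
      _seen.removeAll(_seen.take(head).toList());
    }
  }

  @override
  void addError(Object error, [StackTrace? stackTrace]) =>
      _output.addError(error, stackTrace);

  @override
  void close() => _output.close();
}

Future<void> main() async {
  // Illustrative input: duplicates both adjacent and spread out.
  const events = [1, 1, 2, 1, 3, 2, 4, 1];

  // distinct() only suppresses consecutive duplicates.
  final viaDistinct = await Stream.fromIterable(events).distinct().toList();
  print(viaDistinct); // [1, 2, 1, 3, 2, 4, 1]

  // Unique suppresses any value still held in the bounded set.
  final viaUnique = await Stream.fromIterable(events)
      .transform(const Unique<int>(memory: 2))
      .toList();
  print(viaUnique); // [1, 2, 3, 4, 1]
}
```

With `memory: 2` the set holds at most three values. The final `1` is emitted again because the first `1` was evicted when `4` arrived.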