Skip to content

Instantly share code, notes, and snippets.

@ncthbrt
Last active February 18, 2018 14:55
Show Gist options
  • Save ncthbrt/73a424291e3f4ee9d856d13d2ed07835 to your computer and use it in GitHub Desktop.
Save ncthbrt/73a424291e3f4ee9d856d13d2ed07835 to your computer and use it in GitHub Desktop.
Playing around with stream ideas
/*
* Abstract stream type shared by every operation in this interface.
* As used by the signatures in this file: 'a is the type of the values carried,
* 'e is the type of the errors carried, and 'sourceType is instantiated with
* Cold.t or Hot.t to tag which kind of stream this is.
*/
type stream('a, 'e, 'sourceType);
/*
* A cold stream is one in which flow control is handled by the consumer.
* The consumer invokes a function to get the next value.
*/
module Cold: {
/* Abstract marker type; in this file it only appears as the third parameter of stream, tagging a stream as cold. */
type t;
/*
* Outcome of one call to the source function given to make.
* Ok carries a value of type 'a, Error carries an error of type 'e, Complete carries no payload.
* NOTE(review): Complete presumably signals that the stream has ended - confirm against the implementation.
*/
type result('a, 'e) =
| Ok('a)
| Error('e)
| Complete;
/*
* Builds a cold stream from a function that takes unit and returns a result.
* NOTE(review): going by the module comment, this is presumably the function the consumer invokes for each next value - confirm.
*/
let make: (unit => result('a, 'e)) => stream('a, 'e, t);
};
/*
* A hot stream is one in which flow control is handled by the producer.
* The producer produces events of its own volition, and it is up to the consumer to
* consume them promptly.
*/
module Hot: {
/* Abstract marker type; in this file it only appears as the third parameter of stream, tagging a stream as hot. */
type t;
/* A callback is a function from a value of type 't to unit. */
type callback('t) = ('t => unit);
/*
* Builds a hot stream from three labelled functions: ~next, ~error and ~complete.
* As written, each one receives a callback (for values of type 'a, for errors of type 'e,
* and for unit respectively) and returns unit.
* NOTE(review): presumably each function registers the supplied callback with the producer - confirm against the implementation.
*/
let make: (~next: callback('a) => unit, ~error:callback('e) => unit, ~complete:callback(unit) => unit) => stream('a, 'e, t);
};
/*
* Converts a Repromise.t('a, 'e) into a hot stream with the same value and error types.
* NOTE(review): presumably the stream emits the promise outcome once and then completes - confirm.
*/
let fromPromise: Repromise.t('a, 'e) => stream('a, 'e, Hot.t);
/*
* Takes one value of type 'a and returns a cold stream of 'a; the error type 'e is left unconstrained.
* NOTE(review): presumably the stream yields exactly that value and then completes - confirm.
*/
let single: 'a => stream('a, 'e, Cold.t);
/*
* Takes a function from 'a to a stream of 'b, plus a stream of 'a, and returns a stream of 'b.
* The input stream, the streams returned by the function, and the output all share the same error type and source type.
* NOTE(review): how values from the inner streams are ordered in the output is not visible from the signature.
*/
let flatMap: ('a => stream('b, 'e, 'sourceType), stream('a, 'e, 'sourceType)) => stream('b, 'e, 'sourceType);
/*
* Takes a function from 'a to 'b and a stream of 'a, and returns a stream of 'b with the same error type and source type.
* NOTE(review): presumably the function is applied to each value of the input stream - confirm.
*/
let map: ('a => 'b, stream('a,'e, 'sourceType)) => stream('b, 'e, 'sourceType);
/*
* Takes a predicate on 'a and a stream of 'a, and returns a stream with identical type parameters.
* NOTE(review): presumably only values for which the predicate returns true are kept - confirm.
*/
let filter: ('a => bool, stream('a, 'e, 'sourceType)) => stream('a, 'e, 'sourceType);
/*
* Takes a function combining an accumulator with a value, a value of the accumulator type, and a stream of 'a;
* returns a stream of accumulators with the same error type and source type.
* NOTE(review): presumably the second argument is the initial accumulator and every intermediate accumulator is emitted (a running fold) - confirm.
*/
let aggregate: (('acc,'a) => 'acc, 'acc, stream('a, 'e, 'sourceType)) => stream('acc, 'e, 'sourceType);
/*
* Takes the same arguments as aggregate but returns a Repromise.t('acc, 'e) instead of a stream.
* NOTE(review): presumably the promise settles with the final accumulator once the stream completes, or with the stream error - confirm.
*/
let reduce: (('acc,'a) => 'acc, 'acc, stream('a, 'e, 'sourceType)) => Repromise.t('acc, 'e);
/*
* Takes two streams with identical type parameters and returns one stream with those same parameters.
* NOTE(review): presumably the output carries the values of both inputs; the interleaving order is not visible from the signature.
*/
let merge: (stream('a, 'e, 'sourceType), stream('a, 'e, 'sourceType)) => stream('a, 'e, 'sourceType);
/*
* Takes a stream of 'a and a stream of 'b sharing an error type and source type, and returns a stream of ('a, 'b) pairs.
* NOTE(review): presumably values are paired positionally; behaviour when one input ends first is not visible from the signature.
*/
let zip: (stream('a, 'e, 'sourceType), stream('b, 'e, 'sourceType)) => stream(('a, 'b), 'e, 'sourceType);
/*
* Turns a hot stream into a cold stream with the same value and error types; accepts an optional integer ~size.
* NOTE(review): ~size presumably bounds how many values are held for the consumer; its default and the overflow behaviour are not visible here - confirm.
*/
let buffer: (~size: int=?, stream('a, 'e, Hot.t)) => stream('a, 'e, Cold.t);
/*
* Turns a cold stream into a hot stream with the same value and error types (the reverse type conversion of buffer).
* NOTE(review): how and when the cold source is pulled is not visible from the signature.
*/
let unbuffer: (stream('a, 'e, Cold.t)) => stream('a, 'e, Hot.t);
/*
* Takes a function on 'a (whose return type 'ignored is unconstrained) and a stream of 'a, and returns a Repromise.t(unit, 'e).
* NOTE(review): presumably the function is run for each value with its return value discarded, and the promise settles when the stream completes or errors - confirm.
*/
let observe: ('a => 'ignored, stream('a, 'e, 'sourceType)) => Repromise.t(unit, 'e);
/*
* Takes a stream and returns a Repromise.t(unit, 'e); no value of type 'a appears in the return type.
* NOTE(review): presumably consumes the stream to completion, discarding its values - confirm.
*/
let drain: (stream('a, 'e, 'sourceType)) => Repromise.t(unit, 'e);
/*
* Takes a handler from 'e to 'a and a stream of 'a with error type 'e; returns a stream of 'a whose error type 'e2 is independent of 'e.
* NOTE(review): presumably each error is replaced by the value the handler returns, so the output never carries the original error - confirm.
*/
let catch: ('e => 'a, stream('a, 'e, 'sourceType)) => stream('a, 'e2, 'sourceType);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment