Last active
May 23, 2017 20:42
-
-
Save pchiusano/1fd8316b354a449c4851cd0dca2826c7 to your computer and use it in GitHub Desktop.
Stream processing scoping primitives
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | A free-monad-style program over the instruction type @f@ that, when it
-- succeeds, produces a value of type @r@.
--
-- The 'Bind' constructor hides its intermediate type @x@, so this declaration
-- needs existential quantification to be enabled.
data Free f r
  = Pure r
    -- ^ A program that is just a result value.
  | Fail Err
    -- ^ A program that carries an 'Err' instead of a result.
  | Eval (f r)
    -- ^ A single instruction of @f@ whose result type is @r@.
  | forall x . Bind (Free f x) (Either Err x -> Free f r)
    -- ^ A sub-program paired with a continuation. The continuation receives
    -- either an 'Err' or the sub-program's value of the hidden type @x@.
-- | A 'Free' program whose instructions are 'StreamF' operations over the
-- element type @o@ and whose result type is @r@.
--
-- NOTE(review): @f@ does not appear on the right-hand side, so it is a phantom
-- parameter of this alias. Presumably it is meant to be the effect type that
-- 'Acquire' mentions as @f r@ — confirm.
type Pull f o r = Free (StreamF o) r
-- | Instruction set used by 'Pull'. The first type parameter is the element
-- type (it is fixed only by 'Emits'); the second is the result type of the
-- individual instruction.
data StreamF o x where
  -- | Carries a list of elements of type @o@; the result is unit.
  Emits :: [o] -> StreamF o ()
  -- | Takes no arguments; the result is a 'Scope'.
  OpenScope :: StreamF x Scope
  -- | Takes an optional 'ScopeId' and a value of type @f r@; the result is
  -- the @r@ paired with a 'Token'.
  Acquire :: Maybe ScopeId -> f r -> StreamF x (r, Token)
  -- | Takes a 'Token'; the result is unit.
  Release :: Token -> StreamF o ()
  -- | Takes a 'Scope'; the result is unit.
  CloseScope :: Scope -> StreamF o ()
  -- UnconsAsync
-- | Rewrite every instruction of a 'Free' program with the given
-- transformation, leaving the program's shape and result untouched.
--
-- The first argument must work for every result type @x@, because the
-- instructions nested under 'Bind' have result types that are hidden from
-- the caller (this signature needs rank-N types to be enabled).
translate :: (forall x . f x -> g x) -> Free f a -> Free g a
translate nt program = case program of
  Pure r -> Pure r
  Fail err -> Fail err
  -- The only place an instruction actually occurs.
  Eval instruction -> Eval (nt instruction)
  -- Translate both the sub-program and whatever the continuation produces.
  Bind inner k -> Bind (translate nt inner) (translate nt . k)
-- | Combine two pulls of the same type into one.
--
-- NOTE(review): the implementation is elided (@...@) in this sketch.
-- Presumably the second pull runs once the first has completed — 'scope'
-- uses it to attach a 'CloseScope' step after the scoped pull — but confirm,
-- including how a failure of the first pull is treated.
onComplete :: Pull f o r -> Pull f o r -> Pull f o r | |
onComplete p1 p2 = ... | |
-- | Wrap a pull in its own scope: issue 'OpenScope', rewrite the pull so that
-- each of its 'Acquire' instructions is tagged with the value 'OpenScope'
-- returned, and combine the result with a 'CloseScope' step via 'onComplete'.
--
-- NOTE(review): 'Acquire' is declared with a @Maybe ScopeId@ first field, but
-- @nest@ below conses onto that field as if it were a list. One of the two
-- needs adjusting before this type-checks — confirm the intended
-- representation.
scope :: Pull f o r -> Pull f o r
scope p =
  Eval OpenScope >>= \scopeId ->
    translate (nest scopeId) p `onComplete` Eval (CloseScope scopeId)
  where
    -- Prepend the enclosing scope to every 'Acquire'; every other
    -- instruction passes through unchanged.
    nest scopeId streamF = case streamF of
      Acquire tl fr -> Acquire (scopeId : tl) fr
      _ -> streamF
-- | A scope, represented as a list of 'GUID's.
type Scope = [GUID]
-- | @isParent a b@ holds exactly when @a@ is a prefix of @b@.
--
-- Every list is a prefix of itself, so a scope also counts as its own parent.
isParent :: Scope -> Scope -> Bool
isParent ancestor descendant = ancestor `isPrefixOf` descendant
-- We now have a TwoWayLatch, an AtomicBoolean, and a resource map _per scope_. Call the outer map the "scope map".
-- To OpenScope, just create a new GUID; the scope-map entry can be inserted lazily on first use.
-- To CloseScope, wait for critical sections to end, then iterate through the `Map Token (f ())` and free the remaining resources.
-- ALSO close any sub-scopes, as determined by `isParent`.
-- To Release, just iterate through the resource maps of the active scopes (there will be very few active scopes).
-- To Acquire, insert into the scope map, possibly lazily creating the TwoWayLatch, AtomicBoolean, and resource map
-- if nothing already exists.
-- Acquire and CloseScope use the same trick as before to avoid leaving dangling resources.
-- | Signature of the interpreter that folds a 'Pull' into a single value in
-- @f@. Only the type is given in this sketch; no definition is shown.
--
-- Arguments, in order: a mutable reference to a map keyed by 'ScopeId' whose
-- values are a @(TwoWayLatch, AtomicBoolean, Map Token (f ()))@ triple, the
-- initial accumulator, the step function applied to the accumulator and an
-- element, and the pull to run.
runFold
  :: IORef (Map ScopeId (TwoWayLatch, AtomicBoolean, Map Token (f ())))
  -> z
  -> (z -> o -> z)
  -> Pull f o x
  -> f z
-- Concern: what if an `OpenScope` is not balanced by a `CloseScope`?
-- This could happen if we asynchronously spawn a step that is scoped: the scope will get opened, but
-- the inner async step could get killed before reaching its `CloseScope` point.
-- This is not a problem, though, since the parent scope will be calling `CloseScope`, which will clean up
-- the interrupted async child scope as well.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment