Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Last active September 9, 2021 22:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pchiusano/63edd75b5834f76921fcc6d731a34856 to your computer and use it in GitHub Desktop.
Save pchiusano/63edd75b5834f76921fcc6d731a34856 to your computer and use it in GitHub Desktop.
Distributed data structure construction kit and map reduce
-- Ability with a single operation, `location`, over references of type `d a`.
unique ability Located d where
-- Maps a `d a` reference to a `Location {}`.
-- NOTE(review): presumably the location where the referenced value is stored — confirm.
location : d a -> Location {}
-- Ability for converting between plain values `a` and references `d a`.
-- Signatures elsewhere in this file refer to it by the suffix `Distributed d`.
unique ability Storage.Distributed d where
-- Turns a reference back into the value it refers to.
restore : d a -> a
-- Produces a reference for a value.
save : a -> d a
-- Like `save`, but also takes an existing reference of any type `d x`.
-- NOTE(review): presumably a placement hint (store near that reference) — confirm.
saveNear : d x -> a -> d a
-- A binary tree of `a` values whose subtrees are held behind `d` references.
-- Each non-empty node also carries a value of type `m`.
unique type Kit d m a
-- No elements.
= Empty
-- A single element together with its `m` value.
| One a m
-- An inner node: the `Nat` is what `Kit.size` reports for the node (`Kit.append`
-- sets it to the sum of both sides), the `m` is the combination of both sides'
-- `m` values, followed by references to the left and right subtrees.
| Two Nat m (d (Kit d m a)) (d (Kit d m a))
-- Number of elements in a kit. For `Two` this is the count cached in the node,
-- so no subtree is restored.
Kit.size : Kit d m a -> Nat
Kit.size kit = match kit with
  Empty -> 0
  One _ _ -> 1
  Two count _ _ _ -> count
-- The `m` value stored at the root of a kit. `Empty` stores none, so the
-- supplied fallback is returned instead.
Kit.metric : m -> Kit d m a -> m
Kit.metric fallback kit = match kit with
  Empty -> fallback
  One _ cached -> cached
  Two _ cached _ _ -> cached
-- Concatenates two kits, combining their `m` values with `(<>)`.
--
-- * An `Empty` side yields the other side unchanged.
-- * A `One` on either side is delegated to `Kit.cons` / `Kit.snoc`.
-- * Two inner nodes are joined directly under a new `Two` when neither size
--   exceeds twice the other; otherwise the larger side is restored and the
--   append recurses into it.
Kit.append : (m -> m ->{g} m) -> Kit d m a -> Kit d m a ->{Distributed d, g} Kit d m a
Kit.append (<>) left right =
  (++) = Kit.append (<>)
  match (left, right) with
    (Empty, other) -> other
    (other, Empty) -> other
    (One x mx, rest) -> Kit.cons (<>) x mx rest
    (init, One y my) -> Kit.snoc (<>) init y my
    (Two nL mL lL rL, Two nR mR lR rR) ->
      balanced = (nL * 2 >= nR) && (nR * 2 >= nL)
      if balanced then
        -- Each side is saved with its own left child as the `saveNear` argument.
        savedL = saveNear lL left
        savedR = saveNear lR right
        Two (nL + nR) (mL <> mR) savedL savedR
      else if nL > nR then
        restore lL ++ (restore rL ++ right)
      else
        (left ++ restore lR) ++ restore rR
-- The kit with no elements.
Kit.empty : Kit d m a
Kit.empty = Empty
-- Looks up the element at zero-based position `i`, or `None` when `i` is out
-- of range. At a `Two` node the left subtree is always restored (to learn its
-- size); the right subtree is restored only when the index falls in it.
Kit.index : Nat -> Kit d m a ->{Distributed d} Optional a
Kit.index i kit = match kit with
  Empty -> None
  One a _
    | i == 0 -> Some a
    | otherwise -> None
  Two _ _ l r ->
    left = restore l
    leftSize = size left
    if i < leftSize then Kit.index i left
    else Kit.index (drop i leftSize) (restore r)
-- Prepends element `a0` (with its `m` value `m0`) to kit `k`.
--
-- A kit of one or two elements becomes the right child of a new `Two` whose
-- left child is the new element. A larger kit is restored, the element is
-- prepended to its left subtree, and the result is re-joined with `Kit.append`.
Kit.cons : (m -> m ->{g} m) -> a -> m -> Kit d m a ->{Distributed d, g} Kit d m a
Kit.cons (<>) a0 m0 k =
  -- Saves the new element as its own leaf, then saves `k` with that leaf as
  -- the `saveNear` argument. `total` is the size of the resulting node and
  -- `mRest` is the `m` value of `k`.
  pair total mRest =
    leaf = save (One a0 m0)
    Two total (m0 <> mRest) leaf (saveNear leaf k)
  match k with
    Empty -> One a0 m0
    One _ m1 -> pair 2 m1
    Two 2 m _ _ -> pair 3 m
    Two _ _ l r -> Kit.append (<>) (Kit.cons (<>) a0 m0 (restore l)) (restore r)
-- Appends element `aN` (with its `m` value `mN`) to the end of kit `k`.
--
-- A kit of one or two elements becomes the left child of a new `Two` whose
-- right child is the new element. A larger kit is restored, the element is
-- appended to its right subtree, and the result is re-joined with `Kit.append`.
Kit.snoc : (m -> m ->{g} m) -> Kit d m a -> a -> m ->{Distributed d, g} Kit d m a
Kit.snoc (<>) k aN mN =
  -- Saves the new element as its own leaf, then saves `k` with that leaf as
  -- the `saveNear` argument. `total` is the size of the resulting node and
  -- `mInit` is the `m` value of `k`.
  pair total mInit =
    leaf = save (One aN mN)
    Two total (mInit <> mN) (saveNear leaf k) leaf
  match k with
    Empty -> One aN mN
    One _ m0 -> pair 2 m0
    Two 2 m _ _ -> pair 3 m
    Two _ _ l r -> Kit.append (<>) (restore l) (Kit.snoc (<>) (restore r) aN mN)
-- Builds a kit from a list by appending the elements one at a time, left to
-- right. `f` computes each element's `m` value and `(<>)` combines them.
Kit.fromList : (a -> m) -> (m -> m -> m) -> [a] -> Kit d m a
Kit.fromList f (<>) items =
  loop built remaining = match remaining with
    [] -> built
    x +: rest -> loop (Kit.snoc (<>) built x (f x)) rest
  loop Empty items
-- Builds a kit from a list in chunks: elements are accumulated into a small
-- kit (`chunkacc`) and, once that chunk holds at least `chunkSize` elements,
-- it is appended to the overall result (`acc`) and a new chunk is started.
--
-- `f` computes each element's `m` value and `(<>)` combines them.
--
-- Fix: when the input is exhausted, the chunk still being filled is appended
-- to the result. Previously it was discarded, so the trailing elements were
-- lost and any list no longer than `chunkSize` produced `Empty`.
Kit.fromChunkedList : Nat -> (a -> m) -> (m -> m -> m) -> [a] -> Kit d m a
Kit.fromChunkedList chunkSize f (<>) =
  go acc chunkacc = cases
    -- Flush the pending chunk; `Kit.append` returns `acc` unchanged when the
    -- chunk is `Empty`.
    [] -> Kit.append (<>) acc chunkacc
    hd +: tl ->
      if Kit.size chunkacc >= chunkSize
      then go (Kit.append (<>) acc chunkacc) (One hd (f hd)) tl
      else go acc (Kit.snoc (<>) chunkacc hd (f hd)) tl
  go Empty Empty
-- Reduces a kit with `reducer`, forking one task per subtree.
--
-- `Empty` yields the reducer's zero and `One` injects its element. For `Two`,
-- each child is processed in a task forked at `near region (locOf child)`;
-- the two results are awaited (left first) and combined with the reducer's op.
Kit.mapReduceAt : (forall x . d x -> Location {})
  -> Location {Storage.Distributed d}
  -> Reducer {Remote t} a b
  -> Kit d m a ->{Storage.Distributed d, Remote t} b
Kit.mapReduceAt locOf region reducer kit = match kit with
  Empty -> Reducer.zero reducer
  One a _ -> Reducer.inject reducer a
  Two _ _ l r ->
    leftTask = forkAt (near region (locOf l)) '(Kit.mapReduceAt locOf region reducer (restore l))
    rightTask = forkAt (near region (locOf r)) '(Kit.mapReduceAt locOf region reducer (restore r))
    leftResult = task.await leftTask
    rightResult = task.await rightTask
    Reducer.op reducer leftResult rightResult
-- Delayed form of `Remote.retry!`: wraps the retrying computation in a thunk
-- instead of running it immediately.
Remote.retry : Nat -> '{Remote t} a -> '{Remote t} a
Remote.retry attempts action = '(Remote.retry! attempts action)
-- Runs `action` under `Remote.try`. On success its value is returned. On
-- failure the action is run again while the remaining-retries counter is
-- non-zero; once it reaches zero the last failure is re-raised with
-- `Remote.fail`.
Remote.retry! : Nat -> '{Remote t} a ->{Remote t} a
Remote.retry! remaining action = match Remote.try action with
  Right value -> value
  Left err ->
    if remaining == 0 then Remote.fail err
    else Remote.retry! (Nat.drop remaining 1) action
-- Runs `r` under a handler that passes every `Remote` request through
-- unchanged, except `tryDetachAt`, which is replaced by a retrying detach
-- (`Remote.retryDetachAt numAttempts`).
--
-- In every case the request is re-issued and bound to a local first, and only
-- then is the continuation resumed under `go`. The re-issued request is
-- therefore evaluated outside the `handle ... with go` expression.
-- NOTE(review): do not inline the request into the `handle` expression
-- without checking that it would not then be intercepted by `go` itself.
Remote.retrying! : Nat -> '{Remote t,g} a ->{Remote t,g} a
Remote.retrying! numAttempts r =
go = cases
-- Pure result: the handled computation finished.
{ a } -> a
{ task.empty! -> k } ->
r = task.empty!
handle k r with go
-- A failure is re-raised as-is; the continuation is not resumed.
{ Remote.fail f -> k } ->
Remote.fail f
{ Remote.task.tryComplete e t -> k } ->
r = task.tryComplete e t
handle k r with go
{ Remote.task.tryParent! -> k } ->
r = task.tryParent!
handle k r with go
{ Remote.task.tryJoin t -> k } ->
r = Remote.task.tryJoin t
handle k r with go
{ Remote.tryManyNear n region l -> k } ->
r = Remote.tryManyNear n region l
handle k r with go
{ Remote.tryManyFar n region l -> k } ->
r = Remote.tryManyFar n region l
handle k r with go
{ Remote.task.tryLocation t -> k } ->
r = Remote.task.tryLocation t
handle k r with go
{ Remote.task.tryAwait t -> k } ->
r = Remote.task.tryAwait t
handle k r with go
{ Remote.task.tryIsComplete t -> k } ->
r = Remote.task.tryIsComplete t
handle k r with go
-- The one rewritten request: detach via `retryDetachAt`, wrapped in
-- `Remote.try` so the continuation receives an `Either`.
{ Remote.tryDetachAt loc t -> k } ->
r = Remote.try '(Remote.retryDetachAt numAttempts loc t)
handle k r with go
-- Here `r` is the function's parameter; the `r` bindings above are local to
-- their own cases.
handle !r with go
-- Runs `r` under a handler that passes every `Remote` request through
-- unchanged, except `tryDetachAt`, which is replaced by a race between
-- several detached copies of the same computation.
--
-- In every pass-through case the request is re-issued and bound to a local
-- first, and only then is the continuation resumed under `go`. The re-issued
-- request is therefore evaluated outside the `handle ... with go` expression.
-- NOTE(review): do not inline the request into the `handle` expression
-- without checking that it would not then be intercepted by `go` itself.
Remote.racing! : Nat -> '{Remote t,g} a ->{Remote t,g} a
Remote.racing! numAttempts r =
go = cases
-- Pure result: the handled computation finished.
{ a } -> a
{ task.empty! -> k } ->
r = task.empty!
handle k r with go
-- A failure is re-raised as-is; the continuation is not resumed.
{ Remote.fail f -> k } ->
Remote.fail f
{ Remote.task.tryComplete e t -> k } ->
r = task.tryComplete e t
handle k r with go
{ Remote.task.tryParent! -> k } ->
r = task.tryParent!
handle k r with go
{ Remote.task.tryJoin t -> k } ->
r = Remote.task.tryJoin t
handle k r with go
{ Remote.tryManyNear n region l -> k } ->
r = Remote.tryManyNear n region l
handle k r with go
{ Remote.tryManyFar n region l -> k } ->
r = Remote.tryManyFar n region l
handle k r with go
{ Remote.task.tryLocation t -> k } ->
r = Remote.task.tryLocation t
handle k r with go
{ Remote.task.tryAwait t -> k } ->
r = Remote.task.tryAwait t
handle k r with go
{ Remote.task.tryIsComplete t -> k } ->
r = Remote.task.tryIsComplete t
handle k r with go
-- The one rewritten request. `List.replicate numAttempts` is applied to a
-- thunk that calls `Remote.detachAt loc t`.
-- NOTE(review): presumably this forces the thunk `numAttempts` times,
-- giving that many detached tasks — confirm against `List.replicate`.
{ Remote.tryDetachAt loc t -> k } ->
match Remote.try '(List.replicate numAttempts '(Remote.detachAt loc t)) with
-- Launching failed: hand the failure to the continuation.
Left e -> handle k (Left e) with go
-- Launching succeeded: pass the tasks to `Remote.raceAt loc` and resume the
-- continuation with its result.
Right ts ->
r = Remote.raceAt loc ts
handle k (Right r) with go
-- Here `r` is the function's parameter; the `r` bindings above are local to
-- their own cases.
handle !r with go
-- Detaches `t` at `loc` and awaits it, trying again on failure while the
-- remaining-retries counter is non-zero. The outcome is written into a fresh
-- task, which is returned: the value via `task.put` on success, or the last
-- failure via `task.cancel` once the counter reaches zero.
Remote.retryDetachAt : Nat -> Location g -> '{g, Exception, Remote t, Fork t g} u ->{Remote t} t u
Remote.retryDetachAt numAttempts loc t =
  result = task.empty!
  attempt remaining = match task.tryAwait (Remote.detachAt loc t) with
    Right value -> task.put value result
    Left err ->
      if remaining == 0 then task.cancel err result
      else attempt (Nat.drop remaining 1)
  attempt numAttempts
  result
-- Forks `t` at `loc` and awaits it, trying again on failure while the
-- remaining-retries counter is non-zero. The outcome is written into a fresh
-- task, which is returned: the value via `task.put` on success, or the last
-- failure via `task.cancel` once the counter reaches zero.
Remote.retryForkAt : Nat -> Location g -> '{g, Exception, Remote t, Fork t g} u ->{Remote t} t u
Remote.retryForkAt numAttempts loc t =
  result = task.empty!
  attempt remaining = match task.tryAwait (Remote.forkAt loc t) with
    Right value -> task.put value result
    Left err ->
      if remaining == 0 then task.cancel err result
      else attempt (Nat.drop remaining 1)
  attempt numAttempts
  result
-- Forks `t` at `loc` via `Remote.forkLinkAt parent` and awaits it, trying
-- again on failure while the remaining-retries counter is non-zero. The
-- outcome is written into a fresh task, which is returned: the value via
-- `task.put` on success, or the last failure via `task.cancel` once the
-- counter reaches zero.
Remote.retryForkLinkAt : Nat -> t x -> Location g -> '{g, Exception, Remote t, Fork t g} u ->{Remote t} t u
Remote.retryForkLinkAt numAttempts parent loc t =
  result = task.empty!
  attempt remaining = match task.tryAwait (Remote.forkLinkAt parent loc t) with
    Right value -> task.put value result
    Left err ->
      if remaining == 0 then task.cancel err result
      else attempt (Nat.drop remaining 1)
  attempt numAttempts
  result
-- Asks `manyNear` for one location given `region` and `loc`. If exactly one
-- location comes back it is returned; otherwise `region` itself is returned.
Remote.near : Location g -> Location x ->{Remote t} Location g
Remote.near region loc =
  candidates = manyNear 1 region loc
  match candidates with
    [only] -> only
    _ -> region
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment