Skip to content

Instantly share code, notes, and snippets.

@devboy
Created November 24, 2014 17:07
Show Gist options
  • Save devboy/96f5d029a15d8db38faa to your computer and use it in GitHub Desktop.
Save devboy/96f5d029a15d8db38faa to your computer and use it in GitHub Desktop.
Some Throttle Agent in F#
namespace FSharp.Async
open System
open System.Collections.Generic
open System.Collections.Concurrent
/// Short alias for `MailboxProcessor<'T>`; the agents in this file are started through it.
type Agent<'T> = MailboxProcessor<'T>
/// Identity handle used as the key for values travelling through the agents below.
/// Because of `ReferenceEquality`, two tokens are equal only when they are the
/// same instance, so every `Token()` call yields a distinct key.
[<ReferenceEquality>]
type Token =
    | Token of unit

    interface IComparable with
        /// Returns 0 when `other` is this very instance and -1 for anything else.
        /// Note that this is not a total order: for two distinct tokens both
        /// `a.CompareTo(b)` and `b.CompareTo(a)` return -1.
        member this.CompareTo(other) =
            match Object.ReferenceEquals(this, other) with
            | true -> 0
            | false -> -1
/// Messages understood by the agents in this file. Every case carries the
/// `Token` it refers to plus a reply channel that is answered once the agent
/// has acted on the message.
type private ThrottleAgentMessage<'T> =
    /// Hand a value to the agent under the given token; answered with unit.
    | Push of Token * 'T * AsyncReplyChannel<unit>
    /// Ask for the value stored under the given token; answered with that value.
    | Pull of Token * AsyncReplyChannel<'T>
    /// Release the given token; answered with unit.
    | Free of Token * AsyncReplyChannel<unit>

    /// The token carried by this message, whichever case it is.
    member message.Token =
        match message with
        | Push(token, _, _) -> token
        | Pull(token, _) -> token
        | Free(token, _) -> token
/// Mailbox-backed store that admits at most `capacity` values at a time.
///
/// A value is keyed by a `Token` and goes through three steps:
///   * Push - the value is admitted into `locked` (only while `locked` holds
///            fewer than `capacity` entries);
///   * Free - the value is moved from `locked` to `unlocked`, releasing its slot;
///   * Pull - the value is removed from `unlocked` and sent back to the caller.
///
/// The agent consumes its mailbox with `Scan`, so a request whose guard does not
/// hold yet is left in the queue and is picked up by a later scan once the
/// state allows it. Each member returns an async that completes when the agent
/// has replied to the corresponding message.
type ThrottleAgent<'T> (capacity:int) =
    // Values currently occupying one of the `capacity` slots.
    let locked = new Dictionary<Token,'T>(capacity)
    // Values whose slot has been released and that are waiting to be pulled.
    let unlocked = new Dictionary<Token,'T>()
    let agent = Agent.Start(fun agent -> async {
        // Wraps a synchronous state change in the `Async<unit> option` shape
        // that `Scan` expects from its scanner function.
        let agentAction (f:unit->unit) = Some(async { f() })
        let hasCapacity () = locked.Count < capacity
        let isLocked = locked.ContainsKey
        let isUnlocked = unlocked.ContainsKey
        let tryEnqueue msg =
            match msg with
            | Free(t, ch) when isLocked t && not (isUnlocked t) ->
                agentAction (fun () ->
                    let v = locked.[t]
                    locked.Remove(t) |> ignore
                    unlocked.Add(t, v)
                    ch.Reply())
            | Pull(t, ch) when not (isLocked t) && isUnlocked t ->
                agentAction (fun () ->
                    let v = unlocked.[t]
                    unlocked.Remove(t) |> ignore
                    ch.Reply(v))
            // A token that is still parked in `unlocked` must not be admitted
            // again: it would then sit in both dictionaries, and neither the
            // Free guard (needs "not unlocked") nor the Pull guard (needs
            // "not locked") could ever be satisfied for it, losing the slot
            // for good. Such a Push now simply waits for the pending Pull.
            | Push(t, v, ch) when not (isLocked t) && not (isUnlocked t) && hasCapacity() ->
                agentAction (fun () ->
                    locked.Add(t, v)
                    ch.Reply())
            | _ -> None
        while true do
            do! agent.Scan tryEnqueue })

    /// Admits `v` under token `t`; completes once a slot was available and the
    /// value has been stored.
    member x.PushAsync (v:'T) (t:Token) =
        agent.PostAndAsyncReply(fun ch -> Push(t, v, ch))

    /// Removes and returns the value stored under `t`; completes only after the
    /// token has been freed.
    member x.PullAsync (t:Token) =
        agent.PostAndAsyncReply(fun ch -> Pull(t, ch))

    /// Releases the slot held by `t`, keeping its value available for `PullAsync`.
    member x.FreeAsync (t:Token) =
        agent.PostAndAsyncReply(fun ch -> Free(t, ch))
/// Runs async computations while keeping at most `capacity` of them admitted
/// at any one time, and stores each result under the caller's `Token` until it
/// is collected.
///
/// Admission is delegated to an inner `ThrottleAgent`; finished results are
/// kept in `results` by a second mailbox agent that answers `Pull` requests
/// once the matching result has arrived.
type AsyncThrottleAgent<'T> (capacity:int) =
    // Finished results waiting to be collected, keyed by token.
    let results = new ConcurrentDictionary<Token,'T>()
    // Gate that limits how many computations are admitted simultaneously.
    let throttle = new ThrottleAgent<Async<'T>>(capacity)
    let agent = Agent.Start(fun agent -> async {
        // Wraps a synchronous state change in the `Async<unit> option` shape
        // that `Scan` expects from its scanner function.
        let agentAction (f:unit->unit) = Some(async { f() })
        let tryEnqueue msg =
            match msg with
            // Store a result only when the token has no uncollected result yet.
            | Push(t, v, ch) when not (results.ContainsKey t) ->
                agentAction (fun () ->
                    results.TryAdd(t, v) |> ignore
                    ch.Reply())
            // Hand a result out only once it is there; otherwise the request
            // stays queued until a later scan.
            | Pull(t, ch) when results.ContainsKey t ->
                agentAction (fun () ->
                    let v = results.[t]
                    results.TryRemove t |> ignore
                    ch.Reply(v))
            | Free(_, ch) -> agentAction (fun () -> ch.Reply())
            | _ -> None
        while true do
            do! agent.Scan tryEnqueue })

    /// Waits for a free slot, runs `v`, releases the slot and stores the result
    /// under `t`. If `v` raises, the slot is still released and the original
    /// exception is rethrown to the caller (no result is stored in that case).
    member x.PushAsync (v:Async<'T>) (t:Token) = async {
        do! throttle.PushAsync v t
        // Capture a failure instead of letting it escape here: previously an
        // exception skipped FreeAsync, leaving the slot occupied forever, so
        // `capacity` failed jobs were enough to block every later push.
        // NOTE(review): cancellation of `v` is presumably not reported through
        // Async.Catch, so a cancelled job may still keep its slot - confirm if
        // cancellation needs to be supported.
        let! outcome = Async.Catch v
        do! throttle.FreeAsync t
        match outcome with
        | Choice1Of2 d ->
            do! agent.PostAndAsyncReply(fun ch -> Push(t, d, ch))
            // Drop the throttle's hand-off entry for this token. The pull is
            // awaited rather than discarded so the clean-up does not depend on
            // when PostAndAsyncReply actually posts its message.
            let! _ = throttle.PullAsync t
            return ()
        | Choice2Of2 ex ->
            let! _ = throttle.PullAsync t
            // Rethrow while keeping the stack trace of the original failure.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw()
    }

    /// Removes and returns the result stored under `t`; completes only after a
    /// result for that token has been stored.
    member x.PullAsync (t:Token) =
        agent.PostAndAsyncReply(fun ch -> Pull(t, ch))

    /// Runs `v` through the throttle and returns its result.
    member x.PerformAsync (v:Async<'T>) (t:Token) = async {
        do! x.PushAsync v t
        return! x.PullAsync t
    }
/// Usage example: sends 101 HTTP requests through a throttle created with a
/// capacity of 15 and prints the collected responses. A request that raises
/// contributes its exception message instead of a response body.
module Examples =
    let throttle = AsyncThrottleAgent<string>(15)

    [ for _i in 0 .. 100 do
        yield async {
            try
                let request = FSharp.Data.Http.AsyncRequestString("http://www.github.com", silentHttpErrors=true)
                // A fresh token per request keeps the results apart.
                return! throttle.PerformAsync request (Token())
            with ex ->
                return ex.Message } ]
    |> Async.Parallel
    |> Async.RunSynchronously
    |> printfn "results: %A"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment