Skip to content

Instantly share code, notes, and snippets.

@7sharp9
Created May 17, 2011 00:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save 7sharp9/975676 to your computer and use it in GitHub Desktop.
Save 7sharp9/975676 to your computer and use it in GitHub Desktop.
desugared async
namespace Pipelets
open System
open System.Reflection
open System.Collections.Concurrent
open FSharp.Control
/// A pipeline stage. Payloads inserted through IPipeletInput<'a> are placed in a
/// bounded BlockingQueueAgent; a consumer loop takes them out, applies
/// `processor`, and hands every resulting value to the stages that `router`
/// selects from the currently attached routes.
///
/// processor    - applied to each item taken from the buffer; its result is
///                iterated as a sequence of 'b values.
/// router       - receives the attached routes together with one result and
///                returns the routes that result should be inserted into.
/// capacity     - passed to the BlockingQueueAgent constructor.
/// overflow     - optional callback invoked with the payload when AsyncTryAdd
///                yields None.
/// blockingTime - optional value passed to AsyncTryAdd / AsyncTryGet; defaults
///                to 250 (presumably milliseconds - TODO confirm against
///                BlockingQueueAgent).
type pipelet<'a,'b>(processor, router: seq<IPipeletInput<'b>> * 'b -> seq<IPipeletInput<'b>>, capacity, ?overflow, ?blockingTime) =
    let buffer = BlockingQueueAgent<_> capacity
    let routes = ref List.empty<IPipeletInput<'b>>
    // True while a consumer loop has been started and has not yet stopped.
    // Every read and write happens under consumerlock.
    let queuedOrRunning = ref false
    let blocktime = defaultArg blockingTime 250
    let consumerlock = new Object()

    // Desugared form of:
    //   async {
    //       let! taken = buffer.AsyncTryGet(blocktime)
    //       return taken |> Option.map processor
    //   }
    let getandprocess =
        async.Delay(fun () ->
            async.Bind(buffer.AsyncTryGet(blocktime), fun taken ->
                async.Return(taken |> Option.map processor)))

    // Keeps taking and routing items until AsyncTryGet yields None, then clears
    // queuedOrRunning and stops.
    // NOTE(review): a payload added after the final AsyncTryGet returned None but
    // before the flag is cleared looks like it stays buffered until the next
    // Insert restarts the loop - confirm whether that is acceptable.
    let consumerloop =
        let rec loop =
            async.Delay(fun () ->
                async.Bind(getandprocess, fun processed ->
                    match processed with
                    | Some results ->
                        results |> Seq.iter (fun z ->
                            match !routes with
                            | [] -> ()
                            | _ -> router(!routes, z) |> Seq.iter (fun r -> r.Insert z))
                        // ReturnFrom is the desugaring of `return! loop`; unlike
                        // Bind(loop, fun () -> Zero()) it does not add a new
                        // continuation for every processed item.
                        async.ReturnFrom(loop)
                    | None ->
                        lock consumerlock (fun () -> queuedOrRunning := false)
                        async.Zero()))
        loop

    /// Property whose getter empties the list of attached routes each time it
    /// is evaluated.
    member this.ClearRoutes = routes := []

    // Sugared (computation-expression) form this Insert was derived from. The
    // try/with is left out of the desugared version below for simplicity.
    //
    // interface IPipeletInput<'a> with
    //     member this.Insert payload =
    //         Async.Start(async {
    //             try
    //                 let! result = buffer.AsyncTryAdd(payload, blocktime)
    //                 if result.IsSome then
    //                     //begin consumer loop
    //                     if not !queuedOrRunning then
    //                         lock consumerlock (fun() ->
    //                             Async.Start(consumerloop)
    //                             queuedOrRunning := true)
    //                 else if overflow.IsSome then
    //                     payload |> overflow.Value
    //             with
    //             | _ as exc ->
    //                 if overflow.IsSome then
    //                     payload |> overflow.Value
    //         })
    interface IPipeletInput<'a> with
        /// Starts an async workflow that tries to add `payload` to the buffer.
        /// When the add succeeds the consumer loop is started unless one is
        /// already marked as running; when it yields None the payload is passed
        /// to `overflow`, if one was supplied.
        member this.Insert payload =
            Async.Start(
                async.Delay(fun () ->
                    async.Bind(buffer.AsyncTryAdd(payload, blocktime), fun added ->
                        match added with
                        | Some _ ->
                            // The flag is tested inside the lock so that two
                            // concurrent inserts cannot both observe `false` and
                            // start two consumer loops.
                            lock consumerlock (fun () ->
                                if not !queuedOrRunning then
                                    Async.Start(consumerloop)
                                    queuedOrRunning := true)
                        | None ->
                            match overflow with
                            | Some handler -> payload |> handler
                            | None -> ()
                        async.Zero())))

    interface IPipeletConnect<'b> with
        /// Prepends `stage` to the attached routes.
        member this.Attach (stage) =
            let current = !routes
            routes := stage :: current
        /// Removes every route equal to `stage` from the attached routes.
        member this.Detach (stage) =
            let current = !routes
            routes := List.filter (fun el -> el <> stage) current

    /// Attaches `b` to `a` and returns `b`, so attachments can be chained.
    static member Attach (a: IPipeletConnect<_>) (b) =
        a.Attach b
        b

    /// Detaches `b` from `a` and returns `a`.
    static member Detach (a: IPipeletConnect<_>) (b) =
        a.Detach b
        a

    /// Connect operator: attaches `b` to `a` and returns `b`.
    static member (++>) (a: IPipeletConnect<_>, b) =
        a.Attach (b)
        b

    /// Detach operator: detaches `b` from `a` and returns `a`.
    static member (-->) (a: IPipeletConnect<_>, b) =
        a.Detach b
        a

    /// Insert-into-left operator: inserts `b` into `a`.
    static member (<<--) (a:IPipeletInput<_>, b:'b) = a.Insert b

    /// Insert-into-right operator: inserts `b` into `a`.
    static member (-->>) (b,a:IPipeletInput<_>) = a.Insert b
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment