Skip to content

Instantly share code, notes, and snippets.

@vasily-kirichenko
Created May 9, 2016 08:46
/// Wraps `f` so that at most `degree` invocations of it are in flight at once.
///
/// Requests are funnelled through a `MailboxProcessor`; the agent waits on a
/// `SemaphoreSlim(degree)` before starting each invocation and the slot is released
/// when that invocation finishes, whether it succeeded or failed.
///
/// degree: maximum number of concurrent invocations of `f`; must be at least 1.
/// f:      the asynchronous function to throttle.
/// Returns a function with the same shape as `f`. An exception raised by `f` is
/// delivered to the caller that submitted the corresponding input.
/// Raises `ArgumentException` immediately when `degree` is less than 1.
let par (degree: int) (f: 'a -> Async<'b>) : 'a -> Async<'b> =
    // A zero count would block every request forever, and a negative count makes the
    // SemaphoreSlim constructor throw inside the agent where no caller can observe it.
    if degree < 1 then
        invalidArg "degree" "degree must be at least 1"
    let agent = MailboxProcessor.Start <| fun mailbox ->
        async {
            use semaphore = new SemaphoreSlim(degree)
            let rec loop () =
                async {
                    let! (x: 'a, rep: AsyncReplyChannel<Choice<'b, exn>>) = mailbox.Receive()
                    // Hold back the next request until one of the `degree` slots is free.
                    do! semaphore.WaitAsync() |> Async.AwaitTask
                    async {
                        try
                            // Always reply, even on failure: a request that never gets a
                            // reply would leave its caller waiting forever. Wrapping the
                            // call in `async { }` also captures exceptions thrown while
                            // `f x` is still constructing its Async value.
                            let! outcome = Async.Catch (async { return! f x })
                            rep.Reply outcome
                        finally
                            semaphore.Release() |> ignore
                    } |> Async.Start
                    return! loop ()
                }
            return! loop ()
        }
    fun x ->
        async {
            let! outcome = agent.PostAndAsyncReply(fun repCh -> x, repCh)
            match outcome with
            | Choice1Of2 value -> return value
            | Choice2Of2 error ->
                // Rethrow on the caller's side while keeping the worker's stack trace.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(error).Throw()
                // Unreachable: Throw() never returns; this only satisfies the type checker.
                return Unchecked.defaultof<'b>
        }
/// Counts the names found in (at most) the first 1,000,000 lines of `file`.
///
/// Lines are read lazily into an `AsyncSeq`; each line is matched against `regex`
/// (defined elsewhere in this file), the "name" group of every successful match is
/// passed through `normalizeName` (also defined elsewhere), and the resulting items
/// are counted. Both per-item stages go through `par 4`.
///
/// file: path of the text file to scan.
/// Returns the number of lines that matched, as produced by `AsyncSeq.length`.
/// Blocks the calling thread until the whole pipeline has completed; the reader is
/// disposed when the function returns.
let searchWithAsyncSeq (file: string) =
    use reader = new StreamReader(file)
    AsyncSeq.unfoldAsync (fun _ ->
        async {
            let! line = reader.ReadLineAsync() |> Async.AwaitTask
            // ReadLineAsync yields null at end of stream. End the sequence there so
            // that files shorter than the line cap do not feed null into regex.Match.
            match line with
            | null -> return None
            | text -> return Some (text, ())
        }) ()
    |> AsyncSeq.take 1000000
    |> AsyncSeq.chooseAsync (par 4 (fun line ->
        async {
            match regex.Match line with
            | m when m.Success -> return Some m.Groups.["name"].Value
            | _ -> return None
        }))
    |> AsyncSeq.mapAsync (par 4 (fun (name: string) -> async { return normalizeName name }))
    |> AsyncSeq.length
    |> Async.RunSynchronously
// Real: 00:00:29.538, CPU: 00:01:35.843, GC gen0: 5265, gen1: 7, gen2: 0
// val it : int64 = 24540L
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment