Last active
April 24, 2019 15:24
-
-
Save swlaschin/4ffac1a64bbfeab383cc3f0e7b09f169 to your computer and use it in GitHub Desktop.
Async example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AsyncExample | |
// requires Result.fs from https://github.com/swlaschin/DomainModelingMadeFunctional/blob/master/src/OrderTaking/Result.fs | |
open System | |
open System.Net | |
module SimpleAsyncExample = | |
/// Builds (without starting) a workflow that prints "Started <name>",
/// waits one second, then prints "Ended <name>".
let makeAsync name =
    async {
        printfn "Started %s" name
        do! Async.Sleep 1000
        printfn "Ended %s" name
    }
// Two independent workflows; creating them does not start them.
let asyncA = makeAsync "A"
let asyncB = makeAsync "B"

// Nothing is printed until a workflow is actually run.
// First run each one to completion, blocking the calling thread ...
Async.RunSynchronously asyncA
Async.RunSynchronously asyncB

// ... then start each one again, beginning on the current thread,
// without waiting for it to finish.
Async.StartImmediate asyncA
Async.StartImmediate asyncB
module ComposableAsyncExample = | |
// you can build up a tree of asyncs, none of | |
// which run until you explicitly run the last one | |
/// Simulated asynchronous file read: prints a start message, waits one
/// second, prints an end message and yields "file content for <fileName>".
let readFile fileName =
    async {
        printfn "Started reading file %s" fileName
        do! Async.Sleep 1000
        printfn "Ended reading %s" fileName
        return sprintf "file content for %s" fileName
    }
/// Pure "processing" step of the pipeline: returns `data` upper-cased.
let processData (data:string) =
    // ToUpperInvariant rather than ToUpper: ToUpper follows the current
    // thread culture, so e.g. under tr-TR "file" would become "FİLE" and the
    // demo output would differ from machine to machine.
    data.ToUpperInvariant()
/// Simulated asynchronous database write: prints a start message, waits
/// one second, then prints an end message. Yields unit.
let writeDb data =
    async {
        printfn "Started writing data %s" data
        do! Async.Sleep 1000
        printfn "Ended writing data %s" data
    }
/// Read -> process -> write for the fixed file "xyz.txt", composed into a
/// single workflow.
let pipeline =
    async {
        let! contents = readFile "xyz.txt"
        do! contents |> processData |> writeDb
    }

// Defining the pipeline ran nothing; this call executes all of its steps.
Async.RunSynchronously pipeline
//============================ | |
// do in parallel | |
//============================ | |
/// The ten input names "file1" .. "file10" used by the parallel examples.
let filenames = [ for i in 1 .. 10 -> sprintf "file%i" i ]
/// Read -> process -> write for one file, as a single workflow.
let singlePipeline fileName =
    async {
        let! contents = readFile fileName
        do! contents |> processData |> writeDb
    }
/// One `singlePipeline` per entry of `filenames`, combined with
/// Async.Parallel into a single workflow that yields an array of results.
let tasksToRun =
    List.map singlePipeline filenames
    |> Async.Parallel

// Run the combined workflow and wait for it to finish.
Async.RunSynchronously tasksToRun
//let dotNetTask = tasksToRun |> Async.StartAsTask
//============================ | |
// do in parallel inside the pipeline | |
//============================ | |
/// Three-stage pipeline in which the read stage and the write stage are
/// each fanned out with Async.Parallel inside a single workflow.
let multiPipeline =
    async {
        // Stage 1: read every file; continue once all reads have finished.
        let! contents =
            filenames |> List.map readFile |> Async.Parallel

        // Stage 2: pure transformation of each file's content.
        let processed =
            contents |> Array.map processData |> List.ofArray

        // Stage 3: write every processed item; the unit results are not needed.
        let! _ =
            processed |> List.map writeDb |> Async.Parallel

        return ()
    }

Async.RunSynchronously multiPipeline
module DownloadExample = | |
// a list of sites to fetch | |
/// URLs fetched by the download examples.
let sites =
    [ "http://www.bing.com"
      "http://www.google.com"
      "http://www.microsoft.com"
      "http://www.amazon.com"
      "http://www.yahoo.com"
      "http://www.bbc.co.uk"
      "http://www.lexisnexis.co.uk"
      "http://www.fsharpworks.com" ]
/// Downloads `url` with blocking calls and prints a message once the whole
/// response body has been read. The body itself is discarded.
let fetchUrl url =
    let req = WebRequest.Create(Uri(url))
    use resp = req.GetResponse()
    use stream = resp.GetResponseStream()
    use reader = new IO.StreamReader(stream)
    // Read to the end so the download really completes; the text is not used.
    reader.ReadToEnd() |> ignore
    printfn "finished downloading %s" url
(* | |
#time // turn interactive timer on | |
sites // start with the list of sites | |
|> List.map fetchUrl // loop through each site and download | |
#time | |
*) | |
/// Asynchronous counterpart of `fetchUrl`: returns a workflow that, when
/// run, downloads `url` and prints a message once the whole response body
/// has been read. The body itself is discarded.
let fetchUrlAsync url =
    async {
        let req = WebRequest.Create(Uri(url))
        // "use!" awaits the response and disposes it when it goes out of scope.
        use! resp = req.GetResponseAsync() |> Async.AwaitTask
        use stream = resp.GetResponseStream()
        use reader = new IO.StreamReader(stream)
        // Await the full body so the download completes; the text is not used.
        let! _ = reader.ReadToEndAsync() |> Async.AwaitTask
        printfn "finished downloading %s" url
    }
(* | |
#time // turn interactive timer on | |
sites | |
|> List.map fetchUrlAsync // make a list of async tasks | |
|> Async.Parallel // set up the tasks to run in parallel | |
|> Async.RunSynchronously // start them off | |
#time | |
*) | |
// Unguarded run: a failure inside the workflow surfaces here as an exception.
// NOTE(review): presumably this host does not resolve, so this line is
// expected to throw; when the file is executed top to bottom (rather than
// line by line in FSI) that would stop everything below from running.
Async.RunSynchronously (fetchUrlAsync "http://dfadfadfafd.com")
/// The same download wrapped in Async.Catch, so the workflow yields a
/// Choice (Choice1Of2 result / Choice2Of2 exception) instead of throwing.
let errorExample =
    async {
        return! fetchUrlAsync "http://dfadfadfafd.com" |> Async.Catch
    }
// Run the guarded workflow and report the outcome.
match Async.RunSynchronously errorExample with
| Choice1Of2 _ ->
    printfn "Success"
| Choice2Of2 (:? System.AggregateException as aggr) ->
    // An AggregateException wraps one or more inner exceptions; print each.
    aggr.InnerExceptions
    |> Seq.iter (fun inner -> printfn "%s" inner.Message)
| Choice2Of2 other ->
    printfn "%s" other.Message
module AsyncResultExample = | |
// AsyncResult is the same as Async<Result<...>>
// You have to "lift" everything up | |
// Example | |
// 1. Download a URL into a JSON file | |
// 2. Decode the JSON into a Customer DTO | |
// 3. Convert the DTO into a valid Customer | |
// 4. Store the Customer in a database | |
// for video of this, see https://www.youtube.com/watch?v=Nrp_LZ-XGsY&t=3585 | |
/// Customer as produced by `decodeJson`, before conversion to the domain type.
type CustomerDto = { Name : string }
/// Domain customer, produced from a `CustomerDto` by `dtoToDomain`.
type Customer = { Name : string }
/// Everything that can go wrong in the pipeline, one case per step.
/// Each case carries the offending input as a string.
type PipelineError =
    /// Raised by the download step.
    | BadUri of string
    /// Raised by the JSON decoding step.
    | BadJson of string
    /// Raised by the DTO-to-domain conversion step.
    | BadDto of string
    /// Raised by the database write step.
    | BadCustomerId of string
/// Simulated download step. Returns a workflow that prints a start message,
/// waits one second, and then yields
///   Error (BadUri uri)                 when `uri` starts with "baduri",
///   Ok "json content for <uri>"        otherwise.
let download uri = async {
    printfn "Started reading URI:%s" uri
    do! Async.Sleep(1000)
    let result =
        // Ordinal comparison: the single-argument StartsWith is
        // culture-sensitive, whereas the Contains checks used by the other
        // steps are ordinal. Matching the marker ordinally keeps the steps
        // consistent and independent of the current culture.
        if uri.StartsWith("baduri", System.StringComparison.Ordinal) then
            Error (BadUri uri)
        else
            let json = sprintf "json content for %s" uri
            printfn "...Success: reading URI:%s" uri
            Ok json
    return result
    }
/// Simulated JSON decoding step (synchronous).
/// Yields Error (BadJson json) when `json` contains "badjson"; otherwise
/// Ok with a CustomerDto whose Name is the whole `json` string.
let decodeJson (json:string) =
    printfn "Started decoding json:%s" json
    match json.Contains("badjson") with
    | true ->
        Error (BadJson json)
    | false ->
        printfn "...Success: decoding json:%s" json
        // The annotation is needed because Customer declares the same field.
        Ok ({ Name = json } : CustomerDto)
/// Simulated DTO-to-domain conversion step (synchronous).
/// Yields Error (BadDto name) when the DTO's Name contains "baddto";
/// otherwise Ok with a Customer carrying the same Name.
let dtoToDomain (dto:CustomerDto) =
    let name = dto.Name
    printfn "Converting from DTO:%s" name
    match name.Contains("baddto") with
    | true ->
        Error (BadDto name)
    | false ->
        printfn "...Success: Converting from DTO:%s" name
        Ok ({ Name = name } : Customer)
/// Simulated database write step. Returns a workflow that prints a start
/// message, waits one second, and then yields
///   Error (BadCustomerId name)   when the customer's Name contains "badcustid",
///   Ok ()                        otherwise.
let storeInDb (customer:Customer) =
    async {
        let name = customer.Name
        printfn "Started writing customer: %s" name
        do! Async.Sleep 1000
        if name.Contains("badcustid") then
            return Error (BadCustomerId name)
        else
            printfn "...Success: Writing customer: %s" name
            return Ok ()
    }
/// Full download -> decode -> convert -> store pipeline written with the
/// `asyncResult` computation expression (from the Result.fs referenced at the
/// top of the file). The two synchronous Result-returning steps are lifted
/// with AsyncResult.ofResult so every `let!` binds the same kind of value.
let pipeline uri =
    asyncResult {
        let! json = download uri
        let! dto = json |> decodeJson |> AsyncResult.ofResult
        let! customer = dto |> dtoToDomain |> AsyncResult.ofResult
        let! stored = storeInDb customer
        printfn "Pipeline succeeded"
        return stored
    }
// Nothing happens until the pipeline is run.
// One run per scenario: each "bad..." input trips the matching step's check.
Async.RunSynchronously (pipeline "good")
Async.RunSynchronously (pipeline "baduri")
Async.RunSynchronously (pipeline "badjson")
Async.RunSynchronously (pipeline "baddto")
Async.RunSynchronously (pipeline "badcustid")
/// The same pipeline as `pipeline`, built from map/bind combinators instead
/// of the computation expression. Async.map and the AsyncResult functions
/// come from the Result.fs referenced at the top of the file.
let pipelineUsingBind uri =
    // Result world: chain the two synchronous steps.
    let decodeCustomer jsonOrError =
        jsonOrError
        |> Result.bind decodeJson
        |> Result.bind dtoToDomain

    download uri                     // starts in AsyncResult world
    |> Async.map decodeCustomer      // lift the Result-world function over Async
    |> AsyncResult.bind storeInDb
    |> AsyncResult.map (fun _ -> printfn "Pipeline succeeded")

Async.RunSynchronously (pipelineUsingBind "good")
Async.RunSynchronously (pipelineUsingBind "baddto")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment