Created
February 28, 2017 10:32
-
-
Save cboudereau/99a0ed32a7ca7ed2ae7773eac2beaf91 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Temporarily extends the process-wide PATH so that native libraries sitting
// next to this script can be located, and restores it afterwards.
module PATH =
  open System

  // Separator between PATH entries on the current platform
  // (':' on Unix-like systems, ';' on Windows).
  let private divider = string System.IO.Path.PathSeparator

  // Folder containing this script; appended to PATH by `hijack`.
  let private zmqFolder = __SOURCE_DIRECTORY__

  // PATH value captured by `hijack`; None while PATH is not hijacked.
  // An option is used (instead of "" as a sentinel) so that an originally
  // empty or unset PATH can still be restored by `release`.
  let private oldPATH : string option ref = ref None

  // temporarily add location of native libs to global environment
  // Calling it again before `release` is a no-op, so the value saved by the
  // first call is never overwritten with an already-modified PATH.
  let hijack () =
    match oldPATH.Value with
    | Some _ -> ()
    | None ->
      let current = Environment.GetEnvironmentVariable "PATH"
      oldPATH.Value <- Some current
      let newPATH =
        // avoid a leading separator (an empty PATH entry) when PATH is unset
        if String.IsNullOrEmpty current then zmqFolder
        else sprintf "%s%s%s" current divider zmqFolder
      Environment.SetEnvironmentVariable ("PATH", newPATH)

  // undo changes to global environment
  // Does nothing when `hijack` has not been called since the last release.
  let release () =
    match oldPATH.Value with
    | None -> ()
    | Some saved ->
      // a null `saved` removes the variable again, matching the unset state
      Environment.SetEnvironmentVariable ("PATH", saved)
      oldPATH.Value <- None
PATH.hijack () | |
#r @"..\packages\fszmq\lib\net40\fszmq.dll" | |
open fszmq | |
printfn "libzmq version: %A" ZMQ.version | |
open System | |
open System.IO | |
// Path of the generated test file that the client streams to the server.
// Verbatim string: the backslash no longer depends on "\i" not being an
// escape sequence, and the form matches the other Windows paths in this script.
let [<Literal>] InputFile = @"d:\input.log"
(*
The file is a sequence of records, each made of:
  Payload length (Int32)
  Date (UTC timestamp in round-trip "O" format, 28 characters, stored as a length-prefixed string)
  Payload
Record order is not important, so messages can be sent and received in any order.
*)
/// Writes randomly sized test records to `output` until at least `totalSize`
/// payload bytes have been produced (headers are not counted).
///
/// Each record is the payload length (Int32), the current UTC time in
/// round-trip ("O") format as a length-prefixed string, and the payload,
/// which is the bytes of "Hello" repeated. The payload size is a random value
/// drawn from `Random.Next(minPayload, maxPayload)`, rounded down to a whole
/// number of repetitions.
///
/// Raises ArgumentException when the payload range can never hold a single
/// repetition, because the size target could then never be reached.
let buildFile totalSize minPayload maxPayload output =
  let content = "Hello"B

  // Random.Next excludes its upper bound unless both bounds are equal.
  let largestPayload = max minPayload (maxPayload - 1)
  if totalSize > 0 && largestPayload < content.Length then
    invalidArg "maxPayload" "the payload range is too small to ever hold one copy of the content"

  let nextPayLoad =
    let r = new Random()
    fun () -> r.Next(minPayload, maxPayload)

  // File.Create truncates an existing file; File.OpenWrite would leave stale
  // bytes after the last record when the previous file was longer.
  use bw = new BinaryWriter(File.Create(output))

  let rec write currentSize =
    let times = nextPayLoad () / content.Length
    let messageSize = times * content.Length
    printfn "size : %i" messageSize
    // header: payload length, then timestamp
    bw.Write(messageSize)
    bw.Write(DateTime.UtcNow.ToString("O"))
    // payload; no per-record Flush, the writer is flushed when disposed
    for _ in 1..times do
      bw.Write(content, 0, content.Length)
    let currentSize = currentSize + messageSize
    printfn "%i / %i" currentSize totalSize
    if currentSize < totalSize then
      write currentSize

  write 0
// Generate the input file: 530 MiB of payload, each payload drawn from the range 700 .. 2000000.
InputFile |> buildFile (530 * 1024 * 1024) 700 2000000
/// Binds a REP socket on tcp://*:5555, receives one (possibly multipart)
/// message and writes every frame, in arrival order, to `filePath`.
/// Prints the frame index as each frame arrives and the elapsed time at the end.
let serverRaw filePath =
  // File.Create truncates an existing file; File.OpenWrite would keep stale
  // trailing bytes when the previous file was longer than this transfer.
  use fs = File.Create(filePath)
  use context = new Context ()
  use responder = Context.rep context
  Socket.bind responder "tcp://*:5555"
  // a single Message instance is reused for every frame
  use msg = new Message()
  let sw = System.Diagnostics.Stopwatch.StartNew()

  // receive frames until the one that is not flagged as "more"
  let rec recvAll i =
    Message.recv msg responder
    let data = Message.data msg
    printfn "recv %i" i
    fs.Write(data, 0, data.Length)
    if Message.hasMore msg then recvAll (i + 1)

  recvAll 0
  printfn "finished in %O !" sw.Elapsed
/// Connects a REQ socket to tcp://localhost:5555 and streams `filePath` to it
/// as one multipart message made of chunks of up to 81920 bytes.
///
/// The next chunk is always read before the current one is sent, so the last
/// chunk is reliably sent as the final frame, even when the file size is an
/// exact multiple of the chunk size or a read returns fewer bytes than asked.
/// An empty file is sent as a single empty frame.
let client filePath =
  use context = new Context ()
  use requester = Context.req context
  Socket.connect requester "tcp://localhost:5555"
  printfn "client connected!"
  use fs = File.OpenRead(filePath)

  // Reads the next chunk and returns exactly the bytes read (empty at end of file).
  let readChunk () =
    let buffer = Array.zeroCreate<byte> 81920
    let count = fs.Read(buffer, 0, buffer.Length)
    if count = buffer.Length then buffer else Array.sub buffer 0 count

  let rec send i (current : byte[]) =
    printfn "client %i" i
    let next = readChunk ()
    if next.Length = 0 then
      // nothing follows: terminate the multipart message
      Socket.send requester current
    else
      Socket.sendMore requester current |> ignore
      send (i + 1) next

  send 0 (readChunk ())
// Run the server and the client concurrently and wait for both to finish.
// Async.Start is fire-and-forget: the surrounding workflow used to complete
// immediately, so nothing waited for the transfer and exceptions raised by
// either side were lost. Async.Parallel blocks until both are done and
// re-raises the first failure.
[ async { serverRaw @"d:\received.log" }
  async { client InputFile } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment