Skip to content

Instantly share code, notes, and snippets.

@jrfondren jrfondren/delay.nim
Last active May 16, 2019

Embed
What would you like to do?
512ms
import asyncdispatch, asyncnet, times
when defined(tryfix):
  # Patched copy of the standard `asyncstreams` module. Compared with the
  # stock code, `read` finishes right away when the stream already holds
  # data or is already complete, instead of going through `callSoon`.
  import asyncfutures, deques

  type
    FutureStream*[T] = ref object ## Asynchronous FIFO queue of `T` values.
      queue: Deque[T] ## Values written but not yet read.
      finished: bool ## Set once `complete` has been called.
      cb: proc () {.closure, gcsafe.} ## Run on each write and on completion.

  proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
    ## Creates an empty, unfinished stream with no callback registered.
    ## `fromProc` is not used by this copy.
    result = FutureStream[T](finished: false, cb: nil)
    result.queue = initDeque[T]()

  proc complete*[T](future: FutureStream[T]) =
    ## Marks the stream as finished and runs the registered callback, if any.
    future.finished = true
    if not future.cb.isNil:
      future.cb()

  proc `callback=`*[T](future: FutureStream[T],
      cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
    ## Registers `cb` as the stream's callback, replacing any previous one.
    ## If data is already queued or the stream is already finished, the
    ## callback is additionally scheduled with `callSoon`.
    future.cb = proc () = cb(future)
    if future.queue.len > 0 or future.finished:
      callSoon(future.cb)

  proc finished*[T](future: FutureStream[T]): bool =
    ## True once the stream has been completed *and* every queued value has
    ## been read.
    result = future.finished and future.queue.len == 0

  proc write*[T](future: FutureStream[T], value: T): Future[void] =
    ## Appends `value` to the stream and runs the registered callback.
    ## The returned future fails with `ValueError` when the stream has
    ## already been completed.
    result = newFuture[void]("FutureStream.put")
    if future.finished:
      let msg = "FutureStream is finished and so no longer accepts new data."
      result.fail(newException(ValueError, msg))
      return
    future.queue.addLast(value)
    if not future.cb.isNil: future.cb()
    result.complete()

  proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
    ## Returns a future holding `(true, value)` for the oldest queued value,
    ## or `(false, default)` once the stream is finished and drained.
    ##
    ## If data is already queued or the stream is already finished, the
    ## returned future is completed before `read` returns; otherwise it is
    ## completed by a later `write` or `complete`.
    let resFut = newFuture[(bool, T)]("FutureStream.take")
    # Whatever was registered before this call: an earlier pending `read`
    # or a callback installed through `callback=`.
    let savedCb = future.cb

    proc newCb(fs: FutureStream[T]) =
      # This reader has already been served; nothing left to do.
      if resFut.finished: return
      # Unregister so this callback is not run a second time.
      future.cb = nil
      var res: (bool, T)
      if finished(fs):
        res[0] = false
      else:
        res[0] = true
        res[1] = fs.queue.popFirst()
      resFut.complete(res)
      if not savedCb.isNil:
        if fs.queue.len > 0 or finished(fs):
          # More data, or end of stream: the previous callback can be
          # served right now.
          savedCb()
        else:
          # Nothing is available for the previous callback yet. Re-register
          # it instead of invoking it; otherwise an earlier pending `read`
          # would call `popFirst` on an empty queue.
          future.cb = savedCb

    # Fast path: serve the reader synchronously rather than scheduling the
    # callback with `callSoon` and waiting for the dispatcher to run it.
    if future.queue.len > 0 or future.finished:
      newCb(future)
    else:
      future.callback = newCb
    return resFut

  # Not part of asyncstreams; defined here so it uses *this* FutureStream.
  proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
    ## Reads until the stream is finished and returns all values
    ## concatenated in the order they were read.
    result = ""
    while true:
      let (hasValue, value) = await future.read()
      if hasValue:
        result.add(value)
      else:
        break
else:
  import asyncstreams
# Reference instant: the progress lines below report time elapsed since here.
let time = now()
proc greeter {.async.} =
  ## Listens on port 4001, accepts a single connection, sends it
  ## "hello\n", then closes both the connection and the listening socket.
  let serv = newAsyncSocket()
  serv.setSockOpt(OptReuseAddr, true)
  serv.bindAddr(Port(4001))
  serv.listen()
  let client = await serv.accept()
  # `await`, not `waitFor`: an async proc must yield to the dispatcher that
  # is already running instead of driving a nested poll loop itself.
  await client.send("hello\n")
  client.close()
  # Only one connection is ever served, so release the listener as well.
  serv.close()
# Prints `label` followed by the time elapsed since `time` was captured.
template stamp(label: string) =
  echo label, now() - time

# Start the one-shot server in the background, then talk to it.
asyncCheck greeter()
stamp "connecting: "

let conn = newAsyncSocket()
waitFor conn.connect("localhost", Port(4001))
let greeting = waitFor conn.recv(4000)
conn.close()

# Push the received text through a FutureStream and time each step.
let fs = newFutureStream[string]("client.connect")
stamp "writing to stream: "
waitFor fs.write(greeting)
stamp "completing stream: "
fs.complete()
stamp "reading stream: "

# -d:readAll drains the whole stream; otherwise a single read is performed.
when defined(readAll):
  echo waitFor readAll(fs)
else:
  echo waitFor read(fs)
stamp "done: "
@jrfondren

This comment has been minimized.

Copy link
Owner Author

commented May 16, 2019

# nim c --hints:off --run delay
connecting: 1 millisecond and 505 microseconds
writing to stream: 2 milliseconds and 851 microseconds
completing stream: 2 milliseconds and 894 microseconds
reading stream: 2 milliseconds and 906 microseconds
(Field0: true, Field1: "hello\n")
done: 504 milliseconds and 576 microseconds

# nim c --hints:off -d:readAll --run delay
connecting: 1 millisecond and 421 microseconds
writing to stream: 2 milliseconds and 774 microseconds
completing stream: 2 milliseconds and 815 microseconds
reading stream: 2 milliseconds and 834 microseconds
hello

done: 508 milliseconds and 93 microseconds

vs.

# nim c --hints:off -d:tryfix --run delay
connecting: 1 millisecond and 480 microseconds
writing to stream: 2 milliseconds and 834 microseconds
completing stream: 2 milliseconds and 870 microseconds
reading stream: 2 milliseconds and 883 microseconds
(Field0: true, Field1: "hello\n")
done: 2 milliseconds and 908 microseconds

# nim c --hints:off -d:tryfix -d:readAll --run delay
connecting: 1 millisecond and 381 microseconds
writing to stream: 2 milliseconds and 580 microseconds
completing stream: 2 milliseconds and 617 microseconds
reading stream: 2 milliseconds and 632 microseconds
hello

done: 2 milliseconds and 685 microseconds
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.