Last active
May 16, 2019 00:32
-
-
Save jrfondren/abeaef41b69b149945d325c34936f697 to your computer and use it in GitHub Desktop.
512ms
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncdispatch, asyncnet, times
when defined(tryfix):
  # modified asyncstream
  # begin -old--old--old--old--old--old--old--old--old--old--old--old--old--old--
  import asyncfutures, deques

  type
    FutureStream*[T] = ref object ## Queue of `T` values with one callback slot.
      queue: Deque[T]                 ## values written but not yet read
      finished: bool                  ## set by `complete`
      cb: proc () {.closure, gcsafe.} ## invoked on `write` and `complete`

  proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
    ## Creates an empty, unfinished stream with no callback installed.
    ## `fromProc` is accepted but not used by this implementation.
    result = FutureStream[T](finished: false, cb: nil)
    result.queue = initDeque[T]()

  proc complete*[T](future: FutureStream[T]) =
    ## Marks the stream as finished and invokes the installed callback, if any.
    future.finished = true
    if not future.cb.isNil:
      future.cb()

  proc `callback=`*[T](future: FutureStream[T],
      cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
    ## Installs `cb` (wrapped so that it receives `future`) as the stream's
    ## callback. When data is already queued or the stream is already marked
    ## finished, the wrapped callback is additionally scheduled with `callSoon`.
    future.cb = proc () = cb(future)
    if future.queue.len > 0 or future.finished:
      callSoon(future.cb)

  proc finished*[T](future: FutureStream[T]): bool =
    ## True once the stream has been completed *and* its queue is drained.
    result = future.finished and future.queue.len == 0

  proc write*[T](future: FutureStream[T], value: T): Future[void] =
    ## Appends `value` to the queue and invokes the installed callback, if any.
    ## The returned future is already completed on return; it is failed with
    ## `ValueError` when the stream has been marked finished.
    result = newFuture[void]("FutureStream.put")
    if future.finished:
      let msg = "FutureStream is finished and so no longer accepts new data."
      result.fail(newException(ValueError, msg))
      return
    future.queue.addLast(value)
    if not future.cb.isNil: future.cb()
    result.complete()

  proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
    ## Returns a future holding `(true, value)` for the next queued value, or
    ## `(false, default)` once the stream is finished and drained.
    let resFut = newFuture[(bool, T)]("FutureStream.take")
    # Callback of an earlier reader (or user), chained behind this read.
    let savedCb = future.cb
    let newCb =
      proc (fs: FutureStream[T]) =
        # The callback may fire more than once; only the first call counts.
        if resFut.finished: return
        future.cb = nil
        var res: (bool, T)
        if finished(fs):
          res[0] = false
        else:
          res[0] = true
          res[1] = fs.queue.popFirst()
        resFut.complete(res)
        if not savedCb.isNil:
          if fs.queue.len > 0 or finished(fs):
            # There is still something for the earlier callback to consume
            # (another value, or the end-of-stream signal).
            savedCb()
          else:
            # Queue drained and stream still open: invoking the earlier
            # reader now would make it `popFirst` an empty deque, so put it
            # back and let the next `write`/`complete` trigger it.
            future.cb = savedCb
    # begin -new--new--new--new--new--new--new--new--new--new--new--new--new--new--
    if future.queue.len > 0 or future.finished:
      # Data (or end-of-stream) is already available: answer immediately
      # instead of going through `callback=` and its `callSoon`.
      newCb(future)
    # end -new--new--new--new--new--new--new--new--new--new--new--new--new--new--
    else:
      future.callback = newCb
    return resFut

  # not from asyncstreams but copied here for *this* asyncstreams
  proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
    ## Reads from `future` until `read` reports no more values and returns
    ## the concatenation of everything read.
    result = ""
    while true:
      let (hasValue, value) = await future.read()
      if hasValue:
        result.add(value)
      else:
        break
  # end -old--old--old--old--old--old--old--old--old--old--old--old--old--old--
else:
  import asyncstreams
# Reference instant; the progress lines further down print the time elapsed
# since this point.
let time = now()

proc greeter {.async.} =
  ## Listens on port 4001, accepts a single connection, sends it "hello\n"
  ## and closes that connection. The listening socket is left open.
  let serv = newAsyncSocket()
  serv.setSockOpt(OptReuseAddr, true)
  serv.bindAddr(Port(4001))
  serv.listen()
  let client = await serv.accept()
  # `await` rather than `waitFor`: this proc resumes inside a dispatcher
  # callback, and `waitFor` there would start a nested, blocking poll loop.
  await client.send("hello\n")
  client.close()
# Launch the greeter without waiting for it to finish.
asyncCheck greeter()

template stamp(label: string) =
  ## Prints `label` followed by the time elapsed since `time`.
  echo label, now() - time

stamp "connecting: "
let conn = newAsyncSocket()
waitFor(conn.connect("localhost", Port(4001)))
let greeting = waitFor(conn.recv(4000))
conn.close()

# Push the received text through a FutureStream and time each step.
let fs = newFutureStream[string]("client.connect")
stamp "writing to stream: "
waitFor(fs.write(greeting))
stamp "completing stream: "
fs.complete()
stamp "reading stream: "
when defined(readAll):
  echo waitFor(fs.readAll())
else:
  echo waitFor(fs.read())
stamp "done: "
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
vs.