Skip to content

Instantly share code, notes, and snippets.

@cheatfate
Created April 25, 2016 15:03
Show Gist options
  • Save cheatfate/4e9dd1d870b89f6cb958897d399ac26c to your computer and use it in GitHub Desktop.
Save cheatfate/4e9dd1d870b89f6cb958897d399ac26c to your computer and use it in GitHub Desktop.
#
#
# Nim's Runtime Library
# (c) Copyright 2015 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
include "system/inclrtl"
import os, oids, tables, strutils, macros, times
import nativesockets, net
export Port, SocketFlag
#{.injectStmt: newGcInvariant().}
## AsyncDispatch
## *************
##
## This module implements asynchronous IO. This includes a dispatcher,
## a ``Future`` type implementation, and an ``async`` macro which allows
## asynchronous code to be written in a synchronous style with the ``await``
## keyword.
##
## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
## (or a function which does so for you such as ``waitFor`` or ``runForever``)
## in order to poll for any outstanding events. The underlying implementation
## is based on epoll on Linux, IO Completion Ports on Windows and select on
## other operating systems.
##
## The ``poll`` function will not, on its own, return any events. Instead
## an appropriate ``Future`` object will be completed. A ``Future`` is a
## type which holds a value which is not yet available, but which *may* be
## available in the future. You can check whether a future is finished
## by using the ``finished`` function. When a future is finished it means that
## either the value that it holds is now available or it holds an error instead.
## The latter situation occurs when the operation to complete a future fails
## with an exception. You can distinguish between the two situations with the
## ``failed`` function.
##
## Future objects can also store a callback procedure which will be called
## automatically once the future completes.
##
## Futures therefore can be thought of as an implementation of the proactor
## pattern. In this
## pattern you make a request for an action, and once that action is fulfilled
## a future is completed with the result of that action. Requests can be
## made by calling the appropriate functions. For example: calling the ``recv``
## function will create a request for some data to be read from a socket. The
## future which the ``recv`` function returns will then complete once the
## requested amount of data is read **or** an exception occurs.
##
## Code to read some data from a socket may look something like this:
##
## .. code-block::nim
## var future = socket.recv(100)
## future.callback =
## proc () =
## echo(future.read)
##
## All asynchronous functions returning a ``Future`` will not block. They
## will not however return immediately. An asynchronous function will have
## code which will be executed before an asynchronous request is made, in most
## cases this code sets up the request.
##
## In the above example, the ``recv`` function will return a brand new
## ``Future`` instance once the request for data to be read from the socket
## is made. This ``Future`` instance will complete once the requested amount
## of data is read, in this case it is 100 bytes. The second line sets a
## callback on this future which will be called once the future completes.
## All the callback does is write the data stored in the future to ``stdout``.
## The ``read`` function is used for this and it checks whether the future
## completes with an error for you (if it did it will simply raise the
## error), if there is no error however it returns the value of the future.
##
## Asynchronous procedures
## -----------------------
##
## Asynchronous procedures remove the pain of working with callbacks. They do
## this by allowing you to write asynchronous code the same way as you would
## write synchronous code.
##
## An asynchronous procedure is marked using the ``{.async.}`` pragma.
## When marking a procedure with the ``{.async.}`` pragma it must have a
## ``Future[T]`` return type or no return type at all. If you do not specify
## a return type then ``Future[void]`` is assumed.
##
## Inside asynchronous procedures ``await`` can be used to call any
## procedures which return a
## ``Future``; this includes asynchronous procedures. When a procedure is
## "awaited", the asynchronous procedure it is awaited in will
## suspend its execution
## until the awaited procedure's Future completes. At which point the
## asynchronous procedure will resume its execution. During the period
## when an asynchronous procedure is suspended other asynchronous procedures
## will be run by the dispatcher.
##
## The ``await`` call may be used in many contexts. It can be used on the right
## hand side of a variable declaration: ``var data = await socket.recv(100)``,
## in which case the variable will be set to the value of the future
## automatically. It can be used to await a ``Future`` object, and it can
## be used to await a procedure returning a ``Future[void]``:
## ``await socket.send("foobar")``.
##
## Discarding futures
## ------------------
##
## Futures should **never** be discarded. This is because they may contain
## errors. If you do not care for the result of a Future then you should
## use the ``asyncCheck`` procedure instead of the ``discard`` keyword.
##
## Examples
## --------
##
## For examples take a look at the documentation for the modules implementing
## asynchronous IO. A good place to start is the
## `asyncnet module <asyncnet.html>`_.
##
## Limitations/Bugs
## ----------------
##
## * The effect system (``raises: []``) does not work with async procedures.
## * Can't await in a ``except`` body
## * Forward declarations for async procs are broken,
## link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
## * FutureVar[T] needs to be completed manually.
# TODO: Check if yielded future is nil and throw a more meaningful exception
# -- Futures
type
FutureBase* = ref object of RootObj ## Untyped future.
cb: proc () {.closure,gcsafe.}
finished: bool
error*: ref Exception ## Stored exception
errorStackTrace*: string
when not defined(release):
stackTrace: string ## For debugging purposes only.
id: int
fromProc: string
Future*[T] = ref object of FutureBase ## Typed future.
value: T ## Stored value
FutureVar*[T] = distinct Future[T]
FutureError* = object of Exception
cause*: FutureBase
{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
when not defined(release):
var currentID = 0
proc call_soon(cbproc: proc ())
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
## Creates a new future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
new(result)
result.finished = false
when not defined(release):
result.stackTrace = getStackTrace()
result.id = currentID
result.fromProc = fromProc
currentID.inc()
proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## Create a new ``FutureVar``. This Future type is ideally suited for
## situations where you want to avoid unnecessary allocations of Futures.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
result = FutureVar[T](newFuture[T](fromProc))
proc clean*[T](future: FutureVar[T]) =
## Resets the ``finished`` status of ``future``.
Future[T](future).finished = false
Future[T](future).error = nil
proc checkFinished[T](future: Future[T]) =
## Checks whether `future` is finished. If it is then raises a
## ``FutureError``.
when not defined(release):
if future.finished:
var msg = ""
msg.add("An attempt was made to complete a Future more than once. ")
msg.add("Details:")
msg.add("\n Future ID: " & $future.id)
msg.add("\n Created in proc: " & future.fromProc)
msg.add("\n Stack trace to moment of creation:")
msg.add("\n" & indent(future.stackTrace.strip(), 4))
when T is string:
msg.add("\n Contents (string): ")
msg.add("\n" & indent(future.value.repr, 4))
msg.add("\n Stack trace to moment of secondary completion:")
msg.add("\n" & indent(getStackTrace().strip(), 4))
var err = newException(FutureError, msg)
err.cause = future
raise err
proc complete*[T](future: Future[T], val: T) =
## Completes ``future`` with value ``val``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
assert(future.error == nil)
future.value = val
future.finished = true
if future.cb != nil:
future.cb()
proc complete*(future: Future[void]) =
## Completes a void ``future``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
assert(future.error == nil)
future.finished = true
if future.cb != nil:
future.cb()
proc complete*[T](future: FutureVar[T]) =
## Completes a ``FutureVar``.
template fut: expr = Future[T](future)
checkFinished(fut)
assert(fut.error == nil)
fut.finished = true
if fut.cb != nil:
fut.cb()
proc fail*[T](future: Future[T], error: ref Exception) =
## Completes ``future`` with ``error``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
future.finished = true
future.error = error
future.errorStackTrace =
if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
if future.cb != nil:
future.cb()
else:
# This is to prevent exceptions from being silently ignored when a future
# is discarded.
# TODO: This may turn out to be a bad idea.
# Turns out this is a bad idea.
#raise error
discard
proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
## Sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
##
## **Note**: You most likely want the other ``callback`` setter which
## passes ``future`` as a param to the callback.
future.cb = cb
if future.finished:
call_soon(future.cb)
proc `callback=`*[T](future: Future[T],
cb: proc (future: Future[T]) {.closure,gcsafe.}) =
## Sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
future.callback = proc () = cb(future)
proc injectStacktrace[T](future: Future[T]) =
# TODO: Come up with something better.
when not defined(release):
var msg = ""
msg.add("\n " & future.fromProc & "'s lead up to read of failed Future:")
if not future.errorStackTrace.isNil and future.errorStackTrace != "":
msg.add("\n" & indent(future.errorStackTrace.strip(), 4))
else:
msg.add("\n Empty or nil stack trace.")
future.error.msg.add(msg)
proc read*[T](future: Future[T]): T =
## Retrieves the value of ``future``. Future must be finished otherwise
## this function will fail with a ``ValueError`` exception.
##
## If the result of the future is an error then that error will be raised.
if future.finished:
if future.error != nil:
injectStacktrace(future)
raise future.error
when T isnot void:
return future.value
else:
# TODO: Make a custom exception type for this?
raise newException(ValueError, "Future still in progress.")
proc readError*[T](future: Future[T]): ref Exception =
## Retrieves the exception stored in ``future``.
##
## An ``ValueError`` exception will be thrown if no exception exists
## in the specified Future.
if future.error != nil: return future.error
else:
raise newException(ValueError, "No error in future.")
proc mget*[T](future: FutureVar[T]): var T =
## Returns a mutable value stored in ``future``.
##
## Unlike ``read``, this function will not raise an exception if the
## Future has not been finished.
result = Future[T](future).value
proc finished*[T](future: Future[T]): bool =
## Determines whether ``future`` has completed.
##
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
future.finished
proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future
## finished with an error.
##
## This should be used instead of ``discard`` to discard void futures.
future.callback =
proc () =
if future.failed:
injectStacktrace(future)
raise future.error
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once both ``fut1`` and ``fut2``
## complete.
var retFuture = newFuture[void]("asyncdispatch.`and`")
fut1.callback =
proc () =
if fut2.finished: retFuture.complete()
fut2.callback =
proc () =
if fut1.finished: retFuture.complete()
return retFuture
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once either ``fut1`` or ``fut2``
## complete.
var retFuture = newFuture[void]("asyncdispatch.`or`")
proc cb() =
if not retFuture.finished: retFuture.complete()
fut1.callback = cb
fut2.callback = cb
return retFuture
type
PDispatcherBase = ref object of RootRef
timers: seq[tuple[finishAt: float, fut: Future[void]]]
proc processTimers(p: PDispatcherBase) =
var oldTimers = p.timers
p.timers = @[]
for t in oldTimers:
if epochTime() >= t.finishAt:
t.fut.complete()
else:
p.timers.add(t)
when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
CompletionKey = Dword
CompletionData* = object
fd*: AsyncFD # TODO: Rename this.
cb*: proc (fd: AsyncFD, bytesTransferred: Dword,
errcode: OSErrorCode) {.closure,gcsafe.}
PDispatcher* = ref object of PDispatcherBase
ioPort: Handle
handles: HashSet[AsyncFD]
CustomOverlapped = object of OVERLAPPED
data*: CompletionData
PCustomOverlapped* = ref CustomOverlapped
AsyncFD* = distinct int
{.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD,
TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].}
proc hash(x: AsyncFD): Hash {.borrow.}
proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
proc newDispatcher*(): PDispatcher =
## Creates a new Dispatcher instance.
new result
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[AsyncFD]()
result.timers = @[]
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
## Retrieves the global thread-local dispatcher.
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc call_soon(cbproc: proc ()) =
discard
#getGlobalDispatcher().dQueue.enqueue(cbproc)
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
let p = getGlobalDispatcher()
if createIoCompletionPort(fd.Handle, p.ioPort,
cast[CompletionKey](fd), 1) == 0:
raiseOSError(osLastError())
p.handles.incl(fd)
proc verifyPresence(fd: AsyncFD) =
## Ensures that file descriptor has been registered with the dispatcher.
let p = getGlobalDispatcher()
if fd notin p.handles:
raise newException(ValueError,
"Operation performed on a socket which has not been registered with" &
" the dispatcher yet.")
proc poll*(timeout = 500) =
## Waits for completion events and processes them.
let p = getGlobalDispatcher()
if p.handles.len == 0 and p.timers.len == 0:
raise newException(ValueError,
"No handles or timers registered in dispatcher.")
let llTimeout =
if timeout == -1: winlean.INFINITE
else: timeout.int32
var lpNumberOfBytesTransferred: Dword
var lpCompletionKey: ULONG
var customOverlapped: PCustomOverlapped
let res = getQueuedCompletionStatus(p.ioPort,
addr lpNumberOfBytesTransferred, addr lpCompletionKey,
cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
# http://stackoverflow.com/a/12277264/492186
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
if res:
# This is useful for ensuring the reliability of the overlapped struct.
assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
customOverlapped.data.cb(customOverlapped.data.fd,
lpNumberOfBytesTransferred, OSErrorCode(-1))
GC_unref(customOverlapped)
else:
let errCode = osLastError()
if customOverlapped != nil:
assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
customOverlapped.data.cb(customOverlapped.data.fd,
lpNumberOfBytesTransferred, errCode)
GC_unref(customOverlapped)
else:
if errCode.int32 == WAIT_TIMEOUT:
# Timed out
discard
else: raiseOSError(errCode)
# Timer processing.
processTimers(p)
var connectExPtr: pointer = nil
var acceptExPtr: pointer = nil
var getAcceptExSockAddrsPtr: pointer = nil
proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
# Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
var bytesRet: Dword
fun = nil
result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword,
addr bytesRet, nil, nil) == 0
proc initAll() =
let dummySock = newNativeSocket()
if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
raiseOSError(osLastError())
if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
raiseOSError(osLastError())
if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS):
raiseOSError(osLastError())
proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint,
lpSendBuffer: pointer, dwSendDataLength: Dword,
lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool =
if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().")
let fun =
cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint,
lpSendBuffer: pointer, dwSendDataLength: Dword,
lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr)
result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
lpOverlapped)
proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
lpOverlapped: POVERLAPPED): bool =
if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().")
let fun =
cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr)
result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength,
dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived,
lpOverlapped)
proc getAcceptExSockaddrs(lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword,
LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt,
RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) =
if getAcceptExSockAddrsPtr.isNil:
raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().")
let fun =
cast[proc (lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr,
LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr,
RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr)
fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
RemoteSockaddr, RemoteSockaddrLength)
proc connect*(socket: AsyncFD, address: string, port: Port,
domain = nativesockets.AF_INET): Future[void] =
## Connects ``socket`` to server at ``address:port``.
##
## Returns a ``Future`` which will complete when the connection succeeds
## or an error occurs.
verifyPresence(socket)
var retFuture = newFuture[void]("connect")
# Apparently ``ConnectEx`` expects the socket to be initially bound:
var saddr: Sockaddr_in
saddr.sin_family = int16(toInt(domain))
saddr.sin_port = 0
saddr.sin_addr.s_addr = INADDR_ANY
if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) < 0'i32:
raiseOSError(osLastError())
var aiList = getAddrInfo(address, port, domain)
var success = false
var lastError: OSErrorCode
var it = aiList
while it != nil:
# "the OVERLAPPED structure must remain valid until the I/O completes"
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
var ret = connectEx(socket.SocketHandle, it.ai_addr,
sizeof(Sockaddr_in).cint, nil, 0, nil,
cast[POVERLAPPED](ol))
if ret:
# Request to connect completed immediately.
success = true
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
break
else:
lastError = osLastError()
if lastError.int32 == ERROR_IO_PENDING:
# In this case ``ol`` will be deallocated in ``poll``.
success = true
break
else:
GC_unref(ol)
success = false
it = it.ai_next
dealloc(aiList)
if not success:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
return retFuture
proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): Future[string] =
## Reads **up to** ``size`` bytes from ``socket``. Returned future will
## complete once all the data requested is read, a part of the data has been
## read, or the socket has disconnected in which case the future will
## complete with a value of ``""``.
##
## **Warning**: The ``Peek`` socket flag is not supported on Windows.
# Things to note:
# * When WSARecv completes immediately then ``bytesReceived`` is very
# unreliable.
# * Still need to implement message-oriented socket disconnection,
# '\0' in the message currently signifies a socket disconnect. Who
# knows what will happen when someone sends that to our socket.
verifyPresence(socket)
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
var retFuture = newFuture[string]("recv")
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](alloc0(size))
dataBuf.len = size.ULONG
var bytesReceived: Dword
var flagsio = flags.toOSFlags().Dword
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete("")
else:
var data = newString(bytesCount)
assert bytesCount <= size
copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
retFuture.complete($data)
else:
if flags.isDisconnectionError(errcode):
retFuture.complete("")
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
if dataBuf.buf != nil:
dealloc dataBuf.buf
dataBuf.buf = nil
)
let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
addr flagsio, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
if dataBuf.buf != nil:
dealloc dataBuf.buf
dataBuf.buf = nil
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete("")
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
# We have to ensure that the buffer is empty because WSARecv will tell
# us immediately when it was disconnected, even when there is still
# data in the buffer.
# We want to give the user as much data as we can. So we only return
# the empty string (which signals a disconnection) when there is
# nothing left to read.
retFuture.complete("")
# TODO: "For message-oriented sockets, where a zero byte message is often
# allowable, a failure with an error code of WSAEDISCON is used to
# indicate graceful closure."
# ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
else:
# Request to read completed immediately.
# From my tests bytesReceived isn't reliable.
let realSize =
if bytesReceived == 0:
size
else:
bytesReceived
var data = newString(realSize)
assert realSize <= size
copyMem(addr data[0], addr dataBuf.buf[0], realSize)
#dealloc dataBuf.buf
retFuture.complete($data)
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] =
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
## at least be of that size. Returned future will complete once all the
## data requested is read, a part of the data has been read, or the socket
## has disconnected in which case the future will complete with a value of
## ``0``.
##
## **Warning**: The ``Peek`` socket flag is not supported on Windows.
# Things to note:
# * When WSARecv completes immediately then ``bytesReceived`` is very
# unreliable.
# * Still need to implement message-oriented socket disconnection,
# '\0' in the message currently signifies a socket disconnect. Who
# knows what will happen when someone sends that to our socket.
verifyPresence(socket)
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
var retFuture = newFuture[int]("recvInto")
#buf[] = '\0'
var dataBuf: TWSABuf
dataBuf.buf = buf
dataBuf.len = size.ULONG
var bytesReceived: Dword
var flagsio = flags.toOSFlags().Dword
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete(0)
else:
retFuture.complete(bytesCount)
else:
if flags.isDisconnectionError(errcode):
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
if dataBuf.buf != nil:
dataBuf.buf = nil
)
let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
addr flagsio, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
if dataBuf.buf != nil:
dataBuf.buf = nil
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
# We have to ensure that the buffer is empty because WSARecv will tell
# us immediately when it was disconnected, even when there is still
# data in the buffer.
# We want to give the user as much data as we can. So we only return
# the empty string (which signals a disconnection) when there is
# nothing left to read.
retFuture.complete(0)
# TODO: "For message-oriented sockets, where a zero byte message is often
# allowable, a failure with an error code of WSAEDISCON is used to
# indicate graceful closure."
# ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
else:
# Request to read completed immediately.
# From my tests bytesReceived isn't reliable.
let realSize =
if bytesReceived == 0:
size
else:
bytesReceived
assert realSize <= size
retFuture.complete(realSize)
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc send*(socket: AsyncFD, data: string,
flags = {SocketFlag.SafeDisconn}): Future[void] =
## Sends ``data`` to ``socket``. The returned future will complete once all
## data has been sent.
verifyPresence(socket)
var retFuture = newFuture[void]("send")
var dataBuf: TWSABuf
dataBuf.buf = data # since this is not used in a callback, this is fine
dataBuf.len = data.len.ULONG
var bytesReceived, lowFlags: Dword
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete()
else:
if flags.isDisconnectionError(errcode):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
lowFlags, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
Future[tuple[address: string, client: AsyncFD]] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection and the remote address of the client.
## The future will complete when the connection is successfully accepted.
##
## The resulting client socket is automatically registered to the
## dispatcher.
##
## The ``accept`` call may result in an error if the connecting socket
## disconnects during the duration of the ``accept``. If the ``SafeDisconn``
## flag is specified then this error will not be raised and instead
## accept will be called again.
verifyPresence(socket)
var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
var clientSock = newNativeSocket()
if clientSock == osInvalidSocket: raiseOSError(osLastError())
const lpOutputLen = 1024
var lpOutputBuf = newString(lpOutputLen)
var dwBytesReceived: Dword
let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
let dwLocalAddressLength = Dword(sizeof (Sockaddr_in) + 16)
let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16)
template completeAccept(): stmt {.immediate, dirty.} =
var listenSock = socket
let setoptRet = setsockopt(clientSock, SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
sizeof(listenSock).SockLen)
if setoptRet != 0: raiseOSError(osLastError())
var localSockaddr, remoteSockaddr: ptr SockAddr
var localLen, remoteLen: int32
getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength,
dwLocalAddressLength, dwRemoteAddressLength,
addr localSockaddr, addr localLen,
addr remoteSockaddr, addr remoteLen)
register(clientSock.AsyncFD)
# TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
retFuture.complete(
(address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr),
client: clientSock.AsyncFD)
)
template failAccept(errcode): stmt =
if flags.isDisconnectionError(errcode):
var newAcceptFut = acceptAddr(socket, flags)
newAcceptFut.callback =
proc () =
if newAcceptFut.failed:
retFuture.fail(newAcceptFut.readError)
else:
retFuture.complete(newAcceptFut.read)
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
completeAccept()
else:
failAccept(errcode)
)
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
dwReceiveDataLength,
dwLocalAddressLength,
dwRemoteAddressLength,
addr dwBytesReceived, cast[POVERLAPPED](ol))
if not ret:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
failAccept(err)
GC_unref(ol)
else:
completeAccept()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD =
## Creates a new socket and registers it with the dispatcher implicitly.
result = newNativeSocket(domain, sockType, protocol).AsyncFD
result.SocketHandle.setBlocking(false)
register(result)
proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET,
sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP): AsyncFD =
## Creates a new socket and registers it with the dispatcher implicitly.
result = newNativeSocket(domain, sockType, protocol).AsyncFD
result.SocketHandle.setBlocking(false)
register(result)
proc closeSocket*(socket: AsyncFD) =
## Closes a socket and ensures that it is unregistered.
socket.SocketHandle.close()
getGlobalDispatcher().handles.excl(socket)
proc unregister*(fd: AsyncFD) =
## Unregisters ``fd``.
getGlobalDispatcher().handles.excl(fd)
initAll()
else:
import selectors
when defined(windows):
import winlean
const
EINTR = WSAEINPROGRESS
EINPROGRESS = WSAEINPROGRESS
EWOULDBLOCK = WSAEWOULDBLOCK
EAGAIN = EINPROGRESS
MSG_NOSIGNAL = 0
else:
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL
type
AsyncFD* = distinct cint
Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
PData* = ref object of RootRef
fd: AsyncFD
readCBs: seq[Callback]
writeCBs: seq[Callback]
PDispatcher* = ref object of PDispatcherBase
selector: Selector
{.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].}
proc `==`*(x, y: AsyncFD): bool {.borrow.}
proc newDispatcher*(): PDispatcher =
new result
result.selector = newSelector()
result.timers = @[]
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc update(fd: AsyncFD, events: set[Event]) =
let p = getGlobalDispatcher()
assert fd.SocketHandle in p.selector
p.selector.update(fd.SocketHandle, events)
proc register*(fd: AsyncFD) =
let p = getGlobalDispatcher()
var data = PData(fd: fd, readCBs: @[], writeCBs: @[])
p.selector.register(fd.SocketHandle, {}, data.RootRef)
proc newAsyncNativeSocket*(domain: cint, sockType: cint,
protocol: cint): AsyncFD =
result = newNativeSocket(domain, sockType, protocol).AsyncFD
result.SocketHandle.setBlocking(false)
when defined(macosx):
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
register(result)
proc newAsyncNativeSocket*(domain: Domain = AF_INET,
sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP): AsyncFD =
result = newNativeSocket(domain, sockType, protocol).AsyncFD
result.SocketHandle.setBlocking(false)
when defined(macosx):
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
register(result)
proc closeSocket*(sock: AsyncFD) =
let disp = getGlobalDispatcher()
disp.selector.unregister(sock.SocketHandle)
sock.SocketHandle.close()
proc unregister*(fd: AsyncFD) =
getGlobalDispatcher().selector.unregister(fd.SocketHandle)
proc addRead*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()
if fd.SocketHandle notin p.selector:
raise newException(ValueError, "File descriptor not registered.")
p.selector[fd.SocketHandle].data.PData.readCBs.add(cb)
update(fd, p.selector[fd.SocketHandle].events + {EvRead})
proc addWrite*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()
if fd.SocketHandle notin p.selector:
raise newException(ValueError, "File descriptor not registered.")
p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb)
update(fd, p.selector[fd.SocketHandle].events + {EvWrite})
proc poll*(timeout = 500) =
let p = getGlobalDispatcher()
for info in p.selector.select(timeout):
let data = PData(info.key.data)
assert data.fd == info.key.fd.AsyncFD
#echo("In poll ", data.fd.cint)
# There may be EvError here, but we handle them in callbacks,
# so that exceptions can be raised from `send(...)` and
# `recv(...)` routines.
if EvRead in info.events:
# Callback may add items to ``data.readCBs`` which causes issues if
# we are iterating over ``data.readCBs`` at the same time. We therefore
# make a copy to iterate over.
let currentCBs = data.readCBs
data.readCBs = @[]
for cb in currentCBs:
if not cb(data.fd):
# Callback wants to be called again.
data.readCBs.add(cb)
if EvWrite in info.events:
let currentCBs = data.writeCBs
data.writeCBs = @[]
for cb in currentCBs:
if not cb(data.fd):
# Callback wants to be called again.
data.writeCBs.add(cb)
if info.key in p.selector:
var newEvents: set[Event]
if data.readCBs.len != 0: newEvents = {EvRead}
if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
if newEvents != info.key.events:
update(data.fd, newEvents)
else:
# FD no longer a part of the selector. Likely been closed
# (e.g. socket disconnected).
discard
processTimers(p)
proc connect*(socket: AsyncFD, address: string, port: Port,
domain = AF_INET): Future[void] =
var retFuture = newFuture[void]("connect")
proc cb(fd: AsyncFD): bool =
var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
if ret == 0:
# We have connected.
retFuture.complete()
return true
elif ret == EINTR:
# interrupted, keep waiting
return false
else:
retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
return true
assert getSockDomain(socket.SocketHandle) == domain
var aiList = getAddrInfo(address, port, domain)
var success = false
var lastError: OSErrorCode
var it = aiList
while it != nil:
var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen)
if ret == 0:
# Request to connect completed immediately.
success = true
retFuture.complete()
break
else:
lastError = osLastError()
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
success = true
addWrite(socket, cb)
break
else:
success = false
it = it.ai_next
dealloc(aiList)
if not success:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
return retFuture
proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): Future[string] =
var retFuture = newFuture[string]("recv")
var readBuffer = newString(size)
proc cb(sock: AsyncFD): bool =
result = true
let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
flags.toOSFlags())
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
if flags.isDisconnectionError(lastError):
retFuture.complete("")
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
elif res == 0:
# Disconnected
retFuture.complete("")
else:
readBuffer.setLen(res)
retFuture.complete(readBuffer)
# TODO: The following causes a massive slowdown.
#if not cb(socket):
addRead(socket, cb)
return retFuture
proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] =
var retFuture = newFuture[int]("recvInto")
proc cb(sock: AsyncFD): bool =
result = true
let res = recv(sock.SocketHandle, buf, size.cint,
flags.toOSFlags())
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
if flags.isDisconnectionError(lastError):
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
else:
retFuture.complete(res)
# TODO: The following causes a massive slowdown.
#if not cb(socket):
addRead(socket, cb)
return retFuture
proc send*(socket: AsyncFD, data: string,
flags = {SocketFlag.SafeDisconn}): Future[void] =
var retFuture = newFuture[void]("send")
var written = 0
proc cb(sock: AsyncFD): bool =
result = true
let netSize = data.len-written
var d = data.cstring
let res = send(sock.SocketHandle, addr d[written], netSize.cint,
MSG_NOSIGNAL)
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
if flags.isDisconnectionError(lastError):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
else:
written.inc(res)
if res != netSize:
result = false # We still have data to send.
else:
retFuture.complete()
# TODO: The following causes crashes.
#if not cb(socket):
addWrite(socket, cb)
return retFuture
proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
Future[tuple[address: string, client: AsyncFD]] =
var retFuture = newFuture[tuple[address: string,
client: AsyncFD]]("acceptAddr")
proc cb(sock: AsyncFD): bool =
result = true
var sockAddress: Sockaddr_storage
var addrLen = sizeof(sockAddress).Socklen
var client = accept(sock.SocketHandle,
cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
if client == osInvalidSocket:
let lastError = osLastError()
assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
if lastError.int32 == EINTR:
return false
else:
if flags.isDisconnectionError(lastError):
return false
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
register(client.AsyncFD)
retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD))
addRead(socket, cb)
return retFuture
proc sleepAsync*(ms: int): Future[void] =
## Suspends the execution of the current async procedure for the next
## ``ms`` milliseconds.
var retFuture = newFuture[void]("sleepAsync")
let p = getGlobalDispatcher()
p.timers.add((epochTime() + (ms / 1000), retFuture))
return retFuture
proc accept*(socket: AsyncFD,
flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection.
## The future will complete when the connection is successfully accepted.
var retFut = newFuture[AsyncFD]("accept")
var fut = acceptAddr(socket, flags)
fut.callback =
proc (future: Future[tuple[address: string, client: AsyncFD]]) =
assert future.finished
if future.failed:
retFut.fail(future.error)
else:
retFut.complete(future.read.client)
return retFut
# -- Await Macro
proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} =
# Skips a nest of StmtList's.
result = node
if node[0].kind == nnkStmtList:
result = skipUntilStmtList(node[0])
proc skipStmtList(node: NimNode): NimNode {.compileTime.} =
result = node
if node[0].kind == nnkStmtList:
result = node[0]
template createCb(retFutureSym, iteratorNameSym,
name: expr): stmt {.immediate.} =
var nameIterVar = iteratorNameSym
#{.push stackTrace: off.}
proc cb {.closure,gcsafe.} =
try:
if not nameIterVar.finished:
var next = nameIterVar()
if next == nil:
assert retFutureSym.finished, "Async procedure's (" &
name & ") return Future was not finished."
else:
next.callback = cb
except:
if retFutureSym.finished:
# Take a look at tasyncexceptions for the bug which this fixes.
# That test explains it better than I can here.
raise
else:
retFutureSym.fail(getCurrentException())
cb()
#{.pop.}
proc generateExceptionCheck(futSym,
tryStmt, rootReceiver, fromNode: NimNode): NimNode {.compileTime.} =
if tryStmt.kind == nnkNilLit:
result = rootReceiver
else:
var exceptionChecks: seq[tuple[cond, body: NimNode]] = @[]
let errorNode = newDotExpr(futSym, newIdentNode("error"))
for i in 1 .. <tryStmt.len:
let exceptBranch = tryStmt[i]
if exceptBranch[0].kind == nnkStmtList:
exceptionChecks.add((newIdentNode("true"), exceptBranch[0]))
else:
var exceptIdentCount = 0
var ifCond: NimNode
for i in 0 .. <exceptBranch.len:
let child = exceptBranch[i]
if child.kind == nnkIdent:
let cond = infix(errorNode, "of", child)
if exceptIdentCount == 0:
ifCond = cond
else:
ifCond = infix(ifCond, "or", cond)
else:
break
exceptIdentCount.inc
expectKind(exceptBranch[exceptIdentCount], nnkStmtList)
exceptionChecks.add((ifCond, exceptBranch[exceptIdentCount]))
# -> -> else: raise futSym.error
exceptionChecks.add((newIdentNode("true"),
newNimNode(nnkRaiseStmt).add(errorNode)))
# Read the future if there is no error.
# -> else: futSym.read
let elseNode = newNimNode(nnkElse, fromNode)
elseNode.add newNimNode(nnkStmtList, fromNode)
elseNode[0].add rootReceiver
let ifBody = newStmtList()
ifBody.add newCall(newIdentNode("setCurrentException"), errorNode)
ifBody.add newIfStmt(exceptionChecks)
ifBody.add newCall(newIdentNode("setCurrentException"), newNilLit())
result = newIfStmt(
(newDotExpr(futSym, newIdentNode("failed")), ifBody)
)
result.add elseNode
template useVar(result: var NimNode, futureVarNode: NimNode, valueReceiver,
rootReceiver: expr, fromNode: NimNode) =
## Params:
## futureVarNode: The NimNode which is a symbol identifying the Future[T]
## variable to yield.
## fromNode: Used for better debug information (to give context).
## valueReceiver: The node which defines an expression that retrieves the
## future's value.
##
## rootReceiver: ??? TODO
# -> yield future<x>
result.add newNimNode(nnkYieldStmt, fromNode).add(futureVarNode)
# -> future<x>.read
valueReceiver = newDotExpr(futureVarNode, newIdentNode("read"))
result.add generateExceptionCheck(futureVarNode, tryStmt, rootReceiver,
fromNode)
template createVar(result: var NimNode, futSymName: string,
asyncProc: NimNode,
valueReceiver, rootReceiver: expr,
fromNode: NimNode) =
result = newNimNode(nnkStmtList, fromNode)
var futSym = genSym(nskVar, "future")
result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y
useVar(result, futSym, valueReceiver, rootReceiver, fromNode)
proc processBody(node, retFutureSym: NimNode,
subTypeIsVoid: bool,
tryStmt: NimNode): NimNode {.compileTime.} =
#echo(node.treeRepr)
result = node
case node.kind
of nnkReturnStmt:
result = newNimNode(nnkStmtList, node)
if node[0].kind == nnkEmpty:
if not subTypeIsVoid:
result.add newCall(newIdentNode("complete"), retFutureSym,
newIdentNode("result"))
else:
result.add newCall(newIdentNode("complete"), retFutureSym)
else:
result.add newCall(newIdentNode("complete"), retFutureSym,
node[0].processBody(retFutureSym, subTypeIsVoid, tryStmt))
result.add newNimNode(nnkReturnStmt, node).add(newNilLit())
return # Don't process the children of this return stmt
of nnkCommand, nnkCall:
if node[0].kind == nnkIdent and node[0].ident == !"await":
case node[1].kind
of nnkIdent, nnkInfix:
# await x
result = newNimNode(nnkStmtList, node)
var futureValue: NimNode
result.useVar(node[1], futureValue, futureValue, node)
# -> yield x
# -> x.read()
of nnkCall, nnkCommand:
# await foo(p, x)
var futureValue: NimNode
result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue,
futureValue, node)
else:
error("Invalid node kind in 'await', got: " & $node[1].kind)
elif node.len > 1 and node[1].kind == nnkCommand and
node[1][0].kind == nnkIdent and node[1][0].ident == !"await":
# foo await x
var newCommand = node
result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1],
newCommand, node)
of nnkVarSection, nnkLetSection:
case node[0][2].kind
of nnkCommand:
if node[0][2][0].kind == nnkIdent and node[0][2][0].ident == !"await":
# var x = await y
var newVarSection = node # TODO: Should this use copyNimNode?
result.createVar("future" & $node[0][0].ident, node[0][2][1],
newVarSection[0][2], newVarSection, node)
else: discard
of nnkAsgn:
case node[1].kind
of nnkCommand:
if node[1][0].ident == !"await":
# x = await y
var newAsgn = node
result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node)
else: discard
of nnkDiscardStmt:
# discard await x
if node[0].kind == nnkCommand and node[0][0].kind == nnkIdent and
node[0][0].ident == !"await":
var newDiscard = node
result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1],
newDiscard[0], newDiscard, node)
of nnkTryStmt:
# try: await x; except: ...
result = newNimNode(nnkStmtList, node)
template wrapInTry(n, tryBody: expr) =
var temp = n
n[0] = tryBody
tryBody = temp
# Transform ``except`` body.
# TODO: Could we perform some ``await`` transformation here to get it
# working in ``except``?
tryBody[1] = processBody(n[1], retFutureSym, subTypeIsVoid, nil)
proc processForTry(n: NimNode, i: var int,
res: NimNode): bool {.compileTime.} =
## Transforms the body of the tryStmt. Does not transform the
## body in ``except``.
## Returns true if the tryStmt node was transformed into an ifStmt.
result = false
var skipped = n.skipStmtList()
while i < skipped.len:
var processed = processBody(skipped[i], retFutureSym,
subTypeIsVoid, n)
# Check if we transformed the node into an exception check.
# This suggests skipped[i] contains ``await``.
if processed.kind != skipped[i].kind or processed.len != skipped[i].len:
processed = processed.skipUntilStmtList()
expectKind(processed, nnkStmtList)
expectKind(processed[2][1], nnkElse)
i.inc
if not processForTry(n, i, processed[2][1][0]):
# We need to wrap the nnkElse nodes back into a tryStmt.
# As they are executed if an exception does not happen
# inside the awaited future.
# The following code will wrap the nodes inside the
# original tryStmt.
wrapInTry(n, processed[2][1][0])
res.add processed
result = true
else:
res.add skipped[i]
i.inc
var i = 0
if not processForTry(node, i, result):
# If the tryStmt hasn't been transformed we can just put the body
# back into it.
wrapInTry(node, result)
return
else: discard
for i in 0 .. <result.len:
result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, nil)
proc getName(node: NimNode): string {.compileTime.} =
case node.kind
of nnkPostfix:
return $node[1].ident
of nnkIdent:
return $node.ident
of nnkEmpty:
return "anonymous"
else:
error("Unknown name.")
proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
## This macro transforms a single procedure into a closure iterator.
## The ``async`` macro supports a stmtList holding multiple async procedures.
if prc.kind notin {nnkProcDef, nnkLambda}:
error("Cannot transform this node kind into an async proc." &
" Proc definition or lambda node expected.")
hint("Processing " & prc[0].getName & " as an async proc.")
let returnType = prc[3][0]
var baseType: NimNode
# Verify that the return type is a Future[T]
if returnType.kind == nnkBracketExpr:
let fut = repr(returnType[0])
if fut != "Future":
error("Expected return type of 'Future' got '" & fut & "'")
baseType = returnType[1]
elif returnType.kind in nnkCallKinds and $returnType[0] == "[]":
let fut = repr(returnType[1])
if fut != "Future":
error("Expected return type of 'Future' got '" & fut & "'")
baseType = returnType[2]
elif returnType.kind == nnkEmpty:
baseType = returnType
else:
error("Expected return type of 'Future' got '" & repr(returnType) & "'")
let subtypeIsVoid = returnType.kind == nnkEmpty or
(baseType.kind == nnkIdent and returnType[1].ident == !"void")
var outerProcBody = newNimNode(nnkStmtList, prc[6])
# -> var retFuture = newFuture[T]()
var retFutureSym = genSym(nskVar, "retFuture")
var subRetType =
if returnType.kind == nnkEmpty: newIdentNode("void")
else: baseType
outerProcBody.add(
newVarStmt(retFutureSym,
newCall(
newNimNode(nnkBracketExpr, prc[6]).add(
newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`.
subRetType),
newLit(prc[0].getName)))) # Get type from return type of this proc
# -> iterator nameIter(): FutureBase {.closure.} =
# -> {.push warning[resultshadowed]: off.}
# -> var result: T
# -> {.pop.}
# -> <proc_body>
# -> complete(retFuture, result)
var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter")
var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil)
if not subtypeIsVoid:
procBody.insert(0, newNimNode(nnkPragma).add(newIdentNode("push"),
newNimNode(nnkExprColonExpr).add(newNimNode(nnkBracketExpr).add(
newIdentNode("warning"), newIdentNode("resultshadowed")),
newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.}
procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add(
newIdentDefs(newIdentNode("result"), baseType))) # -> var result: T
procBody.insert(2, newNimNode(nnkPragma).add(
newIdentNode("pop"))) # -> {.pop.})
procBody.add(
newCall(newIdentNode("complete"),
retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result)
else:
# -> complete(retFuture)
procBody.add(newCall(newIdentNode("complete"), retFutureSym))
var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")],
procBody, nnkIteratorDef)
closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure"))
outerProcBody.add(closureIterator)
# -> createCb(retFuture)
#var cbName = newIdentNode("cb")
var procCb = newCall(bindSym"createCb", retFutureSym, iteratorNameSym,
newStrLitNode(prc[0].getName))
outerProcBody.add procCb
# -> return retFuture
outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym)
result = prc
# Remove the 'async' pragma.
for i in 0 .. <result[4].len:
if result[4][i].kind == nnkIdent and result[4][i].ident == !"async":
result[4].del(i)
result[4] = newEmptyNode()
if subtypeIsVoid:
# Add discardable pragma.
if returnType.kind == nnkEmpty:
# Add Future[void]
result[3][0] = parseExpr("Future[void]")
result[6] = outerProcBody
#echo(treeRepr(result))
#if prc[0].getName == "g":
# echo(toStrLit(result))
macro async*(prc: stmt): stmt {.immediate.} =
## Macro which processes async procedures into the appropriate
## iterators and yield statements.
if prc.kind == nnkStmtList:
for oneProc in prc:
result = newStmtList()
result.add asyncSingleProc(oneProc)
else:
result = asyncSingleProc(prc)
proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
## Reads a line of data from ``socket``. Returned future will complete once
## a full line is read or an error occurs.
##
## If a full line is read ``\r\L`` is not
## added to ``line``, however if solely ``\r\L`` is read then ``line``
## will be set to it.
##
## If the socket is disconnected, ``line`` will be set to ``""``.
##
## If the socket is disconnected in the middle of a line (before ``\r\L``
## is read) then line will be set to ``""``.
## The partial line **will be lost**.
##
## **Warning**: This assumes that lines are delimited by ``\r\L``.
##
## **Note**: This procedure is mostly used for testing. You likely want to
## use ``asyncnet.recvLine`` instead.
template addNLIfEmpty(): stmt =
if result.len == 0:
result.add("\c\L")
result = ""
var c = ""
while true:
c = await recv(socket, 1)
if c.len == 0:
return ""
if c == "\r":
c = await recv(socket, 1)
assert c == "\l"
addNLIfEmpty()
return
elif c == "\L":
addNLIfEmpty()
return
add(result, c)
proc runForever*() =
## Begins a never ending global dispatcher poll loop.
while true:
poll()
proc waitFor*[T](fut: Future[T]): T =
## **Blocks** the current thread until the specified future completes.
while not fut.finished:
poll()
fut.read
@cheatfate
Copy link
Author

Last revision causes this error:
asyncdispatch.nim(882, 7) Error: type mismatch: got (proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode){.closure, locks: <unknown>.}) but expected 'proc (fd: AsyncFD, bytesTransferred: DWORD, errcode: OSErrorCode){.closure, gcsafe.}'

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment