Skip to content

Instantly share code, notes, and snippets.

@Varriount
Forked from dom96/proactor.nim
Last active January 4, 2016 13:09
Show Gist options
  • Save Varriount/637e63ef29d2581ba828 to your computer and use it in GitHub Desktop.
Save Varriount/637e63ef29d2581ba828 to your computer and use it in GitHub Desktop.

Revisions

  1. Varriount revised this gist Jan 26, 2014. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions proactor.nim
    Original file line number Diff line number Diff line change
    @@ -163,13 +163,13 @@ proc connect(p: PProactor, socket: TSocketHandle, address: string, port: TPort,
    var lastError: TOSErrorCode
    var it = aiList
    while it != nil:
    # var ol: ptr TOverlapped
    # var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
    # nil, 0, nil, addr ol)
    var ol = cast[ptr TOverlapped](alloc0(sizeof(TOverlapped)))
    var ol: TOverlapped
    var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
    nil, 0, nil, ol)
    dealloc(ol)
    nil, 0, nil, addr ol)
    # var ol = cast[ptr TOverlapped](alloc0(sizeof(TOverlapped)))
    # var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
    # # nil, 0, nil, ol)
    # dealloc(ol)
    if ret:
    echo "Connected"
    success = true
  2. Varriount revised this gist Jan 26, 2014. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions proactor.nim
    Original file line number Diff line number Diff line change
    @@ -163,8 +163,8 @@ proc connect(p: PProactor, socket: TSocketHandle, address: string, port: TPort,
    var lastError: TOSErrorCode
    var it = aiList
    while it != nil:
    #var ol: TOverlapped
    #var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
    # var ol: ptr TOverlapped
    # var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
    # nil, 0, nil, addr ol)
    var ol = cast[ptr TOverlapped](alloc0(sizeof(TOverlapped)))
    var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
  3. @dom96 dom96 created this gist Jan 25, 2014.
    237 changes: 237 additions & 0 deletions proactor.nim
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,237 @@
    import os, oids, tables, strutils

    import windows
    from winlean import TSocketHandle, TSockAddr, TSockLen, TAddrInfo, getAddrInfo,
    WSAEWOULDBLOCK, freeaddrinfo, Tsockaddr_in, inet_addr, IN_ADDRANY, bindSocket

    import socketsll

    ## Proactor
    ## --------
    ##
    ## This module implements the proactor pattern, unlike selectors the
    ## asynchronous I/O operation is first initiated and then waited upon.

    type
    TCompletionKey = dword

    TCompletionData = object
    sock: TSocketHandle
    cb: proc (sock: TSocketHandle) {.closure.}

    PProactor* = ref object
    ioPort: THandle
    callbacks: TTable[TCompletionKey, TCompletionData]

    PFuture*[T] = ref object
    value*: T
    finished*: bool
    cb: proc (future: PFuture[T]) {.closure.}

    proc newProactor*(): PProactor =
    ## Creates a new proactor instance.
    new result
    result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
    result.callbacks = initTable[TCompletionKey, TCompletionData]()

    proc register*(p: PProactor, sock: TSocketHandle) =
    if CreateIOCompletionPort(sock.handle, p.ioPort,
    cast[TCompletionKey](sock), 1) == 0:
    OSError(OSLastError())

    proc setCallback*(p: PProactor, sock: TSocketHandle, cb: proc (sock: TSocketHandle) {.closure.}) =
    ## Sets the proc to call when a completion event is received for ``sock``.
    p.callbacks[sock.TCompletionKey] = TCompletionData(sock: sock, cb: cb)

    proc GetQueuedCompletionStatus*(CompletionPort: THandle,
    lpNumberOfBytesTransferred: PDWORD, lpCompletionKey: PULONG,
    lpOverlapped: ptr POverlapped,
    dwMilliseconds: DWORD): WINBOOL{.stdcall,
    dynlib: "kernel32", importc: "GetQueuedCompletionStatus".}

    proc poll*(p: PProactor, timeout = 500) =
    let llTimeout =
    if timeout == -1: windows.INFINITE
    else: timeout.int32
    var lpNumberOfBytesTransferred: DWORD
    var lpCompletionKey: ULONG
    var lpOverlapped: POverlapped
    let res = GetQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred,
    addr lpCompletionKey, addr lpOverlapped, llTimeout).bool
    if res:
    # We got a completion event.
    echo("Got a completion event: ", lpCompletionKey)

    let data = p.callbacks[lpCompletionKey.TCompletionKey]
    data.cb(data.sock)
    else:
    var errCode = OSLastError()
    if errCode.int32 == WAIT_TIMEOUT:
    # Timed out
    nil
    else: OSError(errCode)

    # -- Futures

    proc newFuture*[T](): PFuture[T] =
    new(result)
    result.finished = false

    proc complete*[T](future: PFuture[T], val: T) =
    assert(not future.finished)
    future.value = val
    future.finished = true
    if future.cb != nil:
    future.cb(future)

    proc `callback=`*[T](future: PFuture[T],
    cb: proc (future: PFuture[T]) {.closure.}) =
    ## Sets the callback proc to be called when the future completes.
    ##
    ## If future has already completed then ``cb`` will be called immediately.
    future.cb = cb
    if future.finished:
    future.cb(future)

    const
    IOC_OUT = 0x40000000
    IOC_IN = 0x80000000
    IOC_WS2 = 0x08000000
    IOC_INOUT = IOC_IN or IOC_OUT

    template WSAIORW(x,y): expr = (IOC_INOUT or x or y)

    const
    SIO_GET_EXTENSION_FUNCTION_POINTER = WSAIORW(IOC_WS2,6).DWORD

    var
    WSAID_CONNECTEX: TGUID = TGUID(D1: 0x25a207b9, D2: 0xddf3'i16, D3: 0x4660, D4: [
    0x8e'i8, 0xe9'i8, 0x76'i8, 0xe5'i8, 0x8c'i8, 0x74'i8,
    0x06'i8, 0x3e'i8])

    proc WSAIoctl(s: TSocketHandle, dwIoControlCode: dword, lpvInBuffer: pointer,
    cbInBuffer: dword, lpvOutBuffer: pointer, cbOutBuffer: dword,
    lpcbBytesReturned: PDword, lpOverlapped: LPOVERLAPPED,
    lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE): cint {.stdcall, importc: "WSAIoctl", dynlib: "Ws2_32.dll".}

    var connectExPtr: pointer = nil

    proc initPointer*(s: TSocketHandle, func: var pointer, guid: var TGUID): bool =
    # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
    var bytesRet: DWord
    func = nil
    result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
    sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD,
    addr bytesRet, nil, nil) == 0

    proc initAll*() =
    let dummySock = socket()
    if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
    OSError(OSLastError())

    proc connectEx*(s: TSocketHandle, name: ptr TSockAddr, namelen: cint,
    lpSendBuffer: pointer, dwSendDataLength: dword,
    lpdwBytesSent: lpdword, lpOverlapped: POverlapped): bool =
    if connectExPtr.isNil: raise newException(EInvalidValue, "Need to initialise ConnectEx().")
    let func =
    cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint,
    lpSendBuffer: pointer, dwSendDataLength: dword,
    lpdwBytesSent: lpdword, lpOverlapped: POverlapped): bool {.stdcall.}](connectExPtr)

    result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
    lpOverlapped)

    proc connect(p: PProactor, socket: TSocketHandle, address: string, port: TPort,
    af = AF_INET): PFuture[int] =
    ## Connects ``socket`` to server at ``address:port``.
    ##
    ## Returns a ``PFuture`` which will complete when the connection succeeds
    ## or an error occurs.

    var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed.
    # Apparently ``ConnectEx`` expects the socket to be initially bound:
    var saddr: Tsockaddr_in
    saddr.sin_family = int16(toInt(af))
    saddr.sin_port = 0
    saddr.sin_addr.s_addr = INADDR_ANY
    if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)),
    sizeof(saddr).TSockLen) < 0'i32:
    OSError(OSLastError())

    var aiList = getAddrInfo(address, port, af)
    var success = false
    var lastError: TOSErrorCode
    var it = aiList
    while it != nil:
    #var ol: TOverlapped
    #var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
    # nil, 0, nil, addr ol)
    var ol = cast[ptr TOverlapped](alloc0(sizeof(TOverlapped)))
    var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
    nil, 0, nil, ol)
    dealloc(ol)
    if ret:
    echo "Connected"
    success = true
    retFuture.finished = true
    break
    else:
    lastError = OSLastError()
    when defined(windows):
    if lastError.int32 == ERROR_IO_PENDING:
    success = true
    p.setCallback(socket,
    proc (sock: TSocketHandle) =
    echo("Connected")
    retFuture.complete(0)
    )
    break
    else: OSError(lastError)
    it = it.ai_next

    dealloc(aiList)

    if not success: OSError(lastError)
    return retFuture

    type
    TWSABuf {.importc: "WSABUF", header: "winsock2.h".} = object
    len: ULONG
    buf: cstring

    proc WSARecv*(s: TSocketHandle, buf: ptr TWSABuf, bufCount: DWORD,
    bytesReceived, flags: PDWORD, lpOverlapped: POverlapped,
    completionProc: LPOVERLAPPED_COMPLETION_ROUTINE): cint {.
    stdcall, importc: "WSARecv", dynlib: "Ws2_32.dll".}

    proc recv(p: PProactor, socket: TSocketHandle, size: int) =
    var dataBuf: TWSABuf
    dataBuf.buf = newString(size)
    dataBuf.len = size

    var bytesReceived, flags: DWord
    var overlapped: TOverlapped
    let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
    addr flags, addr overlapped, nil)
    if ret == -1:
    let err = OSLastError()
    echo err
    OSError(err)

    when isMainModule:
    initAll()
    var p = newProactor()
    var sock = socket()
    #sock.setBlocking false
    p.register(sock)

    var f = p.connect(sock, "irc.freenode.net", TPort(6667))
    f.callback = proc (future: PFuture[int]) = echo("Connected in future!")
    #p.recv(sock, 10)

    while true:
    p.poll()
    echo "polled"



    161 changes: 161 additions & 0 deletions socketsll.nim
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,161 @@
    #
    #
    # Nimrod's Runtime Library
    # (c) Copyright 2014 Dominik Picheta
    #
    # See the file "copying.txt", included in this
    # distribution, for details about the copyright.
    #

    ## This module implements a low-level cross-platform sockets interface. Look
    ## at the ``net`` module for the higher-level version.

    import unsigned, os

    when hostos == "solaris":
    {.passl: "-lsocket -lnsl".}

    when defined(Windows):
    import winlean
    else:
    import posix

    type

    TPort* = distinct uint16 ## port type

    TDomain* = enum ## domain, which specifies the protocol family of the
    ## created socket. Other domains than those that are listed
    ## here are unsupported.
    AF_UNIX, ## for local socket (using a file). Unsupported on Windows.
    AF_INET = 2, ## for network protocol IPv4 or
    AF_INET6 = 23 ## for network protocol IPv6.

    TType* = enum ## second argument to `socket` proc
    SOCK_STREAM = 1, ## reliable stream-oriented service or Stream Sockets
    SOCK_DGRAM = 2, ## datagram service or Datagram Sockets
    SOCK_RAW = 3, ## raw protocols atop the network layer.
    SOCK_SEQPACKET = 5 ## reliable sequenced packet service

    TProtocol* = enum ## third argument to `socket` proc
    IPPROTO_TCP = 6, ## Transmission control protocol.
    IPPROTO_UDP = 17, ## User datagram protocol.
    IPPROTO_IP, ## Internet protocol. Unsupported on Windows.
    IPPROTO_IPV6, ## Internet Protocol Version 6. Unsupported on Windows.
    IPPROTO_RAW, ## Raw IP Packets Protocol. Unsupported on Windows.
    IPPROTO_ICMP ## Control message protocol. Unsupported on Windows.

    TServent* {.pure, final.} = object ## information about a service
    name*: string
    aliases*: seq[string]
    port*: TPort
    proto*: string

    Thostent* {.pure, final.} = object ## information about a given host
    name*: string
    aliases*: seq[string]
    addrtype*: TDomain
    length*: int
    addrList*: seq[string]

    when defined(windows):
    let
    OSInvalidSocket* = winlean.INVALID_SOCKET
    else:
    let
    OSInvalidSocket* = posix.INVALID_SOCKET

    proc `==`*(a, b: TPort): bool {.borrow.}
    ## ``==`` for ports.

    proc `$`*(p: TPort): string {.borrow.}
    ## returns the port number as a string

    proc toInt*(domain: TDomain): cint
    ## Converts the TDomain enum to a platform-dependent ``cint``.

    proc toInt*(typ: TType): cint
    ## Converts the TType enum to a platform-dependent ``cint``.

    proc toInt*(p: TProtocol): cint
    ## Converts the TProtocol enum to a platform-dependent ``cint``.

    when defined(posix):
    proc toInt(domain: TDomain): cint =
    case domain
    of AF_UNIX: result = posix.AF_UNIX
    of AF_INET: result = posix.AF_INET
    of AF_INET6: result = posix.AF_INET6
    else: nil

    proc toInt(typ: TType): cint =
    case typ
    of SOCK_STREAM: result = posix.SOCK_STREAM
    of SOCK_DGRAM: result = posix.SOCK_DGRAM
    of SOCK_SEQPACKET: result = posix.SOCK_SEQPACKET
    of SOCK_RAW: result = posix.SOCK_RAW
    else: nil

    proc toInt(p: TProtocol): cint =
    case p
    of IPPROTO_TCP: result = posix.IPPROTO_TCP
    of IPPROTO_UDP: result = posix.IPPROTO_UDP
    of IPPROTO_IP: result = posix.IPPROTO_IP
    of IPPROTO_IPV6: result = posix.IPPROTO_IPV6
    of IPPROTO_RAW: result = posix.IPPROTO_RAW
    of IPPROTO_ICMP: result = posix.IPPROTO_ICMP
    else: nil

    else:
    proc toInt(domain: TDomain): cint =
    result = toU16(ord(domain))

    proc toInt(typ: TType): cint =
    result = cint(ord(typ))

    proc toInt(p: TProtocol): cint =
    result = cint(ord(p))


    proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
    protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
    ## Creates a new socket; returns `InvalidSocket` if an error occurs.

    # TODO: The function which will use this will raise EOS.
    socket(toInt(domain), toInt(typ), toInt(protocol))

    proc close*(socket: TSocketHandle) =
    ## closes a socket.
    when defined(windows):
    discard winlean.closeSocket(socket)
    else:
    discard posix.close(socket)
    # TODO: These values should not be discarded. An EOS should be raised.
    # http://stackoverflow.com/questions/12463473/what-happens-if-you-call-close-on-a-bsd-socket-multiple-times

    proc bindAddr*(socket: TSocketHandle, name: ptr TSockAddr, namelen: TSockLen): cint =
    result = bindSocket(socket, name, namelen)

    proc getAddrInfo*(address: string, port: TPort, af: TDomain = AF_INET, typ: TType = SOCK_STREAM,
    prot: TProtocol = IPPROTO_TCP): ptr TAddrInfo =
    ##
    ##
    ## **Warning**: The resulting ``ptr TAddrInfo`` must be freed using ``dealloc``!
    var hints: TAddrInfo
    result = nil
    hints.ai_family = toInt(af)
    hints.ai_socktype = toInt(typ)
    hints.ai_protocol = toInt(prot)
    var gaiResult = getAddrInfo(address, $port, addr(hints), result)
    if gaiResult != 0'i32:
    when defined(windows):
    OSError(OSLastError())
    else:
    raise newException(EOS, $gai_strerror(gaiResult))

    proc dealloc*(ai: ptr TAddrInfo) =
    freeaddrinfo(ai)

    when defined(Windows):
    var wsa: TWSADATA
    if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError())