Skip to content

Instantly share code, notes, and snippets.

@dom96
Created January 25, 2014 23:55
Show Gist options
  • Save dom96/74a81eb2001f1f9265c8 to your computer and use it in GitHub Desktop.
Save dom96/74a81eb2001f1f9265c8 to your computer and use it in GitHub Desktop.
import os, oids, tables, strutils
import windows
from winlean import TSocketHandle, TSockAddr, TSockLen, TAddrInfo, getAddrInfo,
WSAEWOULDBLOCK, freeaddrinfo, Tsockaddr_in, inet_addr, IN_ADDRANY, bindSocket
import socketsll
## Proactor
## --------
##
## This module implements the proactor pattern. Unlike selectors, the
## asynchronous I/O operation is first initiated and then waited upon.
type
  # Key under which a socket is associated with the I/O completion port.
  TCompletionKey = dword

  # Everything ``poll`` needs to dispatch a completion event: the socket and
  # the closure that is called with it.
  TCompletionData = object
    sock: TSocketHandle
    cb: proc (sock: TSocketHandle) {.closure.}

  # An I/O completion port together with the per-key callback table.
  PProactor* = ref object
    ioPort: THandle
    callbacks: TTable[TCompletionKey, TCompletionData]

  # A value that becomes available later; ``finished`` is set once ``value``
  # has been stored and ``cb`` (if any) is invoked at that point.
  PFuture*[T] = ref object
    value*: T
    finished*: bool
    cb: proc (future: PFuture[T]) {.closure.}
proc newProactor*(): PProactor =
  ## Creates a new proactor instance backed by a fresh I/O completion port.
  ##
  ## Raises an OS error if the completion port cannot be created, instead of
  ## handing back a proactor whose port handle is null.
  let port = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
  if port == 0:
    OSError(OSLastError())
  new result
  result.ioPort = port
  result.callbacks = initTable[TCompletionKey, TCompletionData]()
proc register*(p: PProactor, sock: TSocketHandle) =
  ## Associates ``sock`` with the proactor's completion port. The socket
  ## handle itself is used as the completion key.
  ##
  ## Raises an OS error when the association fails.
  let associated = CreateIOCompletionPort(sock.handle, p.ioPort,
                                          cast[TCompletionKey](sock), 1)
  if associated == 0:
    OSError(OSLastError())
proc setCallback*(p: PProactor, sock: TSocketHandle,
                  cb: proc (sock: TSocketHandle) {.closure.}) =
  ## Sets the proc to call when a completion event is received for ``sock``.
  ##
  ## Any callback previously stored for the same socket is replaced.
  let key = sock.TCompletionKey
  p.callbacks[key] = TCompletionData(sock: sock, cb: cb)
# Binding for kernel32's ``GetQueuedCompletionStatus``.
# NOTE(review): the completion key is declared as ``PULONG`` here; confirm
# this matches the pointer-sized key the API uses on 64-bit targets.
proc GetQueuedCompletionStatus*(CompletionPort: THandle,
                                lpNumberOfBytesTransferred: PDWORD,
                                lpCompletionKey: PULONG,
                                lpOverlapped: ptr POverlapped,
                                dwMilliseconds: DWORD): WINBOOL {.
  stdcall, dynlib: "kernel32", importc: "GetQueuedCompletionStatus".}
proc poll*(p: PProactor, timeout = 500) =
  ## Waits up to ``timeout`` milliseconds (``-1`` waits forever) for a single
  ## completion event and dispatches it to the callback registered for the
  ## event's completion key.
  ##
  ## Events whose key has no callback registered are ignored rather than
  ## invoking a nil closure. A timeout is not an error; any other failure of
  ## the wait is raised as an OS error.
  let llTimeout =
    if timeout == -1: windows.INFINITE
    else: timeout.int32
  var lpNumberOfBytesTransferred: DWORD
  var lpCompletionKey: ULONG
  var lpOverlapped: POverlapped
  let res = GetQueuedCompletionStatus(p.ioPort,
      addr lpNumberOfBytesTransferred, addr lpCompletionKey,
      addr lpOverlapped, llTimeout).bool
  if res:
    # We got a completion event.
    echo("Got a completion event: ", lpCompletionKey)
    let key = lpCompletionKey.TCompletionKey
    # A packet can arrive for a socket that never had a callback set (for
    # example when an operation finished synchronously), so look before
    # dispatching.
    if p.callbacks.hasKey(key):
      let data = p.callbacks[key]
      if data.cb != nil:
        data.cb(data.sock)
  else:
    var errCode = OSLastError()
    # WAIT_TIMEOUT just means nothing completed in time.
    if errCode.int32 != WAIT_TIMEOUT:
      OSError(errCode)
# -- Futures
proc newFuture*[T](): PFuture[T] =
  ## Creates a new, not yet finished future with no callback attached.
  var fut: PFuture[T]
  new(fut)
  fut.finished = false
  return fut
proc complete*[T](future: PFuture[T], val: T) =
  ## Stores ``val`` in ``future``, marks it finished and then runs the
  ## future's callback when one has been set.
  ##
  ## Completing a future that is already finished trips an assertion.
  assert(not future.finished)
  future.value = val
  future.finished = true
  if future.cb == nil:
    return
  future.cb(future)
proc `callback=`*[T](future: PFuture[T],
                     cb: proc (future: PFuture[T]) {.closure.}) =
  ## Sets the callback proc to be called when the future completes.
  ##
  ## If future has already completed then ``cb`` will be called immediately.
  future.cb = cb
  if not future.finished:
    return
  future.cb(future)
# Bit fields used to compose Winsock ioctl control codes.
const
  IOC_OUT = 0x40000000
  IOC_IN = 0x80000000
  IOC_WS2 = 0x08000000
  IOC_INOUT = IOC_IN or IOC_OUT

# Combines the in/out direction bits with the family ``x`` and the
# operation number ``y``.
template WSAIORW(x, y): expr = (IOC_INOUT or x or y)

const
  SIO_GET_EXTENSION_FUNCTION_POINTER = DWORD(WSAIORW(IOC_WS2, 6))

# GUID passed to ``WSAIoctl`` to look up the ``ConnectEx`` extension function.
var
  WSAID_CONNECTEX: TGUID = TGUID(
    D1: 0x25a207b9,
    D2: 0xddf3'i16,
    D3: 0x4660,
    D4: [0x8e'i8, 0xe9'i8, 0x76'i8, 0xe5'i8,
         0x8c'i8, 0x74'i8, 0x06'i8, 0x3e'i8])
# Binding for ``WSAIoctl`` from Ws2_32.dll.
proc WSAIoctl(s: TSocketHandle, dwIoControlCode: dword,
              lpvInBuffer: pointer, cbInBuffer: dword,
              lpvOutBuffer: pointer, cbOutBuffer: dword,
              lpcbBytesReturned: PDword,
              lpOverlapped: LPOVERLAPPED,
              lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE): cint {.
  stdcall, importc: "WSAIoctl", dynlib: "Ws2_32.dll".}
# Address of the ``ConnectEx`` extension function; nil until ``initAll`` ran.
var connectExPtr: pointer = nil

proc initPointer*(s: TSocketHandle, func: var pointer, guid: var TGUID): bool =
  ## Asks Winsock, through socket ``s``, for the extension function identified
  ## by ``guid`` and stores its address in ``func``.
  ##
  ## ``func`` is reset to nil before the lookup. Returns true when the
  ## underlying ``WSAIoctl`` call reports success.
  # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
  var bytesRet: DWord
  func = nil
  let rc = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER,
                    addr guid, sizeof(TGUID).dword,
                    addr func, sizeof(pointer).DWORD,
                    addr bytesRet, nil, nil)
  result = rc == 0
proc initAll*() =
  ## Looks up the ``ConnectEx`` extension function and caches its address so
  ## that ``connectEx`` can be used afterwards.
  ##
  ## A temporary socket is created for the lookup and closed again before
  ## returning. Raises an OS error when the lookup fails.
  let dummySock = socket()
  let found = initPointer(dummySock, connectExPtr, WSAID_CONNECTEX)
  # Read the error code before closing: closing the socket may overwrite it.
  let err = OSLastError()
  # The socket was only needed for the lookup; do not leak its handle.
  close(dummySock)
  if not found:
    OSError(err)
proc connectEx*(s: TSocketHandle, name: ptr TSockAddr, namelen: cint,
                lpSendBuffer: pointer, dwSendDataLength: dword,
                lpdwBytesSent: lpdword, lpOverlapped: POverlapped): bool =
  ## Calls the ``ConnectEx`` extension function through the pointer obtained
  ## by ``initAll``, forwarding all arguments unchanged.
  ##
  ## Raises ``EInvalidValue`` when the pointer has not been initialised yet.
  if connectExPtr.isNil:
    raise newException(EInvalidValue, "Need to initialise ConnectEx().")
  # Reinterpret the raw address as a stdcall proc with ConnectEx's signature.
  let connectExProc =
    cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint,
               lpSendBuffer: pointer, dwSendDataLength: dword,
               lpdwBytesSent: lpdword,
               lpOverlapped: POverlapped): bool {.stdcall.}](connectExPtr)
  return connectExProc(s, name, namelen, lpSendBuffer, dwSendDataLength,
                       lpdwBytesSent, lpOverlapped)
proc connect(p: PProactor, socket: TSocketHandle, address: string, port: TPort,
             af = AF_INET): PFuture[int] =
  ## Connects ``socket`` to server at ``address:port``.
  ##
  ## Returns a ``PFuture`` which will complete when the connection succeeds
  ## or an error occurs.
  ##
  ## Every address returned for ``address`` is tried in turn until one
  ## connect attempt either succeeds immediately or is accepted as pending;
  ## if none is, the last OS error is raised. The address list is always
  ## released, also when an error is raised.
  var retFuture = newFuture[int]() # TODO: Change to void when that regression is fixed.

  # Apparently ``ConnectEx`` expects the socket to be initially bound:
  var saddr: Tsockaddr_in
  saddr.sin_family = int16(toInt(af))
  saddr.sin_port = 0
  saddr.sin_addr.s_addr = INADDR_ANY
  if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)),
              sizeof(saddr).TSockLen) < 0'i32:
    OSError(OSLastError())

  var aiList = getAddrInfo(address, port, af)
  var success = false
  var lastError: TOSErrorCode
  # The OVERLAPPED structure has to stay valid for as long as the operation
  # is pending, so it lives on the heap and is released by the completion
  # callback rather than right after the call.
  var ol = cast[ptr TOverlapped](alloc0(sizeof(TOverlapped)))
  var olInFlight = false
  try:
    var it = aiList
    while it != nil:
      # Each attempt needs a clean structure.
      zeroMem(ol, sizeof(TOverlapped))
      if connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
                   nil, 0, nil, ol):
        echo "Connected"
        success = true
        retFuture.finished = true
        break
      # Capture the error code before anything else can overwrite it.
      lastError = OSLastError()
      if lastError.int32 == ERROR_IO_PENDING:
        success = true
        olInFlight = true
        p.setCallback(socket,
          proc (sock: TSocketHandle) =
            echo("Connected")
            # The operation is over; the structure is no longer needed.
            dealloc(ol)
            retFuture.complete(0)
        )
        break
      # Immediate failure for this address: try the next one.
      it = it.ai_next
  finally:
    dealloc(aiList)
    if not olInFlight:
      dealloc(ol)
  if not success: OSError(lastError)
  return retFuture
type
  # Mirror of Winsock's ``WSABUF``: a length plus a pointer to the bytes.
  TWSABuf {.importc: "WSABUF", header: "winsock2.h".} = object
    len: ULONG
    buf: cstring

# Binding for ``WSARecv`` from Ws2_32.dll.
proc WSARecv*(s: TSocketHandle, buf: ptr TWSABuf, bufCount: DWORD,
              bytesReceived: PDWORD, flags: PDWORD,
              lpOverlapped: POverlapped,
              completionProc: LPOVERLAPPED_COMPLETION_ROUTINE): cint {.
  stdcall, importc: "WSARecv", dynlib: "Ws2_32.dll".}
proc recv(p: PProactor, socket: TSocketHandle, size: int) =
  ## Starts an overlapped receive of up to ``size`` bytes on ``socket``.
  ##
  ## The receive buffer and the OVERLAPPED structure are heap allocated so
  ## that they outlive this call; they are released by a completion callback
  ## which this proc installs for ``socket`` (replacing any previous one).
  ## The received bytes are not yet delivered anywhere.
  ##
  ## A pending operation is not an error. Any other failure to start the
  ## receive is raised as an OS error after the allocations are released.
  # Both allocations are referenced by the operation until it completes.
  let rawBuf = alloc0(max(size, 1))
  let ol = cast[ptr TOverlapped](alloc0(sizeof(TOverlapped)))
  var dataBuf: TWSABuf
  dataBuf.buf = cast[cstring](rawBuf)
  dataBuf.len = size.ULONG
  var flags: DWord
  # The byte count is not requested here because the call is overlapped.
  let ret = WSARecv(socket, addr dataBuf, 1, nil, addr flags, ol, nil)
  if ret == -1:
    let err = OSLastError()
    if err.int32 != ERROR_IO_PENDING:
      # The operation never started, so no completion will arrive.
      dealloc(rawBuf)
      dealloc(ol)
      echo err
      OSError(err)
  p.setCallback(socket,
    proc (sock: TSocketHandle) =
      dealloc(rawBuf)
      dealloc(ol)
  )
when isMainModule:
  # Small demo: connect to an IRC server and keep polling the proactor.
  initAll()
  var proactor = newProactor()
  var sock = socket()
  #sock.setBlocking false
  proactor.register(sock)
  var connected = proactor.connect(sock, "irc.freenode.net", TPort(6667))
  connected.callback =
    proc (future: PFuture[int]) = echo("Connected in future!")
  #p.recv(sock, 10)
  # Runs until the process is terminated.
  while true:
    proactor.poll()
    echo "polled"
#
#
# Nimrod's Runtime Library
# (c) Copyright 2014 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module implements a low-level cross-platform sockets interface. Look
## at the ``net`` module for the higher-level version.
import unsigned, os
when hostos == "solaris":
{.passl: "-lsocket -lnsl".}
when defined(Windows):
import winlean
else:
import posix
type
  TPort* = distinct uint16  ## port type

  TDomain* = enum   ## domain, which specifies the protocol family of the
                    ## created socket. Other domains than those that are listed
                    ## here are unsupported.
    AF_UNIX,        ## for local socket (using a file). Unsupported on Windows.
    AF_INET = 2,    ## for network protocol IPv4 or
    AF_INET6 = 23   ## for network protocol IPv6.

  TType* = enum        ## second argument to `socket` proc
    SOCK_STREAM = 1,   ## reliable stream-oriented service or Stream Sockets
    SOCK_DGRAM = 2,    ## datagram service or Datagram Sockets
    SOCK_RAW = 3,      ## raw protocols atop the network layer.
    SOCK_SEQPACKET = 5 ## reliable sequenced packet service

  TProtocol* = enum     ## third argument to `socket` proc
    IPPROTO_TCP = 6,    ## Transmission control protocol.
    IPPROTO_UDP = 17,   ## User datagram protocol.
    IPPROTO_IP,         ## Internet protocol. Unsupported on Windows.
    IPPROTO_IPV6,       ## Internet Protocol Version 6. Unsupported on Windows.
    IPPROTO_RAW,        ## Raw IP Packets Protocol. Unsupported on Windows.
    IPPROTO_ICMP        ## Control message protocol. Unsupported on Windows.

  TServent* {.pure, final.} = object ## information about a service
    name*: string
    aliases*: seq[string]
    port*: TPort
    proto*: string

  Thostent* {.pure, final.} = object ## information about a given host
    name*: string
    aliases*: seq[string]
    addrtype*: TDomain
    length*: int
    addrList*: seq[string]
# The platform's "invalid socket" handle value, taken from whichever
# low-level module is in use.
when defined(windows):
  let OSInvalidSocket* = winlean.INVALID_SOCKET
else:
  let OSInvalidSocket* = posix.INVALID_SOCKET
proc `==`*(a, b: TPort): bool {.borrow.}
  ## Compares two ports for equality.

proc `$`*(p: TPort): string {.borrow.}
  ## Returns the port number as a string.
proc toInt*(domain: TDomain): cint
  ## Converts the TDomain enum to a platform-dependent ``cint``.

proc toInt*(typ: TType): cint
  ## Converts the TType enum to a platform-dependent ``cint``.

proc toInt*(p: TProtocol): cint
  ## Converts the TProtocol enum to a platform-dependent ``cint``.

when defined(posix):
  # On POSIX every enum value is mapped onto the matching ``posix`` constant.
  proc toInt(domain: TDomain): cint =
    case domain
    of AF_INET: return posix.AF_INET
    of AF_INET6: return posix.AF_INET6
    of AF_UNIX: return posix.AF_UNIX
    else: nil

  proc toInt(typ: TType): cint =
    case typ
    of SOCK_STREAM: return posix.SOCK_STREAM
    of SOCK_DGRAM: return posix.SOCK_DGRAM
    of SOCK_RAW: return posix.SOCK_RAW
    of SOCK_SEQPACKET: return posix.SOCK_SEQPACKET
    else: nil

  proc toInt(p: TProtocol): cint =
    case p
    of IPPROTO_TCP: return posix.IPPROTO_TCP
    of IPPROTO_UDP: return posix.IPPROTO_UDP
    of IPPROTO_IP: return posix.IPPROTO_IP
    of IPPROTO_IPV6: return posix.IPPROTO_IPV6
    of IPPROTO_RAW: return posix.IPPROTO_RAW
    of IPPROTO_ICMP: return posix.IPPROTO_ICMP
    else: nil
else:
  # Elsewhere the enum ordinals are used directly.
  proc toInt(domain: TDomain): cint =
    toU16(ord(domain))

  proc toInt(typ: TType): cint =
    cint(ord(typ))

  proc toInt(p: TProtocol): cint =
    cint(ord(p))
proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
             protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
  ## Creates a new socket for the given domain, type and protocol and returns
  ## whatever handle the platform's ``socket`` call hands back (compare with
  ## ``OSInvalidSocket`` to detect failure).
  # TODO: The function which will use this will raise EOS.
  result = socket(toInt(domain), toInt(typ), toInt(protocol))
proc close*(socket: TSocketHandle) =
  ## Closes a socket. The result of the underlying close call is ignored.
  # TODO: These values should not be discarded. An EOS should be raised.
  # http://stackoverflow.com/questions/12463473/what-happens-if-you-call-close-on-a-bsd-socket-multiple-times
  when not defined(windows):
    discard posix.close(socket)
  else:
    discard winlean.closeSocket(socket)
proc bindAddr*(socket: TSocketHandle, name: ptr TSockAddr,
               namelen: TSockLen): cint =
  ## Binds ``socket`` to the address ``name`` of length ``namelen`` and
  ## returns the result of the underlying ``bindSocket`` call unchanged.
  bindSocket(socket, name, namelen)
proc getAddrInfo*(address: string, port: TPort, af: TDomain = AF_INET,
                  typ: TType = SOCK_STREAM,
                  prot: TProtocol = IPPROTO_TCP): ptr TAddrInfo =
  ## Resolves ``address`` and ``port`` into a list of socket addresses,
  ## using ``af``, ``typ`` and ``prot`` as lookup hints.
  ##
  ## Raises an OS error when the lookup fails.
  ##
  ## **Warning**: The resulting ``ptr TAddrInfo`` must be freed using ``dealloc``!
  var hints: TAddrInfo
  hints.ai_family = toInt(af)
  hints.ai_socktype = toInt(typ)
  hints.ai_protocol = toInt(prot)
  result = nil
  let rc = getAddrInfo(address, $port, addr(hints), result)
  if rc == 0'i32:
    return
  when defined(windows):
    OSError(OSLastError())
  else:
    raise newException(EOS, $gai_strerror(rc))
proc dealloc*(ai: ptr TAddrInfo) =
  ## Releases an address list obtained from ``getAddrInfo``.
  ai.freeaddrinfo()
when defined(Windows):
  # Winsock has to be initialised once before any socket call is made.
  var wsa: TWSADATA
  # WSAStartup hands its error code back as the return value; the thread's
  # last-error value is not meaningful for it, so report the return value.
  let wsaStartupResult = WSAStartup(0x0101'i16, addr wsa)
  if wsaStartupResult != 0:
    OSError(TOSErrorCode(wsaStartupResult))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment