Skip to content

Instantly share code, notes, and snippets.

@schlamar
Created January 20, 2014 13:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save schlamar/8519530 to your computer and use it in GitHub Desktop.
Save schlamar/8519530 to your computer and use it in GitHub Desktop.
WSAAccept with Python and ctypes
import errno
import multiprocessing
import socket
import time
import ctypes
from ctypes import windll
from ctypes import POINTER, WINFUNCTYPE, c_char_p, c_char, c_int, c_ushort
from ctypes.wintypes import DWORD, UINT, INT, ULONG
SOCKET = UINT
SERVICETYPE = UINT
GROUP = UINT
DWORD_PTR = POINTER(DWORD)
LPINT = POINTER(INT)
CF_REJECT = 0x0001
class sockaddr(ctypes.Structure):
_fields_ = [
('sa_family', c_ushort),
('sa_data', c_char * 14)
]
class WSABUF(ctypes.Structure):
_fields_ = [
('len', ULONG),
('buf', c_char_p)
]
LPWSABUF = POINTER(WSABUF)
class FLOWSPEC(ctypes.Structure):
_fields_ = [
('TokenRate', ULONG),
('TokenBucketSize', ULONG),
('PeakBandwidth', ULONG),
('Latency', ULONG),
('DelayVariation', ULONG),
('ServiceType', SERVICETYPE),
('MaxSduSize', ULONG),
('MinimumPolicedSize', ULONG),
]
LPQOS = POINTER(FLOWSPEC)
LPCONDITIONPROC = WINFUNCTYPE(
c_int, LPWSABUF, LPWSABUF, LPQOS, LPQOS, LPWSABUF, LPWSABUF,
POINTER(GROUP), DWORD_PTR)
@LPCONDITIONPROC
def cond_callback(*args):
return CF_REJECT
lib = windll.LoadLibrary('Ws2_32')
WSAAccept = lib.WSAAccept
WSAAccept.argtypes = [SOCKET, POINTER(sockaddr), LPINT, LPCONDITIONPROC,
DWORD_PTR]
WSAAccept.restype = SOCKET
PORT = 50123
def server():
s = socket.socket()
s.bind(('127.0.0.1', PORT))
s.listen(0)
while True:
print 'wait'
sa_client = sockaddr()
i_client_size = INT(ctypes.sizeof(sa_client))
WSAAccept(s.fileno(), ctypes.byref(sa_client),
ctypes.byref(i_client_size), cond_callback, None)
print 'refused', sa_client.sa_family, repr(sa_client.sa_data)
def main():
p = multiprocessing.Process(target=server)
p.daemon = True
p.start()
client = socket.socket()
start = time.time()
try:
client.connect(('127.0.0.1', PORT))
except socket.error as e:
assert e.errno == errno.WSAECONNREFUSED
print 'connection attempt took', time.time() - start
client2 = socket.socket()
start = time.time()
try:
client2.connect(('127.0.0.1', PORT))
except socket.error as e:
assert e.errno == errno.WSAECONNREFUSED
print 'connection attempt took', time.time() - start
finally:
client.close()
client2.close()
p.join(1)
if __name__ == '__main__':
main()
@b3mb4m
Copy link

b3mb4m commented May 1, 2016

Reject not work as expected.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment