Skip to content

Instantly share code, notes, and snippets.

@mnunberg
Last active December 26, 2015 11:29
Show Gist options
  • Save mnunberg/7144452 to your computer and use it in GitHub Desktop.
Save mnunberg/7144452 to your computer and use it in GitHub Desktop.
pyuv sync
from socket import getaddrinfo
from pyuv import Timer, Loop, TCP
from pyuv.error import UVError
from pyuv import errno as uv_errno
class NetworkError(Exception):
    """Exception carrying a UV error code.

    The code handed to the constructor is kept on ``self.code`` and is
    turned into a readable message on demand via ``uv_errno.strerror``.
    """

    def __init__(self, uverr):
        """Remember *uverr* as the error code of this exception."""
        self.code = uverr

    def __str__(self):
        """Return ``"UV Error: <description>"`` for the stored code."""
        description = uv_errno.strerror(self.code)
        return "UV Error: {0}".format(description)
class TimeoutError(NetworkError):
    """NetworkError variant signalling that an operation timed out.

    It takes no constructor arguments; ``code`` is fixed at ``-1`` and the
    string form is a constant message rather than a ``strerror`` lookup.
    """

    def __init__(self):
        """Set the placeholder error code used for timeouts."""
        # The base initializer is intentionally not invoked: there is no
        # UV error code to hand over, so -1 is stored directly.
        self.code = -1

    def __str__(self):
        """Return the fixed timeout message."""
        return "Timed out"
class Context(object):
    """Synchronous (blocking) facade over a pyuv TCP handle.

    Every public operation (``connect``, ``read``, ``write``) issues the
    asynchronous request, arms a one-shot timer and then runs the event loop
    until one of the callbacks stops it again.  If the operation failed or
    the timer fired first, the recorded error is raised in the caller.
    """

    def __init__(self, loop=None):
        """Create the timer and TCP handles.

        :param loop: event loop to drive; a new ``Loop()`` is created when
            none (or a falsy value) is given.
        """
        if not loop:
            loop = Loop()

        self.loop = loop
        self.timer = Timer(self.loop)
        self.sock = TCP(self.loop)

        self._timeout = 0
        self._next_timeout = 0
        self._cur_result = None
        self._last_error = None
        # State of the read in progress: accumulated bytes and the number of
        # bytes requested by the caller of read().
        self._buf = None
        self._read_size = 0

    @property
    def timeout(self):
        """Timeout handed to ``Timer.start`` for each blocking operation.

        NOTE(review): presumably expressed in seconds, as pyuv timers are --
        confirm against the pyuv version in use.  With the initial value of
        0 the timer is started with a zero delay, so set a timeout before
        calling connect/read/write.
        """
        return self._timeout

    @timeout.setter
    def timeout(self, newval):
        self.timer.stop()
        # Keep the value reported by the getter in sync with the value that
        # will actually be used to arm the timer.
        self._timeout = newval
        self._next_timeout = newval

    def _run_timer(self):
        """(Re)start the one-shot timeout timer with the current timeout."""
        self.timer.start(self._timer_expired, self._next_timeout, 0)

    def _timer_expired(self, _):
        """Timer callback: record a TimeoutError and leave the loop."""
        self._last_error = TimeoutError()
        self.loop.stop()

    def success(self):
        """Finish the current operation successfully and leave the loop."""
        self.timer.stop()
        self.loop.stop()

    def fail(self, err):
        """Finish the current operation with an error and leave the loop.

        :param err: either an exception instance or a raw UV error code as
            delivered to the pyuv callbacks.  Raw codes are wrapped in
            ``NetworkError`` so that ``_run`` always has something that can
            legally be raised.
        """
        if not isinstance(err, Exception):
            err = NetworkError(err)
        self._last_error = err
        self.timer.stop()
        self.loop.stop()

    def _run(self):
        """Arm the timer, run the loop until stopped, then surface errors.

        :raises NetworkError: when the operation reported an error
            (``TimeoutError`` when the timer fired first).
        """
        self._last_error = None
        self._run_timer()
        self.loop.run()
        if self._last_error is not None:
            raise self._last_error

    def connect(self, addr, port):
        """Resolve *addr*/*port* and block until the connection completes.

        The first result of ``getaddrinfo`` is used.  Name resolution runs
        before the timer is armed, so it is not covered by the timeout.

        :raises NetworkError: on connection failure or timeout.
        """
        def _cb(handle, error):
            if error:
                self.fail(error)
                return
            self.success()

        addr = getaddrinfo(addr, port)[0][-1]
        self.sock.connect(addr, _cb)
        self._run()

    def _read_callback(self, handle, data, error):
        """pyuv read callback: accumulate data until enough has arrived."""
        if error:
            if error == uv_errno.UV_EOF:
                # End of stream
                self.success()
                return
            self.fail(error)
            return

        if not len(data):
            # Closed?
            self.sock.stop_read()
            self.success()
            return

        self._buf += data
        if len(self._buf) >= self._read_size:
            # Got enough data for now.
            self.sock.stop_read()
            self.success()
            return

        # More data is expected: restart the timeout window.  The timer is
        # one-shot (repeat == 0), so it has to be started again explicitly;
        # Timer.again() would merely stop it.
        self._run_timer()

    def read(self, size):
        """Block until at least *size* bytes arrived or the stream ended.

        :param size: number of bytes after which reading stops.  Data is
            not truncated, so the result may be longer than *size*.
        :returns: a ``bytearray`` with everything received.
        :raises NetworkError: on read failure or timeout.
        """
        self._buf = bytearray()
        self._read_size = size
        self.sock.start_read(self._read_callback)
        self._run()
        ret = self._buf
        self._buf = None
        return ret

    def write(self, buf):
        """Write *buf* to the socket and block until the write completes.

        :raises NetworkError: on write failure or timeout.
        """
        def _cb(handle, error):
            if not error:
                self.success()
                return
            self.fail(error)

        self.sock.write(buf, _cb)
        self._run()
if __name__ == "__main__":
    # Demo 1: fetch a page over plain HTTP using the blocking wrapper.
    ctx = Context()
    ctx.timeout = 0.5
    ctx.connect("google.com", 80)
    # The request must end with real CRLF CRLF bytes; a raw string literal
    # would send the backslash characters verbatim and never terminate it.
    ctx.write(b"GET / HTTP/1.0\r\n\r\n")
    resp = ctx.read(100000)
    print(resp)

    # This one should time out
    ctx = Context()
    ctx.timeout = 0.5
    ctx.connect("example.com", 55)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment