Skip to content

Instantly share code, notes, and snippets.

@niwinz
Forked from wolever/geventfd.py
Created June 16, 2012 07:57
Show Gist options
  • Save niwinz/2940439 to your computer and use it in GitHub Desktop.
Save niwinz/2940439 to your computer and use it in GitHub Desktop.
Simple wrapper around a file descriptor which will perform non-blocking reads/writes using gevent
import errno
import fcntl
import os

from gevent.core import wait_read, wait_write
class GeventFD(object):
    """Wrap a file descriptor for non-blocking reads and writes with gevent.

    The descriptor is switched to non-blocking mode on construction.  Each
    read/write is attempted immediately; only when the OS reports that the
    call would block (EAGAIN/EWOULDBLOCK) does the calling greenlet wait
    for the descriptor to become ready.

    Data is handled as byte strings (``str`` on Python 2).

    >>> stdin = GeventFD(sys.stdin.fileno())
    >>> stdin.read(5)
    'hello'
    """

    # Number of bytes requested from the OS per os.read() call.
    _READ_CHUNK = 4096

    def __init__(self, fd):
        """Take over *fd* (an integer file descriptor) and make it non-blocking."""
        self.fd = fd
        # OR the flag into the existing status flags; passing O_NONBLOCK
        # alone to F_SETFL would clear flags such as O_APPEND.
        flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        # Bytes accepted by write() but not yet handed to the OS.
        self.w_pending = b""
        # Bytes already read from the OS but not yet returned by read().
        self.r_pending = b""

    @staticmethod
    def _would_block(ex):
        """Return True if *ex* means "retry once the descriptor is ready"."""
        return ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK)

    def read(self, total_to_read):
        """Read up to *total_to_read* bytes, waiting cooperatively for data.

        Returns exactly *total_to_read* bytes unless end-of-file is reached
        first, in which case the (possibly empty) bytes read so far are
        returned.  Surplus bytes fetched from the OS are kept in
        ``r_pending`` for the next call.

        Raises OSError/IOError for any read error other than EAGAIN.
        """
        pieces = []
        remaining = total_to_read
        while remaining > 0:
            if not self.r_pending:
                try:
                    data = os.read(self.fd, self._READ_CHUNK)
                except (IOError, OSError) as ex:
                    if not self._would_block(ex):
                        raise
                    wait_read(self.fd)
                    continue
                if not data:
                    # EOF: an empty read will never turn into data, so stop
                    # instead of looping forever.
                    break
                self.r_pending = data
            piece = self.r_pending[:remaining]
            self.r_pending = self.r_pending[len(piece):]
            pieces.append(piece)
            remaining -= len(piece)
        return b"".join(pieces)

    def write(self, data):
        """Write all of *data*, waiting cooperatively while the fd is full.

        *data* is appended to ``w_pending`` first, so bytes left over from
        an earlier write that failed part-way are sent before it.

        Raises OSError/IOError for any write error other than EAGAIN; the
        unsent bytes remain in ``w_pending``.
        """
        self.w_pending += data
        while self.w_pending:
            try:
                written = os.write(self.fd, self.w_pending)
            except (IOError, OSError) as ex:
                if not self._would_block(ex):
                    raise
                wait_write(self.fd)
                continue
            self.w_pending = self.w_pending[written:]
import gevent, gevent.socket
import fcntl, os, sys, errno
class GeventFile(object):
    """Proxy a file object, making read() and write() cooperate with gevent.

    The underlying descriptor is switched to non-blocking mode.  Attribute
    access other than ``read``/``write`` is forwarded to the wrapped object.
    """

    def __init__(self, fobj):
        """Wrap *fobj*, which must provide ``fileno()`` and ``read()``."""
        self._obj = fobj
        # Preserve the existing status flags (O_APPEND, ...) and only add
        # O_NONBLOCK; F_SETFL replaces the whole flag set.
        flags = fcntl.fcntl(self._obj, fcntl.F_GETFL)
        fcntl.fcntl(self._obj, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def __getattr__(self, item):
        """Forward lookups of unknown attributes to the wrapped file object."""
        if item == '_obj':
            # Only reached when _obj was never set (e.g. __init__ skipped);
            # delegating would recurse forever, so fail like a normal miss.
            raise AttributeError(item)
        return getattr(self._obj, item)

    @staticmethod
    def _would_block(ex):
        """Return True if *ex* means "retry once the descriptor is ready"."""
        return ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK)

    @staticmethod
    def _clear_exc():
        """Drop the handled exception before switching greenlets (Python 2)."""
        if hasattr(sys, 'exc_clear'):
            sys.exc_clear()

    def write(self, data):
        """Write all of *data* to the descriptor, yielding while it is full.

        The bytes go straight to the descriptor with os.write(), bypassing
        the wrapped object's own write().

        Raises OSError/IOError for any write error other than EAGAIN.
        """
        bytes_total = len(data)
        bytes_written = 0
        fileno = self.fileno()
        while bytes_written < bytes_total:
            try:
                # fileobj.write() doesn't return anything, so use os.write.
                bytes_written += os.write(fileno, data[bytes_written:])
            except (IOError, OSError) as ex:
                # os.write reports errors as OSError, so catching IOError
                # alone would miss EAGAIN on Python 2.
                if not self._would_block(ex):
                    raise
                self._clear_exc()
                gevent.socket.wait_write(fileno)

    def read(self, size=-1, chunksize=1024):
        """Read up to *size* bytes, or until EOF when *size* is negative.

        Data is requested in pieces of at most *chunksize* and never more
        than is still needed, so the result is at most *size* long.  When
        no data is available yet the calling greenlet waits for the
        descriptor to become readable.

        Returns the data read, or '' if nothing was read before EOF.
        Raises OSError/IOError for any read error other than EAGAIN.
        """
        chunks = []
        bytes_read = 0
        fileno = self.fileno()
        while size < 0 or bytes_read < size:
            try:
                if size < 0:
                    chunk = self._obj.read()
                else:
                    chunk = self._obj.read(min(chunksize, size - bytes_read))
            except (IOError, OSError) as ex:
                if not self._would_block(ex):
                    raise
                self._clear_exc()
                gevent.socket.wait_read(fileno)
                continue
            if chunk is None:
                # io-style file objects return None rather than raising
                # when a non-blocking read has no data yet; that is not EOF.
                gevent.socket.wait_read(fileno)
                continue
            if not chunk:
                break
            chunks.append(chunk)
            bytes_read += len(chunk)
        if not chunks:
            return ''
        # Join with an empty separator of the same type as the data so both
        # byte strings and text work.
        return chunks[0][:0].join(chunks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment