Skip to content

Instantly share code, notes, and snippets.

@mkroutikov
Created December 1, 2017 21:10
Show Gist options
  • Save mkroutikov/e5f054640a02f87ead6072487fa1ba5a to your computer and use it in GitHub Desktop.
Save mkroutikov/e5f054640a02f87ead6072487fa1ba5a to your computer and use it in GitHub Desktop.
import unittest
import queue
class BlockPipe:
'''a file-like object that implements Pipe protocol
using blocking queue
'''
def __init__(self, maxsize=0):
self._queue = queue.Queue(maxsize)
self._leftover = None
self._closed = False
def read(self, size=-1):
assert size != 0
if self._leftover is not None:
if size > 0 and len(self._leftover) > size:
data = self._leftover[:size]
self._leftover = self._leftover[size:]
else:
data = self._leftover
self._leftover = None
assert len(data) > 0
elif self._closed:
return b''
else:
data = self._queue.get()
assert len(data) > 0
if size > 0 and len(data) > size:
self._leftover = data[size:]
data = data[:size]
return data
def write(self, data):
if self._closed:
raise RuntimeError('pipe was closed!')
assert type(data) == bytes
assert len(data) > 0
self._queue.put(data)
def close(self):
self._closed = True
class TestBlockPipe(unittest.TestCase):
def test_basic(self):
pipe = Pipe()
pipe.write(b'hello')
data = pipe.read()
self.assertEqual(data, b'hello')
def test01(self):
pipe = Pipe()
pipe.write(b'hello')
pipe.write(b'world')
data = pipe.read()
self.assertEqual(data, b'hello')
data = pipe.read()
self.assertEqual(data, b'world')
def test02(self):
pipe = Pipe()
pipe.write(b'hello')
data = pipe.read(3)
self.assertEqual(data, b'hel')
data = pipe.read(3)
self.assertEqual(data, b'lo')
pipe.write(b'world')
data = pipe.read(200)
self.assertEqual(data, b'world')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment