Skip to content

Instantly share code, notes, and snippets.

@Snawoot
Last active January 15, 2020 13:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Snawoot/4035e3fd9f2f0c5289c310311d1ff795 to your computer and use it in GitHub Desktop.
Save Snawoot/4035e3fd9f2f0c5289c310311d1ff795 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import ssl
import random
import io
import os
import timeit
import hashlib
import collections
import ctypes
HASH = hashlib.sha256
RANDOM_CHUNK = 512
READ_CHUNK_LOW = 5
READ_CHUNK_HIGH = 50
DATA = io.BytesIO()
DGST = HASH()
for _ in range((2**20) // RANDOM_CHUNK):
buf = os.urandom(RANDOM_CHUNK)
DATA.write(buf)
DGST.update(buf)
assert DATA.tell() >= 2**20
def bio(BIO):
DATA.seek(0)
b = BIO()
h = HASH()
src_empty = False
dst_empty = False
while not src_empty or not dst_empty:
# write data
if not src_empty:
size = random.randrange(READ_CHUNK_LOW,READ_CHUNK_HIGH)
buf = DATA.read(size)
if not buf:
src_empty = True
b.write(buf)
# read data
size = random.randrange(READ_CHUNK_LOW,READ_CHUNK_HIGH)
buf = b.read(size)
if not buf and src_empty:
break
h.update(buf)
assert h.digest() == DGST.digest()
class DequeBIO:
def __init__(self):
self._chunks = collections.deque()
self._skip_left = 0
def write(self, data):
self._chunks.append(data)
def read(self, n):
tmp = []
sent = 0
while sent < n and self._chunks:
curr = self._chunks.popleft()
curr_len = len(curr) - self._skip_left
if curr_len <= (n - sent):
# sink current chunk completely
tmp.append(curr[self._skip_left:])
self._skip_left = 0
sent += curr_len
else:
# cut piece from current chunk (fill remaining requested bytes)
tmp.append(curr[self._skip_left:self._skip_left + (n - sent)])
self._chunks.appendleft(curr)
self._skip_left += (n - sent)
sent = n
return b''.join(tmp)
class BIODequeBIO:
def __init__(self, chunk_size=1024):
self._chunks = collections.deque()
self._chunk_size = chunk_size
self._right_leftover = 0
self._left_skip = 0
def write(self, data):
received = 0
n = len(data)
view = memoryview(data)
while received < n:
if self._right_leftover:
# finish previous chunk
curr = self._chunks.pop()
curr.seek(0, io.SEEK_END)
if self._right_leftover <= (n - received):
curr.write(view[received:received + self._right_leftover])
received += self._right_leftover
self._right_leftover = 0
else:
curr.write(view[received:])
self._right_leftover -= (n - received)
received = n
self._chunks.append(curr)
else:
# create new chunk
self._chunks.append(io.BytesIO())
self._right_leftover = self._chunk_size
def read(self, n):
tmp = []
sent = 0
while sent < n and self._chunks:
curr = self._chunks.popleft()
curr_len = curr.seek(0, io.SEEK_END) - self._left_skip
to_read = min((n - sent), curr_len)
curr.seek(self._left_skip)
tmp.append(curr.read(to_read))
if to_read < curr_len:
self._chunks.appendleft(curr)
self._left_skip += to_read
else:
self._left_skip = 0
if not self._chunks:
self._right_leftover = 0
sent += to_read
return b''.join(tmp)
class MovingBIO:
def __init__(self, compact_treshold=4096):
self._buff = io.BytesIO()
self._offset = 0
self._compact_treshold = compact_treshold
def write(self, data):
self.compact()
self._buff.seek(0, io.SEEK_END)
return self._buff.write(data)
def read(self, nbytes):
self._buff.seek(self._offset)
res = self._buff.read(nbytes)
self._offset += len(res)
return res
def compact(self):
self._buff.seek(0, io.SEEK_END)
oldlen = self._buff.tell()
if self._compact_treshold < self._offset < oldlen:
view = self._buff.getbuffer()
newlen = oldlen - self._offset
src = (ctypes.c_char * newlen).from_buffer(view, self._offset)
dst = (ctypes.c_char * oldlen).from_buffer(view)
ctypes.memmove(dst, src, newlen)
del src
del dst
del view
self._buff.seek(newlen)
self._buff.truncate(newlen)
self._offset = 0
def main():
print("ssl.MemoryBIO\t%.3f" %
timeit.timeit("bio(ssl.MemoryBIO)", number=10, setup="import ssl;from __main__ import bio"))
print("DequeBIO\t%.3f" %
timeit.timeit("bio(DequeBIO)", number=10, setup="from __main__ import bio, DequeBIO"))
print("BIODequeBIO\t%.3f" %
timeit.timeit("bio(BIODequeBIO)", number=10, setup="from __main__ import bio, BIODequeBIO"))
print("MovingBIO\t%.3f" %
timeit.timeit("bio(MovingBIO)", number=10, setup="from __main__ import bio, MovingBIO"))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment