Create a gist now

Instantly share code, notes, and snippets.

b2a_bin in Cython (a-la binascii.hexlify but for binary ("01") strings)
/b2a_bin.c
/b2a_bin.html
/b2a_bin*.so
/build/*
from cpython.bytes cimport PyBytes_FromStringAndSize
from cython.parallel cimport prange
cdef extern from "Python.h":
cdef Py_ssize_t PY_SSIZE_T_MAX
def b2a_bin(bytes data, Py_ssize_t _parallel_cutoff=1<<20):
    """Convert *data* to a b"01" string (one ASCII char per bit, big-endian).

    For a non-empty bytes object it is equivalent to:
        bin(int.from_bytes(data, 'big', signed=False)
            )[2:].zfill(len(data)*8).encode('ascii', 'strict')

    Inputs of at least *_parallel_cutoff* bytes are converted with an
    OpenMP-parallel loop (cython.parallel.prange).

    Raises MemoryError when the 8x-larger result would not fit in Py_ssize_t.
    """
    cdef Py_ssize_t datalen = len(data)
    # The result is 8 output bytes per input byte; guard the multiplication.
    if datalen > PY_SSIZE_T_MAX // 8:
        raise MemoryError
    cdef:
        # BUG FIX: the original had a stray `cdef` on this line inside the
        # `cdef:` block, which is a Cython syntax error.
        char* databuf = data  # borrowed pointer into `data`, no copy
        bytes retval = PyBytes_FromStringAndSize(NULL, datalen*8)
        char* resbuf = retval  # write directly into the result, no copy
        unsigned char byte
        Py_ssize_t pos, i
        char* s01 = "01"
    if datalen < _parallel_cutoff:  # small input: thread startup would dominate
        for i in range(datalen):
            byte = databuf[i]
            for pos in range(8):
                resbuf[8*i + (7-pos)] = s01[(byte >> pos) & 1]  # big-endian
    else:
        with nogil:
            for i in prange(datalen):
                byte = databuf[i]
                for pos in range(8):
                    resbuf[8*i + (7-pos)] = s01[(byte >> pos) & 1]
    return retval
# Build, test and benchmark the b2a_bin Cython extension.
#
# Automatic variables used below:
#   $@ - current target
#   $* - '%'-stem (only meaningful in pattern rules)
#   $< - first prerequisite
#   $^ - all prerequisites (without duplicates)

name = b2a_bin
py := python3

# These targets do not produce files of the same name; declare them phony so
# a stray file called e.g. "clean" cannot shadow them.
.PHONY: default benchmark ext clean distclean

default: test_$(name).py
	$(py) $<

benchmark: test_$(name).py
	$(py) $< -b

# The test script requires the built extension module.
test_$(name).py: ext

ext: setup.py $(name).c
	$(py) setup.py build_ext --inplace

%.c: %.pyx
	cython $< -o $@

clean:
	-rm -f $(name)*.so $(name).c $(name).html $(name).pyc
	-rm -rf build __pycache__

distclean:
	git clean -d -x -f
from distutils.core import setup
from distutils.extension import Extension

# The extension uses cython.parallel.prange, so OpenMP must be enabled both
# when compiling and when linking.
_openmp = ['-fopenmp']

setup(
    name='b2a_bin',
    ext_modules=[
        Extension(
            'b2a_bin',
            ['b2a_bin.c'],
            extra_compile_args=_openmp,
            extra_link_args=_openmp,
        ),
    ],
)
#!/usr/bin/env python3
r"""Test and measure performance of b2a_bin*() functions.
>>> b2a_bin(b'\x0a\x0b')
b'0000101000001011'
"""
import binascii
import doctest
import os
import unittest
from timeit import Timer
from b2a_bin import b2a_bin
def b2a_bin_bin(data):
    """Pure-Python reference: bytes -> b"01" string via bin() and zfill()."""
    bits = bin(int.from_bytes(data, 'big', signed=False))[2:]
    return bits.zfill(len(data) * 8).encode('ascii', 'strict')
def b2a_bin_format(data):
    """Pure-Python reference: bytes -> b"01" string via str.format."""
    value = int.from_bytes(data, 'big', signed=False)
    width = len(data) * 8
    return "{:0{}b}".format(value, width).encode('ascii', 'strict')
class Test_b2a_bin(unittest.TestCase):
    """Unit-tests for the b2a_bin() extension function."""

    def _test(self, data, *args):
        # Compare the C implementation against the pure-Python reference.
        self.assertEqual(b2a_bin(data), b2a_bin_bin(data), *args)

    def test_endianess(self):
        # Non-zero bits at either end expose byte-order mistakes.
        for hexdata in (b'cafe0000', b'0000babe'):
            self._test(binascii.unhexlify(hexdata))

    def test_near_power_2(self):
        # Exercise values straddling every bit-width boundary up to 2**100,
        # at every byte-size the value could plausibly fit in.
        for p in range(100):
            n = 2 ** p
            for i in range(n - 10, n + 11):
                for size in reversed(range(1, p + 2)):
                    try:
                        b = i.to_bytes(size, 'big', signed=False)
                    except OverflowError:
                        break  # sizes only shrink from here; none will fit
                    else:
                        self._test(b, (b, i, size))

    def test_nonbytes_arg(self):
        # b2a_bin() accepts bytes only, not bytearray.
        with self.assertRaises(TypeError):
            self._test(bytearray(range(10)))

    def test_empty_arg(self):
        self.assertEqual(b2a_bin(b''), b'')
def run_tests(verbosity=0):
    """Run unit-tests and doctests; return True when everything passed."""
    import test_b2a_bin
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(test_b2a_bin.Test_b2a_bin)
    suite.addTests(doctest.DocTestSuite(test_b2a_bin))
    result = unittest.TextTestRunner(verbosity=verbosity).run(suite)
    return not (result.failures or result.errors)
def test_functions(functions):
"""Test that all functions produces the same result."""
data = os.urandom(2000)
res = functions[0](data)
for f in functions[1:]:
assert f(data) == res, f
def benchmark(functions):
    """Print a timing comparison of *functions* on several input sizes.

    The results are similar to:
    $ python -mtimeit -s "import os; from test_b2a_bin import b2a_bin as f; data=os.urandom(1000*1000)" "f(data)"
    """
    sizes = [1, 1000, 1000*1000, 10*1000*1000]
    for data in map(os.urandom, sizes):
        print("os.urandom(%s)" % (len(data),))
        # Keep total work roughly constant: fewer repetitions for big inputs.
        n = max(1, 1000*1000 // len(data))
        for func in functions:
            t = timeit(func, data, number=n)
            print("\t%-20s %s" % (func.__name__, human_seconds(t)))
def human_seconds(seconds, fmt="%.3g %s"):
    """Format *seconds* human-readably, choosing usec, msec or sec."""
    value = seconds * 1e6  # start in microseconds
    if value < 1000:
        return fmt % (value, "usec")
    value /= 1000
    if value < 1000:
        return fmt % (value, "msec")
    return fmt % (value / 1000, "sec")
def timeit(func, data, number=1000000, repeat=3):
    """Measure how long func(data) takes.

    Returns the best per-call time in seconds.
    number, repeat have the meaning as in timeit.Timer.repeat() method
    """
    # NOTE(review): the statement embeds repr(data) and re-imports func from
    # __main__ by name, so this only works when this file runs as a script
    # and data has an eval-able repr (true for bytes).
    t = Timer(stmt="func(%r)" % (data,),
              setup="from __main__ import %s as func" % (func.__name__,))
    # min() of the repeats filters out scheduling noise; divide to get per-call.
    return min(t.repeat(number=number, repeat=repeat)) / number
if __name__ == "__main__":
    import sys

    # Only benchmark implementations that first pass the unit-tests.
    if run_tests():
        functions = [b2a_bin, b2a_bin_bin, b2a_bin_format]
        test_functions(functions)
        if '-b' in sys.argv:
            benchmark(functions)
"""
----------------------------------------------------------------------
_parallel_cutoff=0:
Ran 5 tests in 2.150s
OK
os.urandom(1)
b2a_bin 3.07 usec
b2a_bin_bin 1.6 usec
b2a_bin_format 2.43 usec
os.urandom(1000)
b2a_bin 5.23 usec
b2a_bin_bin 31.3 usec
b2a_bin_format 35.3 usec
os.urandom(1000000)
b2a_bin 9.95 msec
b2a_bin_bin 52.1 msec
b2a_bin_format 74.9 msec
os.urandom(10000000)
b2a_bin 44.5 msec
b2a_bin_bin 634 msec
b2a_bin_format 803 msec
----------------------------------------------------------------------
_parallel_cutoff=1<<20:
Ran 5 tests in 0.695s
OK
os.urandom(1)
b2a_bin 0.0856 usec
b2a_bin_bin 1.57 usec
b2a_bin_format 2.43 usec
os.urandom(1000)
b2a_bin 9.49 usec
b2a_bin_bin 31.2 usec
b2a_bin_format 33.8 usec
os.urandom(1000000)
b2a_bin 9.74 msec
b2a_bin_bin 53 msec
b2a_bin_format 75.5 msec
os.urandom(10000000)
b2a_bin 59.1 msec
b2a_bin_bin 620 msec
b2a_bin_format 799 msec
----------------------------------------------------------------------
_parallel_cutoff=100000000:
Ran 5 tests in 0.568s
OK
os.urandom(1)
b2a_bin 0.0875 usec
b2a_bin_bin 1.6 usec
b2a_bin_format 2.29 usec
os.urandom(1000)
b2a_bin 9.38 usec
b2a_bin_bin 32.9 usec
b2a_bin_format 33.4 usec
os.urandom(1000000)
b2a_bin 9.36 msec
b2a_bin_bin 60.8 msec
b2a_bin_format 81.5 msec
os.urandom(10000000)
b2a_bin 123 msec
b2a_bin_bin 803 msec
b2a_bin_format 803 msec
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment