public
Last active

b2a_bin in Cython (a-la binascii.hexlify but for binary ("01") strings)

  • Download Gist
.gitignore
1 2 3 4
/b2a_bin.c
/b2a_bin.html
/b2a_bin*.so
/build/*
Makefile
Makefile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
name=b2a_bin
py:=python3

# None of these targets produce a file named after themselves, so declare
# them phony: a stray file called e.g. `clean` must not disable the target.
.PHONY: default benchmark ext clean distclean

# Build the extension, then run the unit tests.
default: test_$(name).py
	$(py) $<

# Same as `default`, then run the benchmark (-b).
benchmark: test_$(name).py
	$(py) $< -b

# The test script imports the compiled extension, so build it first.
test_$(name).py: ext


ext: setup.py $(name).c
	$(py) setup.py build_ext --inplace

%.c: %.pyx
	cython $< -o $@

# Options are placed before the operands (BSD/macOS rm rejects trailing
# options); -f keeps the target quiet when there is nothing to remove.
clean:
	rm -f $(name)*.so $(name).c $(name).html $(name).pyc
	rm -rf build __pycache__

distclean:
	git clean -d -x -f

# $@ - current target
# $* '%'-part (works if there *is* '%' in specification)
# $< first dependence
# $^ all dependencies (without duplicates)
b2a_bin.pyx
Cython
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
from cpython.bytes cimport PyBytes_FromStringAndSize
from cython.parallel cimport prange
 
cdef extern from "Python.h":
cdef Py_ssize_t PY_SSIZE_T_MAX
 
def b2a_bin(bytes data, Py_ssize_t _parallel_cutoff=1<<20):
    """Convert data to "01"-string.

    if data is a non-empty bytes object it is equivalent to:

      bin(int.from_bytes(data, 'big', signed=False)
          )[2:].zfill(len(data)*8).encode('ascii', 'strict')

    Every input byte produces eight ASCII digits, most significant bit
    first.  Inputs shorter than `_parallel_cutoff` bytes are converted by a
    plain loop; longer inputs are converted by a `prange` loop with the GIL
    released.

    Raises MemoryError if len(data)*8 would not fit in Py_ssize_t.
    """
    cdef Py_ssize_t datalen = len(data)
    # the result needs 8 bytes per input byte; refuse sizes that would overflow
    if datalen > PY_SSIZE_T_MAX // 8:
        raise MemoryError
    cdef:
        cdef char* databuf = data # no copy
        # NULL source: allocate an uninitialised bytes object that is filled in below
        bytes retval = PyBytes_FromStringAndSize(NULL, datalen*8)
        char* resbuf = retval # no copy
        unsigned char byte
        Py_ssize_t pos, i
        char* s01 = "01"  # lookup table: bit value -> ASCII digit
    if datalen < _parallel_cutoff: # don't bother with threads
        for i in range(datalen):
            byte = databuf[i]
            for pos in range(8):
                # bit `pos` (0 = least significant) goes to offset 7-pos of this byte's group
                resbuf[8*i + (7-pos)] = s01[(byte >> pos) & 1] # big-endian
    else:
        with nogil:
            # NOTE(review): `byte` and `pos` are assigned inside the prange body,
            # which presumably makes Cython treat them as thread-private -- confirm
            # against the Cython parallelism docs before restructuring this loop.
            for i in prange(datalen):
                byte = databuf[i]
                for pos in range(8):
                    resbuf[8*i + (7-pos)] = s01[(byte >> pos) & 1]
    return retval
setup.py
Python
1 2 3 4 5 6 7 8 9
from distutils.core import setup
from distutils.extension import Extension
 
# The extension is built from the Cython-generated C file.  -fopenmp is
# passed to both the compile and the link step (b2a_bin.pyx uses
# cython.parallel.prange).
b2a_bin_ext = Extension(
    'b2a_bin',
    ['b2a_bin.c'],
    extra_compile_args=['-fopenmp'],
    extra_link_args=['-fopenmp'],
)

setup(name='b2a_bin', ext_modules=[b2a_bin_ext])
test_b2a_bin.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
#!/usr/bin/env python3
r"""Test and measure performance of b2a_bin*() functions.
 
>>> b2a_bin(b'\x0a\x0b')
b'0000101000001011'
"""
import binascii
import doctest
import os
import unittest
from timeit import Timer
 
from b2a_bin import b2a_bin
 
def b2a_bin_bin(data):
    """Convert *data* to a "01" bytestring using ``bin()``.

    Pure-Python reference implementation: every input byte becomes eight
    ASCII digits, most significant bit first.  Empty input yields ``b''``.
    """
    if not data:
        # bin(0) is '0b0' and zfill(0) cannot shorten it, so the generic
        # expression below would yield b'0' instead of an empty result.
        return b''
    return bin(int.from_bytes(data, 'big', signed=False)
               )[2:].zfill(len(data)*8).encode('ascii', 'strict')
 
 
def b2a_bin_format(data):
    """Convert *data* to a "01" bytestring using ``str.format()``.

    Pure-Python reference implementation: every input byte becomes eight
    ASCII digits, most significant bit first.  Empty input yields ``b''``.
    """
    if not data:
        # A zero-width, zero-padded format of 0 still prints '0', so the
        # generic path would yield b'0' instead of an empty result.
        return b''
    n = int.from_bytes(data, 'big', signed=False)
    return "{:0{}b}".format(n, len(data)*8).encode('ascii', 'strict')
 
 
class Test_b2a_bin(unittest.TestCase):
    """Check b2a_bin() against the pure-Python b2a_bin_bin() reference."""

    def _test(self, data, *args):
        # Extra positional arguments are forwarded to assertEqual unchanged.
        actual = b2a_bin(data)
        expected = b2a_bin_bin(data)
        self.assertEqual(actual, expected, *args)

    def test_endianess(self):
        for hexdigits in (b'cafe0000', b'0000babe'):
            self._test(binascii.unhexlify(hexdigits))

    def test_near_power_2(self):
        # Integers within 10 of each power of two, encoded at every byte
        # width from exponent+1 down to the narrowest one that still fits.
        for exponent in range(100):
            power = 2**exponent
            for value in range(power - 10, power + 11):
                for nbytes in range(exponent + 1, 0, -1):
                    try:
                        raw = value.to_bytes(nbytes, 'big', signed=False)
                    except OverflowError:
                        # value is negative or too large for nbytes bytes;
                        # narrower widths cannot work either
                        break
                    self._test(raw, (raw, value, nbytes))

    def test_nonbytes_arg(self):
        not_bytes = bytearray(range(10))
        with self.assertRaises(TypeError):
            self._test(not_bytes)

    def test_empty_arg(self):
        self.assertEqual(b2a_bin(b''), b'')
 
 
def run_tests(verbosity=0):
    """Run the unit tests and doctests of test_b2a_bin.

    Returns True when the run recorded neither failures nor errors.
    """
    import test_b2a_bin as module

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(module.Test_b2a_bin)
    suite.addTests(doctest.DocTestSuite(module))

    runner = unittest.TextTestRunner(verbosity=verbosity)
    outcome = runner.run(suite)
    return not outcome.failures and not outcome.errors
 
 
def test_functions(functions):
    """Assert that every function agrees with the first one.

    All functions are called with the same 2000 random bytes; the result of
    ``functions[0]`` is the reference the others are compared against.
    """
    sample = os.urandom(2000)
    expected = functions[0](sample)
    for candidate in functions[1:]:
        assert candidate(sample) == expected, candidate
 
 
def benchmark(functions):
    """Compare performance of the given functions.

    For each payload size the per-call time of every function is printed.
    The results are similar to:

      $ python -mtimeit -s "import os; from test_b2a_bin import b2a_bin as f; data=os.urandom(1000*1000)" "f(data)"
    """
    for size in [1, 1000, 1000*1000, 10*1000*1000]:
        payload = os.urandom(size)
        print("os.urandom(%s)" % (len(payload),))
        # fewer iterations for bigger payloads, but always at least one
        iterations = max(1, 1000*1000//len(payload))
        for candidate in functions:
            elapsed = timeit(candidate, payload, number=iterations)
            print("\t%-20s %s" % (candidate.__name__, human_seconds(elapsed)))
 
 
def human_seconds(seconds, fmt="%.3g %s"):
    """Return a human-readable string for a duration given in seconds.

    The value is scaled to the first of usec, msec, sec in which its
    magnitude is below 1000 (anything larger stays in sec) and rendered
    with *fmt*, which receives ``(scaled_value, unit_suffix)``.
    """
    t = 1e6*seconds  # start with microseconds
    for suff in "usec msec".split():
        # Compare the magnitude so that negative durations are scaled too
        # instead of always being reported in usec.
        if abs(t) < 1000:
            return fmt % (t, suff)
        t /= 1000
    return fmt % (t, "sec")
 
 
def timeit(func, data, number=1000000, repeat=3):
    """Measure how long func(data) takes.

    number, repeat have the meaning as in timeit.Timer.repeat() method.

    Returns the best (minimum) time of a single call, in seconds.
    """
    # Hand the objects to the timer directly instead of importing `func`
    # from __main__ by name and embedding repr(data) in the statement:
    # this also works when this module is imported rather than run as a
    # script, for callables without an importable name, and avoids
    # compiling a huge bytes literal.  (Timer's `globals` needs Python 3.5+.)
    t = Timer(stmt="func(data)", globals={"func": func, "data": data})
    return min(t.repeat(number=number, repeat=repeat)) / number
 
 
if __name__=="__main__":
    import sys
    if not run_tests():
        # Report the failure through the exit status so that callers such
        # as `make` notice it.
        sys.exit(1)
    # passed unit-tests: cross-check the implementations against each other
    functions = [b2a_bin, b2a_bin_bin, b2a_bin_format]
    test_functions(functions)
    if '-b' in sys.argv:
        benchmark(functions)
 
"""
----------------------------------------------------------------------
_parallel_cutoff=0:
Ran 5 tests in 2.150s
 
OK
os.urandom(1)
b2a_bin 3.07 usec
b2a_bin_bin 1.6 usec
b2a_bin_format 2.43 usec
os.urandom(1000)
b2a_bin 5.23 usec
b2a_bin_bin 31.3 usec
b2a_bin_format 35.3 usec
os.urandom(1000000)
b2a_bin 9.95 msec
b2a_bin_bin 52.1 msec
b2a_bin_format 74.9 msec
os.urandom(10000000)
b2a_bin 44.5 msec
b2a_bin_bin 634 msec
b2a_bin_format 803 msec
 
----------------------------------------------------------------------
_parallel_cutoff=1<<20:
Ran 5 tests in 0.695s
 
OK
os.urandom(1)
b2a_bin 0.0856 usec
b2a_bin_bin 1.57 usec
b2a_bin_format 2.43 usec
os.urandom(1000)
b2a_bin 9.49 usec
b2a_bin_bin 31.2 usec
b2a_bin_format 33.8 usec
os.urandom(1000000)
b2a_bin 9.74 msec
b2a_bin_bin 53 msec
b2a_bin_format 75.5 msec
os.urandom(10000000)
b2a_bin 59.1 msec
b2a_bin_bin 620 msec
b2a_bin_format 799 msec
 
----------------------------------------------------------------------
_parallel_cutoff=100000000:
Ran 5 tests in 0.568s
 
OK
os.urandom(1)
b2a_bin 0.0875 usec
b2a_bin_bin 1.6 usec
b2a_bin_format 2.29 usec
os.urandom(1000)
b2a_bin 9.38 usec
b2a_bin_bin 32.9 usec
b2a_bin_format 33.4 usec
os.urandom(1000000)
b2a_bin 9.36 msec
b2a_bin_bin 60.8 msec
b2a_bin_format 81.5 msec
os.urandom(10000000)
b2a_bin 123 msec
b2a_bin_bin 803 msec
b2a_bin_format 803 msec
 
"""

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.