Skip to content

Instantly share code, notes, and snippets.

@calebj
Created November 9, 2018 23:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calebj/7990f30d3b7ce2cb8595e4b27da24918 to your computer and use it in GitHub Desktop.
Save calebj/7990f30d3b7ce2cb8595e4b27da24918 to your computer and use it in GitHub Desktop.
from binascii import hexlify as _hexlify, unhexlify as _unhexlify
from random import Random, RECIP_BPF
import quantumrandom
import six
import threading
# Integer type alias: ``int`` on Python 3, ``long`` on Python 2 (chosen via six.PY3).
_longint = int if six.PY3 else long
class _QRBackgroundFetchThread(threading.Thread):
    """Daemon thread that refills its owner's byte cache on request.

    Two events coordinate with the owner:

    * ``should_fetch`` -- set by the owner to request a refill; cleared by
      this thread once a refill attempt has finished.
    * ``idle`` -- cleared while a refill is in progress and set when the
      attempt finishes.  It starts out cleared and is first set after the
      first refill attempt.
    """

    def __init__(self, qr):
        """Create the events and start the thread immediately.

        Args:
            qr: Owner object; its ``_refresh()`` method is called for every
                refill request.
        """
        threading.Thread.__init__(self)
        self.qr = qr
        self.should_fetch = threading.Event()
        self.idle = threading.Event()
        self.daemon = True
        self.start()

    def run(self):
        """Serve refill requests until the thread ends.

        An exception raised by ``qr._refresh()`` still terminates the thread,
        but ``idle`` is set (and ``should_fetch`` cleared) first so that
        callers blocked on ``idle.wait()`` are released instead of hanging.
        """
        while self.should_fetch.wait():
            self.idle.clear()
            try:
                self.qr._refresh()
            finally:
                # Always publish "attempt finished", even on failure.
                self.idle.set()
                self.should_fetch.clear()
class QuantumRandom(Random):
    """``random.Random`` subclass that draws its bytes from ``quantumrandom``.

    Bytes are downloaded with ``quantumrandom.get_data`` and, unless caching
    is disabled, served from an in-memory buffer that a background thread
    refills.  The generator keeps no reproducible state: ``seed()`` does
    nothing and ``getstate()`` / ``setstate()`` raise ``NotImplementedError``.
    """

    def __init__(self, x=None, cached_bytes=1024, low_water=None):
        """Set up the byte cache and start the background fetch thread.

        Args:
            x: Accepted for ``random.Random`` compatibility; ``seed`` ignores it.
            cached_bytes: Size of the byte cache.  A falsy value disables
                caching, so every request goes straight to the service.
            low_water: Read position in the cache beyond which a background
                refill is requested.  Defaults to
                ``max(cached_bytes // 5, 128)``.
        """
        if cached_bytes:
            # Start with the read index at the end of the buffer so the
            # not-yet-filled cache counts as exhausted until the first
            # refill lands (a fixed 1024 is wrong for larger caches).
            self._buf_idx = cached_bytes
            self._buf_len = cached_bytes
            self._buf_lock = threading.RLock()
            self._cache_buf = bytearray(cached_bytes)
            if low_water is None:
                low_water = max(cached_bytes // 5, 128)
            self._low_water = low_water
            self._fetcher = _QRBackgroundFetchThread(self)
            self._fetcher.should_fetch.set()
        else:
            self._cache_buf = None
        Random.__init__(self, x)

    def _fetch_qr(self, b):
        """Download random data and return exactly *b* bytes of it.

        Requests up to 1024 are made as a single block of size *b*; larger
        requests are split into blocks of 1024, which can yield more data
        than asked for, so the result is trimmed to *b* bytes.

        Raises:
            RuntimeError: If the service returned fewer than *b* bytes.
        """
        if b > 1024:
            blocks = (b + 1023) // 1024
            block_size = 1024
        else:
            blocks = 1
            block_size = b
        data = quantumrandom.get_data('hex16', blocks, block_size)
        raw = b''.join(_unhexlify(six.b(chunk)) for chunk in data)
        if len(raw) < b:
            raise RuntimeError(
                'quantum source returned %d bytes, expected at least %d'
                % (len(raw), b))
        return raw[:b]

    def _refresh(self, over=0):
        """Replace the whole cache with fresh bytes and rewind the read index.

        Args:
            over: Number of extra bytes to download on top of the cache size.

        Returns:
            The first *over* downloaded bytes when *over* is non-zero,
            otherwise ``None``.
        """
        refresh = self._fetch_qr(self._buf_len + over)
        with self._buf_lock:
            self._cache_buf[:] = refresh[over:]
            self._buf_idx = 0
        if over:
            return refresh[:over]
        return None

    def _qrandom(self, b):
        """Return *b* random bytes, from the cache when caching is enabled."""
        if self._cache_buf is None:
            return self._fetch_qr(b)

        fetcher = self._fetcher
        with self._buf_lock:
            start = self._buf_idx
            ret = self._cache_buf[start:start + b]
            over = start + b - self._buf_len
            if over <= 0:
                self._buf_idx = start + b
                # Notify the background thread that we need more data.
                # NOTE(review): this compares the read position, not the
                # number of bytes remaining, against the threshold --
                # confirm that is the intended meaning of "low water".
                if (self._buf_idx > self._low_water
                        and not fetcher.should_fetch.is_set()):
                    fetcher.should_fetch.set()
                return ret
            # Not enough bytes left: hand out the remainder and mark the
            # buffer drained so no other caller can receive the same bytes.
            self._buf_idx = self._buf_len

        # The lock is released before fetching or waiting: _refresh() needs
        # it to install new data, so waiting while holding it would deadlock.
        if fetcher.idle.is_set() or not fetcher.is_alive():
            # No refill in flight (or the fetch thread is gone): fetch here.
            ret += self._refresh(over)
        else:
            # A refill is in flight; wait for it, but stop waiting if the
            # fetch thread dies without signalling.
            while not fetcher.idle.wait(1.0):
                if not fetcher.is_alive():
                    break
            ret += self._qrandom(over)
        return ret

    def random(self):
        """Return a float in [0.0, 1.0) built from 53 random bits."""
        intstr = _hexlify(self._qrandom(7))
        # 7 bytes give 56 bits; drop 3 to keep the 53 bits a float can hold.
        return (int(intstr, 16) >> 3) * RECIP_BPF

    def getrandbits(self, k):
        """Return a non-negative integer with *k* random bits.

        Raises:
            ValueError: If *k* is not greater than zero.
            TypeError: If *k* is not an integer value.
        """
        if k <= 0:
            raise ValueError('number of bits must be greater than zero')
        if k != int(k):
            raise TypeError('number of bits should be an integer')
        # Integral floats such as 8.0 pass the check above; normalise so the
        # byte count and shift below are true integers.
        k = int(k)
        # bits / 8 and rounded up
        numbytes = (k + 7) // 8
        x = int(_hexlify(self._qrandom(numbytes)), 16)
        # trim excess bits
        return x >> (numbytes * 8 - k)

    def seed(self, *args, **kwds):
        """Do nothing; the entropy source cannot be seeded."""
        return None

    def _notimplemented(self, *args, **kwds):
        """Reject state operations, which this generator does not support."""
        raise NotImplementedError('Quantum entropy source does not have state.')

    getstate = setstate = _notimplemented
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment