Skip to content

Instantly share code, notes, and snippets.

@rkern
Last active August 14, 2020 03:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkern/9502a19c5926a139ce7deb5bca76c312 to your computer and use it in GitHub Desktop.
Save rkern/9502a19c5926a139ce7deb5bca76c312 to your computer and use it in GitHub Desktop.
Python implementation of SeedSequence
#!/usr/bin/env python
""" PRNG seed sequence implementation for np.random.
The algorithms are derived from Melissa E. O'Neill's C++11 `std::seed_seq`
implementation, as it has a lot of nice properties that we want.
https://gist.github.com/imneme/540829265469e673d045
http://www.pcg-random.org/posts/developing-a-seed_seq-alternative.html
The MIT License (MIT)
Copyright (c) 2015 Melissa E. O'Neill
Copyright (c) 2019 Robert Kern
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import abc
from itertools import cycle
import re
import secrets
import sys
import numpy as np
DECIMAL_RE = re.compile(r'[0-9]+')
DEFAULT_POOL_SIZE = 4
INIT_A = np.uint32(0x43b0d7e5)
MULT_A = np.uint32(0x931e8875)
INIT_B = np.uint32(0x8b51f9dd)
MULT_B = np.uint32(0x58f38ded)
MIX_MULT_L = np.uint32(0xca01f9dd)
MIX_MULT_R = np.uint32(0x4973f715)
XSHIFT = np.dtype(np.uint32).itemsize * 8 // 2
MASK32 = 0xFFFFFFFF
def _int_to_uint32_array(n):
arr = []
if n < 0:
raise ValueError("expected non-negative integer")
if n == 0:
arr.append(np.uint32(n))
while n > 0:
arr.append(np.uint32(n & MASK32))
n >>= 32
return np.array(arr, dtype=np.uint32)
def coerce_to_uint32_array(x):
""" Coerce an input to a uint32 array.
If a `uint32` array, pass it through directly.
If a non-negative integer, then break it up into `uint32` words, lowest
bits first.
If a string starting with "0x", then interpret as a hex integer, as above.
If a string of decimal digits, interpret as a decimal integer, as above.
If a sequence of ints or strings, interpret each element as above and
concatenate.
Note that the handling of `int64` or `uint64` arrays are not just
straightforward views as `uint32` arrays. If an element is small enough to
fit into a `uint32`, then it will only take up one `uint32` element in the
output. This is to make sure that the interpretation of a sequence of
integers is the same regardless of numpy's default integer type, which
differs on different platforms.
Parameters
----------
x : int, str, sequence of int or str
Returns
-------
seed_array : uint32 array
Examples
--------
>>> import numpy as np
>>> from seed_seq import coerce_to_uint32_array
>>> coerce_to_uint32_array(12345)
array([12345], dtype=uint32)
>>> coerce_to_uint32_array('12345')
array([12345], dtype=uint32)
>>> coerce_to_uint32_array('0x12345')
array([74565], dtype=uint32)
>>> coerce_to_uint32_array([12345, '67890'])
array([12345, 67890], dtype=uint32)
>>> coerce_to_uint32_array(np.array([12345, 67890], dtype=np.uint32))
array([12345, 67890], dtype=uint32)
>>> coerce_to_uint32_array(np.array([12345, 67890], dtype=np.int64))
array([12345, 67890], dtype=uint32)
>>> coerce_to_uint32_array([12345, 0x10deadbeef, 67890, 0xdeadbeef])
array([ 12345, 3735928559, 16, 67890, 3735928559],
dtype=uint32)
>>> coerce_to_uint32_array(1234567890123456789012345678901234567890)
array([3460238034, 2898026390, 3235640248, 2697535605, 3],
dtype=uint32)
"""
if isinstance(x, np.ndarray) and x.dtype == np.dtype(np.uint32):
return x.copy()
elif isinstance(x, str):
if x.startswith('0x'):
x = int(x, base=16)
elif DECIMAL_RE.match(x):
x = int(x)
else:
raise ValueError("unrecognized seed string")
if isinstance(x, (int, np.integer)):
return _int_to_uint32_array(x)
else:
if len(x) == 0:
return np.array([], dtype=np.uint32)
# Should be a sequence of interpretable-as-ints. Convert each one to
# a uint32 array and concatenate.
subseqs = [coerce_to_uint32_array(v) for v in x]
return np.concatenate(subseqs)
class ISeedSequence(abc.ABC):
""" Abstract base class for seed sequences.
"""
@abc.abstractmethod
def generate_state(self, n_words, dtype=np.uint32):
""" Return the requested number of words for PRNG seeding.
Parameters
----------
n_words : int
dtype : np.uint32 or np.uint64, optional
The size of each word. This should only be either `uint32` or
`uint64`. Strings (`'uint32'`, `'uint64'`) are fine. Note that
requesting `uint64` will draw twice as many bits as `uint32` for
the same `n_words`. This is a convenience for `BitGenerator`s that
express their states as `uint64` arrays.
Returns
-------
state : uint32 or uint64 array, shape=(n_words,)
"""
class SeedSequence(ISeedSequence):
def __init__(self, entropy=None, program_entropy=None, spawn_key=(),
pool_size=DEFAULT_POOL_SIZE):
if pool_size < DEFAULT_POOL_SIZE:
raise ValueError("The size of the entropy pool should be at least "
f"{DEFAULT_POOL_SIZE}")
if entropy is None:
entropy = secrets.randbits(pool_size * 32)
self.entropy = entropy
self.program_entropy = program_entropy
self.spawn_key = tuple(spawn_key)
self.pool_size = pool_size
self.pool = np.zeros(pool_size, dtype=np.uint32)
self.n_children_spawned = 0
self.mix_entropy(self.get_assembled_entropy())
def __repr__(self):
lines = [
f'{type(self).__name__}(',
f' entropy={self.entropy!r},',
]
# Omit some entries if they are left as the defaults in order to
# simplify things.
if self.program_entropy is not None:
lines.append(f' program_entropy={self.program_entropy!r},')
if self.spawn_key:
lines.append(f' spawn_key={self.spawn_key!r},')
if self.pool_size != DEFAULT_POOL_SIZE:
lines.append(f' pool_size={self.pool_size!r},')
lines.append(')')
text = '\n'.join(lines)
return text
@np.errstate(over='ignore')
def mix_entropy(self, entropy_array):
""" Mix in the given entropy.
Parameters
----------
entropy_array : 1D uint32 array
"""
hash_const = INIT_A
def hash(value):
# We are modifying the multiplier as we go along.
nonlocal hash_const
value ^= hash_const
hash_const *= MULT_A
value *= hash_const
value ^= value >> XSHIFT
return value
def mix(x, y):
# Ensure that we're emulating C++ uint32_t arithmetic correctly.
result = np.uint32(np.uint32(MIX_MULT_L * x) - np.uint32(MIX_MULT_R * y))
result ^= result >> XSHIFT
return result
mixer = self.pool
# Add in the entropy up to the pool size.
for i in range(len(mixer)):
if i < len(entropy_array):
mixer[i] = hash(entropy_array[i])
else:
# Our pool size is bigger than our entropy, so just keep
# running the hash out.
mixer[i] = hash(np.uint32(0))
# Mix all bits together so late bits can affect earlier bits.
for i_src in range(len(mixer)):
for i_dst in range(len(mixer)):
if i_src != i_dst:
mixer[i_dst] = mix(mixer[i_dst], hash(mixer[i_src]))
# Add any remaining entropy, mixing each new entropy word with each
# pool word.
for i_src in range(len(mixer), len(entropy_array)):
for i_dst in range(len(mixer)):
mixer[i_dst] = mix(mixer[i_dst], hash(entropy_array[i_src]))
# Should have modified in-place.
assert mixer is self.pool
def get_assembled_entropy(self):
""" Convert and assemble all entropy sources into a uniform uint32
array.
Returns
-------
entropy_array : 1D uint32 array
"""
# Convert run-entropy, program-entropy, and the spawn key into uint32
# arrays and concatenate them.
# We MUST have at least some run-entropy. The others are optional.
assert self.entropy is not None
run_entropy = coerce_to_uint32_array(self.entropy)
if self.program_entropy is None:
# We *could* make `coerce_to_uint32_array(None)` handle this case,
# but that would make it easier to misuse, a la
# `coerce_to_uint32_array([None, 12345])`
program_entropy = np.array([], dtype=np.uint32)
else:
program_entropy = coerce_to_uint32_array(self.program_entropy)
spawn_entropy = coerce_to_uint32_array(self.spawn_key)
entropy_array = np.concatenate([run_entropy, program_entropy,
spawn_entropy])
return entropy_array
@np.errstate(over='ignore')
def generate_state(self, n_words, dtype=np.uint32):
""" Return the requested number of words for PRNG seeding.
Parameters
----------
n_words : int
dtype : np.uint32 or np.uint64, optional
The size of each word. This should only be either `uint32` or
`uint64`. Strings (`'uint32'`, `'uint64'`) are fine. Note that
requesting `uint64` will draw twice as many bits as `uint32` for
the same `n_words`. This is a convenience for `BitGenerator`s that
express their states as `uint64` arrays.
Returns
-------
state : uint32 or uint64 array, shape=(n_words,)
"""
hash_const = INIT_B
out_dtype = np.dtype(dtype)
if out_dtype == np.dtype(np.uint32):
pass
elif out_dtype == np.dtype(np.uint64):
n_words *= 2
else:
raise ValueError("only support uint32 or uint64")
state = np.zeros(n_words, dtype=np.uint32)
src_cycle = cycle(self.pool)
for i_dst in range(n_words):
data_val = next(src_cycle)
data_val ^= hash_const
hash_const *= MULT_B
data_val *= hash_const
data_val ^= data_val >> XSHIFT
state[i_dst] = data_val
if out_dtype == np.dtype(np.uint64):
state = state.view(np.uint64)
return state
def spawn(self, n_children):
""" Spawn a number of child `SeedSequence`s by extending the
`spawn_key`.
Parameters
----------
n_children : int
Returns
-------
seqs : list of `SeedSequence`s
"""
seqs = []
for i in range(self.n_children_spawned,
self.n_children_spawned + n_children):
seqs.append(SeedSequence(
self.entropy,
program_entropy=self.program_entropy,
spawn_key=self.spawn_key + (i,),
pool_size=self.pool_size,
))
return seqs
def seed_prng(name, seed_seq):
""" Use a `SeedSequence` to initialize the given PRNG.
This is not an API I'm recommending for numpy. This is just for
bootstrapping this demo.
"""
def _pcg64_cons(arr):
""" For some reason, I can't pass in a 128-bit integer to the
constructor, so we'll just punch one in through the state.
"""
bitgen = np.random.PCG64()
state = bitgen.state.copy()
state['state']['state'] = int(arr[0]) + int(arr[1]) * (1<<64)
# The author now recommends using seed entropy to set the increment as well.
state['state']['inc'] = (int(arr[2]) + int(arr[3]) * (1<<64)) | 1
bitgen.state = state
return bitgen
def _pcg32_cons(arr):
bitgen = np.random.PCG32(int(arr[0]), inc=int(arr[1]) | 1)
return bitgen
def _mt19937_cons(arr):
bitgen = np.random.MT19937()
state = bitgen.state.copy()
state['state']['key'] = arr
bitgen.state = state
return bitgen
n_words, dtype, constructor = {
'PCG32': (2, 'uint64', _pcg32_cons),
'PCG64': (4, 'uint64', _pcg64_cons),
'Xoshiro256': (4, 'uint64', np.random.Xoshiro256),
'Xoshiro512': (8, 'uint64', np.random.Xoshiro512),
'ThreeFry': (4, 'uint64', lambda key: np.random.ThreeFry(key=key)),
'Philox': (2, 'uint64', lambda key: np.random.Philox(key=key)),
'MT19937': (624, 'uint32', _mt19937_cons),
# DSFMT has a very particular state configuration, so just give it 256
# bits and let its seeding work things out.
'DSFMT': (8, 'uint32', np.random.DSFMT),
}[name]
bitgen = constructor(seed_seq.generate_state(n_words, dtype))
return bitgen
class SeededGenerator(np.random.Generator):
""" Prototype of `Generator` API incorporating `SeedSequence`.
"""
def __init__(self, bit_generator, seed_seq=None):
# FIXME: seed_seq will probably be owned by the BitGenerator, not
# Generator, but for now, it's easiest to subclass just Generator for
# demo purposes.
super().__init__(bit_generator)
self._seed_seq = seed_seq
@property
def seed(self):
if self._seed_seq is None:
return None
return getattr(self._seed_seq, 'entropy', None)
def spawn(self, n_children):
child_seeds = self._seed_seq.spawn(n_children)
# FIXME: placeholder. Actually, we would just pass the children
# SeedSequences to the BitGenerator constructors when they accept them.
bitgen_name = type(self.bit_generator).__name__
child_bitgens = [SeededGenerator(seed_prng(bitgen_name, seed), seed)
for seed in child_seeds]
return child_bitgens
def default_gen(seed=None):
if isinstance(seed, SeedSequence):
seed_seq = seed
else:
seed_seq = SeedSequence(seed)
bitgen = seed_prng('PCG64', seed_seq)
gen = SeededGenerator(bitgen, seed_seq)
return gen
def gen_interleaved_bytes(gens, n_per_gen=1024, output_dtype=np.uint32):
while True:
draws = [g.integers(np.iinfo(output_dtype).max, dtype=output_dtype,
endpoint=True, size=n_per_gen)
for g in gens]
interleaved = np.column_stack(draws).ravel()
bytes_chunk = bytes(interleaved.data)
yield bytes_chunk
def main():
import argparse
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
ALGORITHMS = [
'PCG32',
'PCG64',
'Xoshiro256',
'Xoshiro512',
'ThreeFry',
'Philox',
'MT19937',
'DSFMT',
]
parser.add_argument('-s', '--seed', type=int, help='The root seed.')
parser.add_argument('-d', '--depth', type=int, default=4,
help='The depth of the spawn tree.')
parser.add_argument('-p', '--ply', type=int, default=8,
help='The number of spawns at each level.')
parser.add_argument('--dtype', choices=['uint32', 'uint64'], default='uint32',
help='The output dtype.')
parser.add_argument('-a', '--algorithm', choices=ALGORITHMS,
default='PCG64',
help='The PRNG algorithm to test.')
args = parser.parse_args()
ss = SeedSequence(args.seed)
root = SeededGenerator(seed_prng(args.algorithm, ss), ss)
# See? We have easy access to the original seed. The user can copy-paste
# that and use it on the CLI later. Print it to stderr because RNG_test is
# consuming stdout.
print(f"seed = {root.seed}", file=sys.stderr)
# Generate `ply ** depth` leaf `SeededGenerators` by the `spawn()` API.
leaves = []
nodes = [root]
for i in range(args.depth):
children = []
for node in nodes:
children.extend(node.spawn(args.ply))
nodes = children
gens = nodes
dtype = np.dtype(args.dtype)
for chunk in gen_interleaved_bytes(gens, output_dtype=dtype):
sys.stdout.buffer.write(chunk)
if __name__ == '__main__':
main()
@mattip
Copy link

mattip commented Jun 13, 2019

@bashtage do you have any thoughts? I would be happy to do the work to get this in and use it to replace .seed() once we have buy-in.

@bashtage
Copy link

I think this is heading in as the preferred method to handle seeding. @rkern are you planning on a PR?

@mattip
Copy link

mattip commented Jun 13, 2019

I can do the PR, just wanted to make sure it is accepted

@mattip
Copy link

mattip commented Jun 14, 2019

@bashtage, @rkern PR numpy/numpy#13780 is progressing. How can I make sure the code is "correct"? Is there a test one can do with the output of the chunks above?

@rkern
Copy link
Author

rkern commented Jun 15, 2019

Good question. Your implementation and mine seem to give different results, both of which differ from the C++ implementation. The latter may be an error in my C++ driver, though, so I'll track that down.

@rkern
Copy link
Author

rkern commented Jun 15, 2019

@rkern
Copy link
Author

rkern commented Jun 15, 2019

Ah, found the problem. My code was relying on using np.uint32 scalar objects in generate_state() to emulate C uint32_t arithmetic, and that worked because all the objects came from the explicit np.uint32 module-level constants or from indexing np.uint32 arrays. When you turned the module-level constants to cdef uint32_t Cython constants, the undeclared function-level variables in generate_state() are just plain-old Python objects that happen to be ints, not np.uint32 objects. That's why you had to add in that & MASK32 that I didn't have.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment