Last active
January 25, 2025 15:11
-
-
Save pirate/7e44387c12f434a77072d50c52a3d18e to your computer and use it in GitHub Desktop.
Pure python all-in-one UUIDv7 Implementation with graceful degradation between ms, µs, and ns timestamp precision
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# A single-file pure-python implementation of UUIDv7: (e.g. 01941230-851a-77fd-9b0b-8c8eac3b2d23) | |
# - makes sure UUIDv7s are always generated in alphabetic order (using system time + nanoseconds + extra monotonic random bits) | |
# - store millisecond, microsecond, nanosecond / variable precision timestamps all using same format | |
# - graceful degradation in precision, falls back to monotonic urandom bytes depending on user-provided timestamp precision | |
# - fully compatible with standard UUIDv7 spec (48 bit millisecond unix epoch time with rand_a & rand_b monotonic randomness) | |
# - allows you to generate a UUIDv7 with a given timestamp (of any precision), and parse the timestamp back out from any UUIDv7 | |
# - helps guarantee that UUIDv7s generated back-to-back in the same thread are always monotonically sortable | |
# - helps lower the risk of UUIDv7s colliding with other UUIDv7s generated in other threads / on other machines | |
# honestly this is basically just ULID reinvented inside a UUID form factor. | |
import time | |
import math | |
import secrets | |
import concurrent.futures | |
from typing_extensions import Self | |
from datetime import datetime, timezone | |
from uuid import UUID, uuid4 | |
class UUID7(UUID): | |
@classmethod | |
def from_parts(cls, timestamp: int | float | datetime | str | None = None, suffix: UUID | str | None = None, ns: int=0) -> Self: | |
""" | |
The 128 bits in the UUID are allocated as follows, differing slightly from standard UUIDv7 in that it uses | |
rand_a and part of rand_b for extra timing precision (but fully compatible with UUIDv7 spec): | |
- 48 bits of milliseconds since unix epoch (or less common: 36 bits of whole seconds + 12+12+12) | |
- 4 bits for the uuid version (0b111) | |
- 12 bits of µs ts precision (filling rand_a) | |
- 4 bits for the uuid variant & 2 bits of rand (0b10xx) (defined by RFC4122 & RFC9562) | |
- 12 bits of ns ts precision / monotonic randomness (filling leftmost 12 bits of rand_b) | |
- 48 bits of random suffix bits (filling rightmost 48 bits of rand_b) | |
0 1 2 3 | |
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
t1 | milliseconds since epoch (48 bits) | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
t2/t3 | ... ms since epoch (48 bits) | ver | ts µs (12 bits) | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
t4/rand |var|rnd| ts ns (12 bits) | suffix rand (48 bits) | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
rand | ...suffix rand (48 bits) | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
total = 128 bits = 32 chars of hex (36 with dashes) | |
idx 0 8 12 16 20 32 | |
1. [epoch ms] 48bits = 12 chars [01941230-851a] - 77fd - 9b0b-8c8eac3b2d23 | |
3. [version] 4bits = 1 char 01941230-851a -[7]7fd - 9b0b-8c8eac3b2d23 | |
4. [final_µs] 12bits = 3 chars 01941230-851a - 7[7fd]- 9b0b-8c8eac3b2d23 | |
5. [2 variant+2 rand] 4bits = 1 char 01941230-851a - 77fd -[9]b0b-8c8eac3b2d23 | |
6. [final_ns] 12bits = 3 chars 01941230-851a - 77fd - 9[b0b]-8c8eac3b2d23 | |
7. [suffix rand] 48bits = 12 chars 01941230-851a - 77fd - 9b0b-[8c8eac3b2d23] | |
SQLite equivalent: | |
select | |
-- timestamp | |
format('%08x', ((strftime('%s') * 1000) >> 16)) || '-' || | |
format('%04x', ((strftime('%s') * 1000) | |
+ ((strftime('%f') * 1000) % 1000)) & 0xffff) || '-' || | |
-- version / rand_a | |
format('%04x', 0x7000 + abs(random()) % 0x0fff) || '-' || | |
-- variant / rand_b | |
format('%04x', 0x8000 + abs(random()) % 0x3fff) || '-' || | |
-- rand_b | |
format('%012x', abs(random()) >> 16) as value; | |
For more, see: https://antonz.org/uuidv7/ | |
""" | |
if timestamp is None: | |
# Use current time with µs precision | |
timestamp = datetime.now(tz=timezone.utc) | |
if suffix is None: | |
# generate a random suffix if none is provided | |
suffix = uuid4() | |
user_datetime, user_ns = parse_timestamp(timestamp, ns) | |
# parse user-provided timestamp into separate seconds, milliseconds, microseconds chunks | |
assert isinstance(user_datetime, datetime) and datetime(year=1970, month=1, day=1, hour=1, minute=1, second=1, tzinfo=timezone.utc) < user_datetime < datetime(year=3000, month=1, day=1, hour=1, minute=1, second=1, tzinfo=timezone.utc), f'{user_datetime=} is not a valid datetime' | |
assert isinstance(user_ns, int) and 0 <= user_ns < 1000, f'{user_ns=} is not a valid nanoseconds int value' | |
user_seconds = int(user_datetime.timestamp()) | |
user_ms = int(user_datetime.timestamp() * 1000) % 1000 | |
user_µs = int(user_datetime.timestamp() * 1000*1000) % 1000 | |
# parse hardware-provided nanosecond resolution timestamp into seconds, milliseconds, microseconds chunks | |
sys_clock_ns = time.monotonic_ns() # 1735462083_894_528_000 | |
sys_ms = int(sys_clock_ns / 1_000_000) % 1000 | |
sys_µs = int(sys_clock_ns / 1_000) % 1000 | |
sys_ns = (int(sys_clock_ns) % 1000) or secrets.randbelow(1000) # fallback to random numbers if hardware doesn't provide nanoseconds | |
# combine user-provided and system-provided timestamps into final timestamp | |
final_s = user_seconds | |
final_ms = user_ms or 0 | |
final_µs = user_µs or 0 | |
final_ns = user_ns or sys_ns or secrets.randbelow(1000) or secrets.randbelow(1000) or secrets.randbelow(1000) or secrets.randbelow(1000) # should never be 0, that's reserved for events with manually created UUIDs | |
# trim suffix hex str to 12 rightmost characters | |
suffix_str = str(suffix).replace('-', '')[-12:] | |
# set variant to a character 0b10xx where x are random bits | |
variant_char = 8 # (secrets.randbits(4) | (1<<1)) & ~(1<<0) | |
# OPTIONAL alternative behavior: store timestamp as (36 bit whole seconds integer, 12 bit ms, 12 bit µs, 12 bit ns) | |
# ts_sec_hex = f'{final_s:09x}' # 36 bit 9 character whole seconds | |
# ts_ms_hex = f'{final_ms:03x}' # 12 bit 3 character milliseconds | |
# CURRENT behavior: store timestamp as (48 bit millisecond integer, 12 bit µs, 12 bit ns) | |
ts_sec_hex = '' | |
ts_ms_hex = f'{((final_s*1000)+final_ms):012x}' # 48 bit 12 character millisecond integer | |
version_hex = '7' # 4 bit 1 character version | |
ts_µs_hex = f'{final_µs:03x}' # 12 bit 3 character microseconds | |
variant_hex = f'{variant_char:01x}' # 4 bit 2 bits of variant + 2 bits of randomness | |
ts_ns_hex = f'{final_ns:03x}' # 12 bit 3 character nanoseconds | |
suffix_hex = suffix_str # 48 bit 12 character suffix | |
# assemble full hex [01941230-851a]-[7] [7fd] - [9] [b0b] - [8c8eac3b2d23] | |
hex_full = f'{ts_sec_hex}{ts_ms_hex}{version_hex}{ts_µs_hex}{variant_hex}{ts_ns_hex}{suffix_hex}' | |
hex_full_dashed = f'{hex_full[:8]}-{hex_full[8:12]}-{hex_full[12:16]}-{hex_full[16:20]}-{hex_full[20:]}' | |
# make sure parts are all the right length | |
assert len(ts_sec_hex+ts_ms_hex) == 12, f'{ts_sec_hex+ts_ms_hex} is not 12 chars long' | |
assert len(version_hex) == 1, f'{version_hex} is not 1 char long' | |
assert len(ts_µs_hex) == 3, f'{ts_µs_hex} is not 3 chars long' | |
assert len(variant_hex) == 1, f'{variant_hex} is not 1 char long' | |
assert len(ts_ns_hex) == 3, f'{ts_ns_hex} is not 3 chars long' | |
assert len(suffix_hex) == 12, f'{suffix_hex} is not 12 chars long' | |
assert ts_ns_hex != '000', f'{ts_ns_hex=} has zeros' | |
# make sure the fully assembled hex string is a valid UUID hex string | |
assert len(hex_full_dashed) == 36 and all(c in '0123456789abcdef-' for c in hex_full_dashed), f'{hex_full_dashed=} is not a valid UUID7 hex string' | |
assert hex_full_dashed.count('-') == 4, f'{hex_full_dashed=} has {hex_full_dashed.count("-")} dashes, not 4' | |
# make sure version bit is set to 7 | |
assert hex_full_dashed[14] == '7' | |
# make sure variant bit is set to 0b10 | |
assert hex_full_dashed[19] in ('8', '9', 'a', 'b'), f'{hex_full_dashed[18]} is not a valid variant bit (8, 9, a, b)' # https://antonz.org/uuidv7/ | |
return cls(hex=hex_full) | |
def summary(self) -> str: | |
return f'{self.seconds}s {self.milliseconds:03d}ms {self.microseconds:03d}µs {self.nanoseconds:03d}ns' | |
@property | |
def date(self) -> datetime: | |
hex_str = str(self).replace('-', '') | |
# extract the first 48 bits (12 chars) as milliseconds unix epoch timestamp | |
ms_ts = int(hex_str[:12], 16) | |
# extract the µs precision from the next 12 bits after the version | |
µs_ts = int(hex_str[13:16], 16) | |
# create python datetime object from float seconds.msµs | |
return datetime.fromtimestamp(ms_ts/1_000 + µs_ts/1_000_000, tz=timezone.utc) | |
@property | |
def ts(self) -> float: | |
return float(self.ts_str) | |
@property | |
def ts_str(self) -> str: | |
"""UUIDv7(timestamp=1735689599.123456789) -> 1735689599.123456789""" | |
return f'{self.seconds}.{self.milliseconds:03d}{self.microseconds:03d}{self.nanoseconds:03d}' | |
@property | |
def seconds(self) -> int: | |
"""UUIDv7(timestamp=1735689599.123456789) -> 1735689599""" | |
hex_str = str(self).replace('-', '') | |
ms_ts = int(hex_str[:12], 16) | |
return int(ms_ts/1000) | |
@property | |
def milliseconds(self) -> int: | |
"""UUIDv7(timestamp=1735689599.123456789) -> 123""" | |
hex_str = str(self).replace('-', '') | |
ms_ts = int(hex_str[:12], 16) | |
return int(ms_ts) % 1000 | |
@property | |
def microseconds(self) -> int: | |
"""UUIDv7(timestamp=1735689599.123456789) -> 456""" | |
hex_str = str(self).replace('-', '') | |
return int(hex_str[13:16], 16) | |
@property | |
def nanoseconds(self) -> int: | |
"""UUIDv7(timestamp=1735689599.123456789) -> 789""" | |
hex_str = str(self).replace('-', '') # 019412e4-7611-71a8-12ea-deadbeefdead -> 019412e4761171a812eadeadbeefdead -> 2ea | |
return int(hex_str[17:20], 16) | |
@property | |
def suffix(self) -> str: | |
"""UUIDv7('01941f29-7886-7065-d170-deadbeefdead') -> 'deadbeefdead'""" | |
return str(self)[-12:] | |
def truncate_float(n: float, places: int) -> float: | |
return int(n * (10 ** places)) / 10 ** places | |
def parse_timestamp(ts: int | float | datetime | str | None, ns: int=0) -> tuple[datetime, int]: | |
if isinstance(ts, datetime): | |
return ts, ns | |
if isinstance(ts, str): | |
# ts is a ISO 8601 formatted datetime string, return it immediately as it's unambiguously a datetime | |
if '-' in ts: | |
return datetime.fromisoformat(ts), ns # ISO 8601 don't provide ns-level resolution, so it's usually 0 here | |
# ts is a seconds.msµsns int or float string with up to 9 decimal places | |
# DO NOT CAST DIRECTLY TO FLOAT, floats are not precise enough for nanoseconds, split nanoseconds out using string manipulation first | |
elif ts.replace('.', '').isdigit(): | |
decimals = int(ts.split('.')[-1])) | |
ts = truncate_float(float(ts), 6) # X.XXXXXX000 get just first part of ts seconds.msµs (safe to store as float), used to construct python datetime | |
ns = (decimals * 10**9) % 1000 # 0.000000XXX get just this part, needs to be returned separately becuase it doesnt fit in float precision | |
if isinstance(ts, (int, float)): | |
try: | |
# try parsing as seconds.msµsns float or seconds int | |
ts = datetime.fromtimestamp(float(ts), tz=timezone.utc) | |
except Exception: | |
try: | |
# try parsing as milliseconds.µsns float or milliseconds int | |
ts = datetime.fromtimestamp(float(ts)/1000, tz=timezone.utc) | |
except Exception: | |
try: | |
# try parsing as microseconds.ns | |
ts = datetime.fromtimestamp(float(ts)/1000/1000, tz=timezone.utc) | |
except Exception: | |
# try parsing as nanoseconds int | |
ts = datetime.fromtimestamp(int(ts)/1000/1000/1000, tz=timezone.utc) | |
ns = int(ts) % 1000 # extract nanoseconds digits to return separately | |
assert isinstance(ts, datetime) | |
return ts, ns # seconds.msµs (since 1970 unix epoch), ns (just the nanosecond/rand digits of the timestamp) | |
def generate_uuid7_with_precision(precision: str) -> UUID7: | |
now = time.time() | |
if precision == 'second': | |
ts = int(now) | |
elif precision == 'millisecond': | |
ts = now * 1000 | |
elif precision == 'microsecond': | |
ts = now * 1_000_000 | |
else: # nanosecond | |
ts = int(time.time_ns()) | |
return UUID7.from_parts(ts, suffix=uuid4()) | |
def run_tests(): | |
# Test specific precision cases | |
test_cases = [ | |
# Second precision | |
(datetime(2024, 12, 31, 23, 59, 59, tzinfo=timezone.utc), "Second-precision"), | |
(datetime(2024, 12, 31, 23, 59, 59, 999000, tzinfo=timezone.utc), "Millisecond-precision"), | |
(datetime(2024, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc), "Microsecond-precision"), | |
(1735689599999999111, "ns-int-precision"), | |
(1735689599999999.111, "µs-float-precision"), | |
(1735689599999.999111, "ms-float-precision"), | |
(1735689599.999999111, "s-float-precision"), | |
# now | |
(datetime.now(tz=timezone.utc), "NOW") | |
] | |
print("\nTesting different precision levels:") | |
for ts, desc in test_cases: | |
suffix = UUID("deadbeef-dead-beef-8ead-deadbeefdead") | |
print(f"\n{desc}:") | |
print('------------------------------------------------------------------------') | |
print(f"Input timestamp: {parse_timestamp(ts)[0].isoformat()} + {parse_timestamp(ts)[1]}ns") | |
uuid = UUID7.from_parts(ts, suffix=suffix) | |
print(f"Final UUIDv7: {uuid}") | |
print() | |
print(f"Decoded Timestamp: [{uuid.summary()}]") | |
print(f"Decoded date: {uuid.date.isoformat()} + {uuid.nanoseconds}ns") | |
print(f"Timestamp diff: {(uuid.date - parse_timestamp(ts)[0]).total_seconds():.6f} seconds") | |
# Test single-threaded UUID7 generation | |
print("\nTesting single-threaded UUID7 generation for 500,000 UUIDs...") | |
precisions = ['second', 'millisecond', 'microsecond', 'nanosecond'] | |
uuids = [] | |
for _ in range(500_000): | |
uuid = UUID7.from_parts() | |
if uuids and str(uuid) < uuids[-1]: | |
print(f"UUID {str(uuid)} is not in order, should come before {uuids[-1]}") | |
raise Exception(f"UUID {str(uuid)} is not in order, should come before {uuids[-1]}") | |
uuids.append(str(uuid)) | |
print(f"\nTesting parallel UUID7 generation of 1,000,000 UUIDs...") | |
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: | |
futures = [] | |
for _ in range(10): | |
futures.append(executor.submit(UUID7.from_parts)) | |
for future in concurrent.futures.as_completed(futures): | |
# check if it's in order | |
if uuids and str(future.result()) < uuids[-1]: | |
print(f"UUID {str(future.result())} is not in order, should come before {uuids[-1]}") | |
raise Exception(f"UUID {str(future.result())} is not in order, should come before {uuids[-1]}") | |
uuids.append(str(future.result())) | |
# Print some statistics | |
print(f"Generated {len(uuids)} UUIDs") | |
print(f"First UUID: {uuids[0]}") | |
print(f"Last UUID: {uuids[-1]}") | |
print(f"All UUIDs are monotonically increasing") | |
print("\nAll tests passed successfully!") | |
if __name__ == "__main__": | |
run_tests() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment