Created
February 24, 2020 13:16
-
-
Save ZirconXi/588add92a1298770949292e3f83d1f69 to your computer and use it in GitHub Desktop.
Snowflake 算法实现
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Twitter Snowflake ID generator implementation.
"""
import threading | |
import time | |
def get_now_timestamp() -> int:
    """Return the current wall-clock time in whole milliseconds.

    :return: ``time.time()`` scaled to milliseconds and truncated to ``int``
    """
    seconds = time.time()
    millis = seconds * 1000
    return int(millis)
def tail_next_millis(last_timestamp: int) -> int:
    """Spin until the clock reports a time later than ``last_timestamp``.

    The loop polls ``get_now_timestamp`` without sleeping, so the calling
    thread is busy for the duration of the wait.

    :param last_timestamp: timestamp (ms) used for the previously generated id
    :return: the first observed timestamp strictly greater than
        ``last_timestamp``
    """
    while True:
        current = get_now_timestamp()
        if current > last_timestamp:
            return current
class Snowflake:
    """Generator of Snowflake-style integer ids.

    An id is composed, from most to least significant bits, of:

    * the milliseconds elapsed since ``start_timestamp``,
    * ``WORKER_ID_BITS`` bits holding the worker id,
    * ``SEQUENCE_BITS`` bits holding a per-millisecond sequence number.

    Id generation is serialized with a per-instance ``threading.Lock``.
    """

    # Number of bits reserved for the worker id.
    WORKER_ID_BITS = 10
    # Largest worker id that fits in WORKER_ID_BITS (0b1111111111 == 1023).
    MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS)
    # Number of bits reserved for the per-millisecond sequence.
    SEQUENCE_BITS = 12
    # Mask keeping the sequence within SEQUENCE_BITS (0b111111111111 == 4095).
    SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
    # The worker id is shifted left past the sequence bits (12).
    WORKER_ID_LSHIFT = SEQUENCE_BITS
    # The timestamp is shifted left past worker id and sequence bits (22).
    TIMESTAMP_LSHIFT = WORKER_ID_BITS + WORKER_ID_LSHIFT

    def __init__(self, worker_id: int = 0):
        """Create a generator bound to one worker id.

        :param worker_id: identifier of this generator, in the range
            ``0..MAX_WORKER_ID``
        :raises ValueError: if ``worker_id`` does not fit in
            ``WORKER_ID_BITS`` bits; such a value would spill into the
            timestamp bits and could produce colliding ids
        """
        if not 0 <= worker_id <= Snowflake.MAX_WORKER_ID:
            raise ValueError(
                f'worker_id must be between 0 and '
                f'{Snowflake.MAX_WORKER_ID}, got {worker_id}.'
            )
        # Custom epoch in milliseconds: 2020-02-23T16:00:00Z
        # (2020-02-24 00:00 in UTC+8). Changing it changes every id.
        self.start_timestamp = 1582473600000
        self.last_timestamp = -1  # timestamp used for the previous id
        self.sequence = 0  # sequence number within the current millisecond
        self.worker_id = worker_id
        self.lock = threading.Lock()  # serializes get_next_id

    def get_next_id(self) -> int:
        """Generate the next id.

        :return: an integer combining elapsed milliseconds, worker id and
            sequence number
        :raises ValueError: if the clock reports a time earlier than the one
            used for the previous id
        """
        with self.lock:
            now_timestamp = get_now_timestamp()

            if now_timestamp < self.last_timestamp:
                # The clock moved backwards: refuse to generate an id rather
                # than risk repeating one that was already handed out.
                raise ValueError(
                    'Clock moved backwards. Refusing to '
                    'generate id for {0} milliseconds.'.format(
                        self.last_timestamp - now_timestamp
                    )
                )

            if now_timestamp == self.last_timestamp:
                # Same millisecond as the previous id: bump the sequence.
                self.sequence = (self.sequence + 1) & Snowflake.SEQUENCE_MASK
                if self.sequence == 0:
                    # Sequence wrapped around within this millisecond; wait
                    # for the clock to advance before reusing sequence 0.
                    now_timestamp = tail_next_millis(self.last_timestamp)
            else:
                # New millisecond: restart the sequence.
                self.sequence = 0

            # Remember the timestamp used for this id.
            self.last_timestamp = now_timestamp

            # Assemble the id from its three bit fields.
            elapsed = now_timestamp - self.start_timestamp
            return (
                (elapsed << Snowflake.TIMESTAMP_LSHIFT)
                | (self.worker_id << Snowflake.WORKER_ID_LSHIFT)
                | self.sequence
            )

    def generate(self) -> int:
        """Alias for :meth:`get_next_id`."""
        return self.get_next_id()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment