Skip to content

Instantly share code, notes, and snippets.

@ZirconXi
Created February 24, 2020 13:16
Show Gist options
  • Save ZirconXi/588add92a1298770949292e3f83d1f69 to your computer and use it in GitHub Desktop.
Save ZirconXi/588add92a1298770949292e3f83d1f69 to your computer and use it in GitHub Desktop.
Snowflake 算法实现
"""
twitter Snowflake implement
"""
import threading
import time
def get_now_timestamp() -> int:
""" 获取当前时间戳
:return 当前时间戳
"""
return int(time.time() * 1000)
def tail_next_millis(last_timestamp: int) -> int:
""" 阻塞到下一毫秒,直到获得新的时间戳
:param last_timestamp 上一个生成的时间戳
:return 当前时间戳
"""
timestamp = get_now_timestamp()
while timestamp <= last_timestamp:
timestamp = get_now_timestamp()
return timestamp
class Snowflake:
# 机器 id 占用的位数
WORKER_ID_BITS = 10
# 最大的机器 id(0b1111111111)
MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS)
# 序列占用的位数
SEQUENCE_BITS = 12
# 序列掩码(0b111111111111)
SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
# 机器 id 左移 12(SEQUENCE_BITS) 位
WORKER_ID_LSHIFT = SEQUENCE_BITS
# 时间戳左移 22(WORKER_ID_BITS+WORKER_ID_LSHIFT) 位
TIMESTAMP_LSHIFT = WORKER_ID_BITS + WORKER_ID_LSHIFT
def __init__(self, worker_id: int = 0):
self.start_timestamp = 1582473600000 # 起始时间戳:2020-01-01
self.last_timestamp = -1 # 上一个时间戳
self.sequence = 0 # 序列初值
self.worker_id = worker_id # 机器 id
self.lock = threading.Lock() # 线程锁
def get_next_id(self) -> int:
with self.lock:
now_timestamp: int = get_now_timestamp()
if now_timestamp < self.last_timestamp:
# 若时间回拨,拒绝生成 id
raise ValueError(
'Clock moved backwards. Refusing to'
'generate id for {0} milliseconds.'.format(
self.last_timestamp - now_timestamp
)
)
elif now_timestamp == self.last_timestamp:
# 同一时间戳,进行毫秒内序列自增
self.sequence: int = (self.sequence + 1) & Snowflake.SEQUENCE_MASK
if self.sequence == 0:
# 若毫秒内序列溢出,阻塞直到下一个时间戳
now_timestamp: int = tail_next_millis(self.last_timestamp)
else:
# 重置序列
self.sequence: int = 0
# 更新上一个时间截
self.last_timestamp: int = now_timestamp
# 生成 id
unique_id: int = (
((now_timestamp - self.start_timestamp) << Snowflake.TIMESTAMP_LSHIFT)
| (self.worker_id << Snowflake.WORKER_ID_LSHIFT)
| self.sequence
)
return unique_id
def generate(self) -> int:
return self.get_next_id()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment