Skip to content

Instantly share code, notes, and snippets.

@adeshmukh-quizlet
Last active July 16, 2019 00:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adeshmukh-quizlet/848caa61c5b80c6b92ae773ec941f99d to your computer and use it in GitHub Desktop.
Save adeshmukh-quizlet/848caa61c5b80c6b92ae773ec941f99d to your computer and use it in GitHub Desktop.
Distributed ID Generation in Kubernetes
// Bit mask keeping the low 32 bits of the Unix epoch second for the time part of an ID.
// Treated as an unsigned value, this field does not wrap until Feb 2106; a *signed* 32-bit
// seconds value would overflow in Jan 2038 (https://en.wikipedia.org/wiki/Year_2038_problem).
// NOTE(review): the original note asked for a 64-bit migration by 2038 — confirm which
// deadline applies to the clients that decode these IDs.
private const val TIME_PART_MASK = 0xFFFF_FFFFL

/** Number of ID bits that hold the shard (pod) identifier. */
const val SHARD_PART_WIDTH = 5

/** Number of ID bits that hold the sequence counter. */
const val SEQ_PART_WIDTH = 15

/** Number of distinct sequence values, i.e. 2^[SEQ_PART_WIDTH]; valid values are `0 until` this. */
const val SEQ_PART_LIMIT_EXCLUSIVE = 1 shl SEQ_PART_WIDTH
/**
 * A distributed, thread-safe ID generator that generates
 * 53-bit IDs using the following scheme:
 *
 * ```
 * [--1--][--------32--------][--5--][----15----]
 *  Sign   UnixTimeSeconds     Shard  Sequence
 * ```
 *
 * Limits:
 * * Max 32,768 (2^[SEQ_PART_WIDTH]) unique IDs per second per generator. Any further call
 *   within the same second throws [IdCollisionException] instead of reusing an ID.
 * * Max 32 shards (i.e. pods): [shardId] must be in `0..31`.
 * * The time part is the Unix epoch second masked to its low 32 bits. Read as unsigned it
 *   wraps in Feb 2106; read as a signed 32-bit value it would overflow in Jan 2038.
 *
 * The sequence is a single counter that keeps running across seconds (it is not reset when
 * the second changes); only its low [SEQ_PART_WIDTH] bits are placed in the ID.
 *
 * The limits can be increased by increasing the bit-width to, say, 64.
 * But this will impact clients that cannot deal with 64-bit integers.
 *
 * Assumes [clock] never moves backwards: if it returns a second that was already used before
 * a later one, IDs issued in that earlier second are not checked for duplicates.
 *
 * @param shardId identifier of this generator instance; embedded in every ID it produces.
 * @param clock source of the time part of each ID.
 * @throws IllegalStateException if [shardId] is negative or does not fit in [SHARD_PART_WIDTH] bits.
 */
@ThreadSafe
class ShardIdGenerator(
    private val shardId: Int,
    private val clock: Clock
) : IdGenerator {
    private val sequenceLock: Lock = ReentrantLock(true)

    // Number of IDs issued so far; its low SEQ_PART_WIDTH bits form the sequence part.
    // Guarded by sequenceLock.
    private var sequence = 0L

    // Epoch second of the most recent nextId() call (-1 before the first call).
    // Guarded by sequenceLock.
    private var lastTimestampSeconds = -1L

    // Value of `sequence` at the first ID issued in second `lastTimestampSeconds`.
    // Guarded by sequenceLock.
    private var firstSequenceOfSecond = 0L

    init {
        // IllegalStateException (rather than require()'s IllegalArgumentException) is kept
        // because existing callers and tests expect that exception type.
        if (shardId < 0) {
            throw IllegalStateException("Shard ID $shardId must not be negative.")
        }
        if (shardId >= 1 shl SHARD_PART_WIDTH) {
            throw IllegalStateException("Shard ID $shardId exceeds $SHARD_PART_WIDTH-bit limit.")
        }
    }

    /**
     * Returns the next ID for this shard.
     *
     * @throws IdCollisionException if [SEQ_PART_LIMIT_EXCLUSIVE] IDs were already issued in the
     *   current second; no sequence value is consumed in that case.
     */
    override fun nextId(): Long = sequenceLock.withLock {
        // Read the clock while holding the lock so timestamps are observed in the same order in
        // which sequence values are handed out. Reading it outside the lock would let a thread
        // pair a stale second with a later sequence value and slip past collision detection.
        val currentTimeSeconds = clock.instant().epochSecond
        val sequencePart = nextSequencePart(currentTimeSeconds)
        val timePart = currentTimeSeconds
            .and(TIME_PART_MASK)
            .shl(SHARD_PART_WIDTH + SEQ_PART_WIDTH)
        // Widen before shifting so the shard bits can never overflow an Int.
        val shardPart = shardId.toLong().shl(SEQ_PART_WIDTH)
        timePart or shardPart or sequencePart
    }

    /**
     * Returns the sequence part for an ID stamped with [currentTimeSeconds] and advances the
     * counter. Must be called with [sequenceLock] held.
     *
     * @throws IdCollisionException if issuing another ID in [currentTimeSeconds] would repeat a
     *   sequence part already used in that second; the counter is left unchanged.
     */
    private fun nextSequencePart(currentTimeSeconds: Long): Long {
        if (currentTimeSeconds != lastTimestampSeconds) {
            // A new second opens a fresh window of SEQ_PART_LIMIT_EXCLUSIVE sequence values.
            lastTimestampSeconds = currentTimeSeconds
            firstSequenceOfSecond = sequence
        }
        // The sequence part repeats every SEQ_PART_LIMIT_EXCLUSIVE IDs, so once that many IDs share
        // this second the next one would duplicate an earlier ID. Counting from the start of the
        // second (rather than from the last wrap to 0) avoids both false alarms at a wrap and
        // missed duplicates when a wrap window straddles a second boundary.
        if (sequence - firstSequenceOfSecond >= SEQ_PART_LIMIT_EXCLUSIVE) {
            throw IdCollisionException("ID collision at $currentTimeSeconds.")
        }
        val sequencePart = sequence.rem(SEQ_PART_LIMIT_EXCLUSIVE)
        sequence += 1
        return sequencePart
    }
}
/** Shard used by the tests: 31, the largest shard ID that fits in 5 shard bits. */
const val SHARD_ID: Int = 31
/** Tests for [ShardIdGenerator]. */
class ShardIdGeneratorTests {
    @Test
    fun `basic id generation`() {
        val shardIdGenerator = ShardIdGenerator(SHARD_ID, Clock.systemUTC())
        val initialId = shardIdGenerator.nextId()
        assertTrue(initialId > 0)
        assertTrue(shardIdGenerator.nextId() > initialId)
    }

    @Test
    fun `Ensure creation fails if shard ID exceeds range`() {
        val shardId = 1.shl(SHARD_PART_WIDTH)
        assertThrows(IllegalStateException::class.java) {
            ShardIdGenerator(shardId, Clock.systemUTC())
        }
    }

    @RepeatedTest(200)
    fun `Ensure 2x rated performance`() {
        /*
         * ShardIdGenerator is designed to provide up to SEQ_PART_LIMIT_EXCLUSIVE
         * values per second. This test ensures that we can generate
         * SEQ_PART_LIMIT_EXCLUSIVE values in 1/2 of that time period (i.e. in 500ms).
         *
         * An implicit assumption is that the performance of the CI infrastructure
         * running this test is inferior to production.
         * Hence if this passes in CI, it's certainly good for production.
         */
        val shardIdGenerator = ShardIdGenerator(SHARD_ID, Clock.systemUTC())
        val initialId = shardIdGenerator.nextId()
        var finalId: Long = initialId
        val execTime = measureNanoTime {
            repeat(SEQ_PART_LIMIT_EXCLUSIVE - 1) { finalId = shardIdGenerator.nextId() }
        }
        // Compare only the shard and sequence parts of the IDs: the time part may legitimately
        // change if the loop crosses a second boundary, while the shard part cancels out.
        val shardAndSequenceMask: Long = (1L shl (SHARD_PART_WIDTH + SEQ_PART_WIDTH)) - 1
        assertEquals(
            (SEQ_PART_LIMIT_EXCLUSIVE - 1).toLong(),
            finalId.and(shardAndSequenceMask) - initialId.and(shardAndSequenceMask)
        )
        val elapsed = Duration.ofNanos(execTime)
        assertTrue(
            elapsed < Duration.ofMillis(500),
            "Generating $SEQ_PART_LIMIT_EXCLUSIVE IDs took ${elapsed.toMillis()} ms"
        )
    }

    @Test
    fun `Ensure collision if IDs generated faster than SEQ_PART_LIMIT_EXCLUSIVE per second`() {
        // Number of ID generation calls that are expected to fail due to collision.
        val numIdsExceedingLimit = 10
        // Set up a mock clock to allow precise control over the time boundaries for this test.
        // The stubbed answers are consumed one per nextId() call, so this relies on the
        // generator reading the clock exactly once per call.
        val initialInstant = Instant.now()
        val clock = mock<Clock> {
            on {
                instant()
            }.also { ongoingStubbing ->
                var stubbing = ongoingStubbing
                // Exceed the limit on IDs generated within the second by numIdsExceedingLimit.
                // This allows us to expect numIdsExceedingLimit failures in the ID generation.
                repeat(SEQ_PART_LIMIT_EXCLUSIVE + numIdsExceedingLimit) {
                    stubbing = stubbing.thenReturn(initialInstant)
                }
                // Tick the clock to next second to verify that the ID generator
                // can recover from a prior collision after the second has passed.
                stubbing.thenReturn(initialInstant.plusSeconds(1))
            }
        }
        // Create the ShardIdGenerator to be tested.
        val shardIdGenerator = ShardIdGenerator(SHARD_ID, clock)
        // Concurrently generate SEQ_PART_LIMIT_EXCLUSIVE IDs.
        val generatedIds = generateConcurrently(shardIdGenerator, SEQ_PART_LIMIT_EXCLUSIVE)
        // Verify every value was unique.
        assertEquals(SEQ_PART_LIMIT_EXCLUSIVE, generatedIds.size)
        // After the SEQ_PART_LIMIT_EXCLUSIVE IDs are generated, subsequent
        // attempts to get the next ID within the same second are expected to fail.
        // NOTE: They are guaranteed to be generated within the same second because
        // this is how we set up the Clock for this test.
        repeat(numIdsExceedingLimit) {
            assertThrows(IdGenerator.IdCollisionException::class.java) {
                shardIdGenerator.nextId()
            }
        }
        // We have previously set up the Clock so that subsequent calls
        // to nextId() occur in the next second, thus these are expected
        // to succeed.
        val generatedIds2 = generateConcurrently(shardIdGenerator, SEQ_PART_LIMIT_EXCLUSIVE)
        assertEquals(SEQ_PART_LIMIT_EXCLUSIVE, generatedIds2.size)
    }

    /**
     * Calls [ShardIdGenerator.nextId] [count] times from a parallel stream and returns the
     * distinct IDs produced.
     */
    private fun generateConcurrently(generator: ShardIdGenerator, count: Int): Set<Long> =
        Stream.generate { fun(): Long = generator.nextId() }
            // The stream carries deferred calls rather than IDs, and nextId() runs only in
            // `map` after `limit`. An unordered parallel `generate` may pull more elements
            // from its supplier than `limit` keeps, which would otherwise consume extra
            // sequence values.
            .limit(count.toLong())
            .parallel()
            .map { it.invoke() }
            .collect(Collectors.toSet())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment