Last active
July 16, 2019 00:39
-
-
Save adeshmukh-quizlet/848caa61c5b80c6b92ae773ec941f99d to your computer and use it in GitHub Desktop.
Distributed ID Generation in Kubernetes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The time part keeps only the low 32 bits of the Unix epoch second. Read as an
// unsigned value it wraps on 2106-02-07; consumers that decode it as a signed
// 32-bit value will misbehave after 2038-01-19 (the Year 2038 problem).
// Migrate to a wider time field before either boundary matters.
// Ref: https://en.wikipedia.org/wiki/Year_2038_problem
private const val TIME_PART_MASK = 0x0000_0000_FFFF_FFFF

/** Bit width of the shard (pod) field; 5 bits hold shard IDs 0..31. */
const val SHARD_PART_WIDTH = 5

/** Bit width of the sequence field. */
const val SEQ_PART_WIDTH = 15

/** Number of distinct sequence values (2^[SEQ_PART_WIDTH] = 32,768), i.e. the per-second ID limit. */
const val SEQ_PART_LIMIT_EXCLUSIVE = 1.shl(SEQ_PART_WIDTH)
/**
 * A distributed, thread-safe ID generator that generates
 * 53-bit IDs using the following scheme:
 *
 * ```
 * [--1--][--------32--------][--5--][----15----]
 *  Sign    UnixTimeSeconds    Shard   Sequence
 * ```
 *
 * The sign bit is always zero, so every ID is non-negative and smaller than
 * 2^52; it therefore fits in the 53-bit safe-integer range of an IEEE-754
 * double (for example, a JavaScript number).
 *
 * Limits:
 * * At most 32,768 ([SEQ_PART_LIMIT_EXCLUSIVE]) IDs per second. The quota is
 *   counted per epoch second of [clock], independently of where the rolling
 *   sequence counter happens to wrap.
 * * At most 32 shards (i.e. pods): shard IDs 0..31.
 * * Only the low 32 bits of the Unix epoch second are kept, so the time field
 *   wraps in February 2106; consumers that decode it as a signed 32-bit value
 *   break after January 2038.
 *
 * The limits can be increased by increasing the bit-width to, say, 64.
 * But this will impact clients that cannot deal with 64-bit integers.
 *
 * IDs from two generators differ only through their shard part, so generators
 * that run at the same time must be given distinct shard IDs.
 *
 * @param shardId this generator's shard ID; must be in 0..31.
 * @param clock time source; only whole epoch seconds are used.
 * @throws IllegalStateException if [shardId] is outside 0..31.
 */
@ThreadSafe
class ShardIdGenerator(
    private val shardId: Int,
    private val clock: Clock
) : IdGenerator {

    // Fair lock (ReentrantLock(true)): the longest-waiting caller acquires it next.
    private val sequenceLock: Lock = ReentrantLock(true)

    // The mutable state below is read and written only while holding
    // sequenceLock, so it needs no @Volatile.

    // Rolling counter; its value modulo SEQ_PART_LIMIT_EXCLUSIVE is the sequence part.
    private var sequence = 0L

    // Epoch second in which the most recent ID was issued (-1 before the first ID).
    private var currentWindowSecond: Long = -1L

    // Number of IDs issued so far during currentWindowSecond.
    private var issuedInCurrentWindow = 0

    init {
        // A negative shard ID would be sign-extended into the time and sign bits.
        check(shardId >= 0) { "Shard ID $shardId must not be negative." }
        check(shardId < 1.shl(SHARD_PART_WIDTH)) {
            "Shard ID $shardId exceeds $SHARD_PART_WIDTH-bit limit."
        }
    }

    // Shard bits of every ID; safe to precompute because shardId was validated above.
    private val nodePart: Long = shardId.toLong().shl(SEQ_PART_WIDTH)

    /**
     * Returns the next ID for this shard.
     *
     * The clock is sampled while holding [sequenceLock], so timestamps and
     * sequence values are assigned in the same order in which callers
     * acquire the lock.
     *
     * @throws IdGenerator.IdCollisionException if [SEQ_PART_LIMIT_EXCLUSIVE]
     *   IDs have already been issued during the current second; later calls
     *   succeed again once the clock reaches a different second.
     */
    override fun nextId(): Long = sequenceLock.withLock {
        val currentTimeSeconds = clock.instant().epochSecond
        val sequencePart = nextSequencePart(currentTimeSeconds)
        val timePart = currentTimeSeconds
            .and(TIME_PART_MASK)
            .shl(SHARD_PART_WIDTH + SEQ_PART_WIDTH)
        timePart.or(nodePart).or(sequencePart)
    }

    /**
     * Reserves and returns the sequence part for an ID stamped with
     * [currentTimeSeconds]. The caller must hold [sequenceLock].
     */
    private fun nextSequencePart(currentTimeSeconds: Long): Long {
        if (currentTimeSeconds != currentWindowSecond) {
            // New second: start a fresh per-second quota. The rolling counter keeps
            // running, so a second need not start at sequence part 0.
            // NOTE(review): a clock that steps backwards also lands here and can
            // re-issue (second, sequence) pairs from the earlier pass — confirm the
            // deployed clock never moves backwards.
            currentWindowSecond = currentTimeSeconds
            issuedInCurrentWindow = 0
        }
        if (issuedInCurrentWindow >= SEQ_PART_LIMIT_EXCLUSIVE) {
            // Another ID this second would reuse a sequence part already issued in it.
            throw IdCollisionException("ID collision at $currentTimeSeconds.")
        }
        issuedInCurrentWindow += 1
        val sequencePart = sequence.rem(SEQ_PART_LIMIT_EXCLUSIVE)
        sequence += 1
        return sequencePart
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Shard ID used throughout these tests: 31, the largest value that fits the 5-bit shard field. */
const val SHARD_ID: Int = 31
/** Tests for [ShardIdGenerator]. */
class ShardIdGeneratorTests {

    @Test
    fun `basic id generation`() {
        val shardIdGenerator = ShardIdGenerator(SHARD_ID, Clock.systemUTC())
        val initialId = shardIdGenerator.nextId()
        assertTrue(initialId > 0)
        assertTrue(shardIdGenerator.nextId() > initialId)
    }

    @Test
    fun `Ensure creation fails if shard ID exceeds range`() {
        val shardId = 1.shl(SHARD_PART_WIDTH)
        assertThrows(IllegalStateException::class.java) {
            ShardIdGenerator(shardId, Clock.systemUTC())
        }
    }

    @RepeatedTest(200)
    fun `Ensure 2x rated performance`() {
        /*
         * ShardIdGenerator is designed to provide up to SEQ_PART_LIMIT_EXCLUSIVE
         * values per second. This test ensures that we can generate
         * SEQ_PART_LIMIT_EXCLUSIVE values in 1/2 of that time period (i.e. in 500ms).
         *
         * An implicit assumption is that the performance of the CI infrastructure
         * running this test is inferior to production.
         * Hence if this passes in CI, it's certainly good for production.
         */
        val shardIdGenerator = ShardIdGenerator(SHARD_ID, Clock.systemUTC())
        val initialId = shardIdGenerator.nextId()
        var finalId: Long = initialId
        val execTime = measureNanoTime {
            repeat(SEQ_PART_LIMIT_EXCLUSIVE - 1) { finalId = shardIdGenerator.nextId() }
        }
        // Compare only the shard and sequence parts (the low SHARD_PART_WIDTH + SEQ_PART_WIDTH
        // bits): the time part changes if the loop happens to straddle a second boundary.
        val shardAndSequenceMask: Long = 1L.shl(SHARD_PART_WIDTH + SEQ_PART_WIDTH) - 1
        assertEquals(
            (SEQ_PART_LIMIT_EXCLUSIVE - 1).toLong(),
            finalId.and(shardAndSequenceMask) - initialId.and(shardAndSequenceMask)
        )
        val elapsed = Duration.ofNanos(execTime)
        assertTrue(
            elapsed < Duration.ofMillis(500),
            "Generating $SEQ_PART_LIMIT_EXCLUSIVE IDs took $elapsed; expected under 500ms."
        )
    }

    @Test
    fun `Ensure collision if IDs generated faster than SEQ_PART_LIMIT_EXCLUSIVE per second`() {
        // Number of ID generation calls that are expected to fail due to collision.
        val numIdsExceedingLimit = 10
        // Set up a mock clock to allow precise control over the time boundaries for this test.
        val initialInstant = Instant.now()
        val clock = mock<Clock> {
            on {
                instant()
            }.also { firstStubbing ->
                var stubbing = firstStubbing
                // Exceed the limit on IDs generated within the second by numIdsExceedingLimit.
                // This allows us to expect numIdsExceedingLimit failures in the ID generation.
                repeat(SEQ_PART_LIMIT_EXCLUSIVE + numIdsExceedingLimit) {
                    stubbing = stubbing.thenReturn(initialInstant)
                }
                // Tick the clock to the next second to verify that the ID generator
                // can recover from a prior collision after the second has passed.
                // The last stubbed value is returned for all further calls.
                stubbing.thenReturn(initialInstant.plusSeconds(1))
            }
        }
        // Create the ShardIdGenerator to be tested.
        val shardIdGenerator = ShardIdGenerator(SHARD_ID, clock)
        // Concurrently generate SEQ_PART_LIMIT_EXCLUSIVE IDs and verify every value was unique.
        val generatedIds = generateIdsConcurrently(shardIdGenerator, SEQ_PART_LIMIT_EXCLUSIVE)
        assertEquals(SEQ_PART_LIMIT_EXCLUSIVE, generatedIds.size)
        // After the SEQ_PART_LIMIT_EXCLUSIVE IDs are generated, subsequent
        // attempts to get the next ID within the same second are expected to fail.
        // NOTE: They are guaranteed to be generated within the same second because
        // this is how we set up the Clock for this test.
        repeat(numIdsExceedingLimit) {
            assertThrows(IdGenerator.IdCollisionException::class.java) {
                shardIdGenerator.nextId()
            }
        }
        // We have previously set up the Clock so that subsequent calls
        // to nextId() occur in the next second, thus these are expected
        // to succeed.
        val generatedIds2 = generateIdsConcurrently(shardIdGenerator, SEQ_PART_LIMIT_EXCLUSIVE)
        assertEquals(SEQ_PART_LIMIT_EXCLUSIVE, generatedIds2.size)
    }

    /**
     * Calls [IdGenerator.nextId] on [generator] [count] times from a parallel
     * stream and returns the set of distinct IDs produced.
     *
     * NOTE(review): presumably the stream carries closures rather than IDs because
     * an unordered parallel `Stream.generate(...).limit(n)` may draw more than `n`
     * elements from its supplier; deferring `nextId()` to `map`, which runs
     * downstream of `limit`, calls it exactly [count] times.
     */
    private fun generateIdsConcurrently(generator: ShardIdGenerator, count: Int): Set<Long> =
        Stream.generate {
            fun(): Long = generator.nextId()
        }
            .limit(count.toLong())
            .parallel()
            .map { it.invoke() }
            .collect(Collectors.toSet())
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment