Skip to content

Instantly share code, notes, and snippets.

@somdoron
Created May 18, 2021 19:03
Show Gist options
  • Save somdoron/ed930191282b756a74c16c159c78288f to your computer and use it in GitHub Desktop.
Save somdoron/ed930191282b756a74c16c159c78288f to your computer and use it in GitHub Desktop.
Idempotency Postgres
def lock(idempotencyKey: String) = {
// Producing 64bit key which is required for postgres advisory lock
val key1 = MurmurHash3.productHash(idempotencyKey))
val key2 = MurmurHash3.productHash(idempotencyKey, seed)
sql"""SELECT pg_advisory_xact_lock($key1, $key2)""".query[Unit].unique
}
def fetch[T](idempotencyKey: String) =
readonlyTransaction {
(sql"""SELECT result
FROM idempotent_runs
WHERE idempotency_key = $idempotencyKey"""
)).query[T].option
}
def save[T](idempotencyKey: String, result: T) =
sql"""
INSERT INTO idempotent_runs (idempotency_key,result)
VALUES ($idempotencyKey, $result)
""".update.run
def idempotent[T](idempotencyKey: String, effect: Unit => T) =
transaction {
lock(idempotencyKey)
// This is a different readonly transaction, it wouldn't work otherwise because the snapshot of
// the serializable transaction is taken before the lock.
fetch(idempotencyKey) match {
case Some(result) => result // a result was found
case None =>
val result = effect(())
save(idempotencyKey, result)
result
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment