Skip to content

Instantly share code, notes, and snippets.

@usbharu
Created April 6, 2023 13:49
Show Gist options
  • Save usbharu/49902908c438291da91f336af2ed11b7 to your computer and use it in GitHub Desktop.
Save usbharu/49902908c438291da91f336af2ed11b7 to your computer and use it in GitHub Desktop.
KJob Exposed Library
package dev.usbharu.kjob.exposed
import kjob.core.job.JobProgress
import kjob.core.job.JobSettings
import kjob.core.job.JobStatus
import kjob.core.job.ScheduledJob
import kjob.core.repository.JobRepository
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.*
import org.jetbrains.exposed.dao.id.LongIdTable
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList
import org.jetbrains.exposed.sql.SqlExpressionBuilder.plus
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Clock
import java.time.Instant
import java.util.*
/**
 * [JobRepository] implementation that stores kjob jobs in a SQL table through Exposed.
 *
 * All timestamps are persisted as epoch milliseconds taken from [clock]. Job properties are
 * persisted as a JSON object whose values carry a one-letter type tag (`s`, `d`, `l`, `i`, `b`)
 * so that they can be restored with their original Kotlin type.
 *
 * The `id` strings accepted by the repository methods are the decimal form of the table's
 * auto-generated `Long` primary key; a non-numeric id makes `String.toLong()` throw
 * [NumberFormatException].
 *
 * @param database database every transaction of this repository runs against
 * @param tableName name of the table holding the jobs
 * @param clock source of "now" for all timestamps written by this repository
 * @param json serializer used to encode and decode the job properties
 */
class ExposedJobRepository(
    private val database: Database,
    private val tableName: String,
    private val clock: Clock,
    private val json: Json
) : JobRepository {

    /** Table definition for persisted jobs; one row per scheduled job. */
    class Jobs(tableName: String) : LongIdTable(tableName) {
        /** Name of the [JobStatus] enum constant. */
        val status = text("status")

        /** Epoch milliseconds before which the job must not be picked up, or `null` for "immediately". */
        val runAt = long("runAt").nullable()
        val statusMessage = text("statusMessage").nullable()
        val retries = integer("retries")

        /** String form of the UUID of the kjob instance owning the job, or `null` when unowned. */
        val kjobId = char("kjobId", 36).nullable()
        val createdAt = long("createdAt")
        val updatedAt = long("updatedAt")

        /** Job id taken from [JobSettings.id]; distinct from the row's primary key. */
        val jobId = text("jobId")
        val name = text("name")

        /** Type-tagged JSON encoding of the job properties, or `null` when there are none. */
        val properties = text("properties").nullable()
        val step = integer("step")
        val max = integer("max").nullable()
        val startedAt = long("startedAt").nullable()
        val completedAt = long("completedAt").nullable()
    }

    val jobs: Jobs = Jobs(tableName)

    /** Creates the jobs table if it does not exist yet (blocking). */
    fun createTable() {
        transaction(database) {
            SchemaUtils.create(jobs)
        }
    }

    /**
     * Runs [block] inside a suspended transaction on [Dispatchers.IO].
     *
     * The transaction is bound explicitly to [database]; without the `db` argument Exposed would
     * fall back to its default database, which may differ from the one [createTable] used.
     */
    suspend fun <T> query(block: suspend () -> T): T =
        newSuspendedTransaction(Dispatchers.IO, db = database) { block() }

    /** Current time of [clock] as epoch milliseconds. */
    private fun nowMillis(): Long = Instant.now(clock).toEpochMilli()

    /**
     * Predicate selecting the row with primary key [id] only while it is still owned by
     * [oldKjobId] (`null` meaning "owned by nobody").
     */
    private fun ownedBy(id: String, oldKjobId: UUID?): Op<Boolean> = with(SqlExpressionBuilder) {
        val ownerMatches = if (oldKjobId == null) jobs.kjobId.isNull() else jobs.kjobId eq oldKjobId.toString()
        (jobs.id eq id.toLong()) and ownerMatches
    }

    /** Stamps `completedAt`; returns `true` when exactly one row was updated. */
    override suspend fun completeProgress(id: String): Boolean {
        val now = nowMillis()
        return query {
            jobs.update({ jobs.id eq id.toLong() }) {
                it[jobs.completedAt] = now
                it[jobs.updatedAt] = now
            } == 1
        }
    }

    /** Returns `true` when at least one row carries the given [jobId]. */
    override suspend fun exist(jobId: String): Boolean = query {
        !jobs.select(jobs.jobId eq jobId).empty()
    }

    /**
     * Returns up to [limit] jobs whose status is in [status], whose name is in [names]
     * (any name when [names] is empty) and whose `runAt` is unset or not in the future.
     */
    override suspend fun findNext(names: Set<String>, status: Set<JobStatus>, limit: Int): Flow<ScheduledJob> {
        val now = nowMillis()
        return query {
            // A job scheduled for later must stay invisible until its runAt has been reached.
            val isDue = with(SqlExpressionBuilder) { jobs.runAt.isNull() or (jobs.runAt lessEq now) }
            val nameMatches = if (names.isEmpty()) Op.TRUE else jobs.name.inList(names)
            jobs.select(jobs.status.inList(status.map { it.name }).and(isDue).and(nameMatches))
                .limit(limit)
                // Rows are materialised inside the transaction; the flow only replays the list.
                .map { it.toScheduledJob() }
                .asFlow()
        }
    }

    /** Loads the job with primary key [id], or `null` when there is no such row. */
    override suspend fun get(id: String): ScheduledJob? = query {
        jobs.select(jobs.id eq id.toLong()).singleOrNull()?.toScheduledJob()
    }

    /**
     * Puts the job back into [JobStatus.CREATED] and clears owner and progress, provided it is
     * still owned by [oldKjobId]. Returns `true` when exactly one row was updated.
     */
    override suspend fun reset(id: String, oldKjobId: UUID?): Boolean = query {
        jobs.update({ ownedBy(id, oldKjobId) }) {
            it[jobs.status] = JobStatus.CREATED.name
            it[jobs.statusMessage] = null
            it[jobs.kjobId] = null
            it[jobs.step] = 0
            it[jobs.max] = null
            it[jobs.startedAt] = null
            it[jobs.completedAt] = null
            it[jobs.updatedAt] = nowMillis()
        } == 1
    }

    /**
     * Inserts a new job in status [JobStatus.CREATED] and returns it with the generated
     * primary key as its id.
     */
    override suspend fun save(jobSettings: JobSettings, runAt: Instant?): ScheduledJob {
        val now = Instant.now(clock)
        val scheduledJob = ScheduledJob(
            id = "", // replaced by the generated key below
            status = JobStatus.CREATED,
            runAt = runAt,
            statusMessage = null,
            retries = 0,
            kjobId = null,
            createdAt = now,
            updatedAt = now,
            settings = jobSettings,
            progress = JobProgress(0)
        )
        val generatedId = query {
            jobs.insert {
                it[jobs.status] = scheduledJob.status.name
                it[jobs.createdAt] = scheduledJob.createdAt.toEpochMilli()
                it[jobs.updatedAt] = scheduledJob.updatedAt.toEpochMilli()
                it[jobs.jobId] = scheduledJob.settings.id
                it[jobs.name] = scheduledJob.settings.name
                it[jobs.properties] = scheduledJob.settings.properties.stringify()
                it[jobs.runAt] = scheduledJob.runAt?.toEpochMilli()
                it[jobs.statusMessage] = null
                it[jobs.retries] = 0
                it[jobs.kjobId] = null
                it[jobs.step] = 0
                it[jobs.max] = null
                it[jobs.startedAt] = null
                it[jobs.completedAt] = null
            }[jobs.id].value
        }
        return scheduledJob.copy(id = generatedId.toString())
    }

    /**
     * Stores the progress maximum; returns `true` when exactly one row was updated.
     *
     * NOTE(review): the column is a 32-bit integer, so [max] is narrowed with `toInt()`.
     */
    override suspend fun setProgressMax(id: String, max: Long): Boolean {
        val now = nowMillis()
        return query {
            jobs.update({ jobs.id eq id.toLong() }) {
                it[jobs.max] = max.toInt()
                it[jobs.updatedAt] = now
            } == 1
        }
    }

    /** Stamps `startedAt`; returns `true` when exactly one row was updated. */
    override suspend fun startProgress(id: String): Boolean {
        val now = nowMillis()
        return query {
            jobs.update({ jobs.id eq id.toLong() }) {
                it[jobs.startedAt] = now
                it[jobs.updatedAt] = now
            } == 1
        }
    }

    /**
     * Adds [step] to the stored progress counter in SQL (`step = step + ?`); returns `true`
     * when exactly one row was updated.
     *
     * NOTE(review): the column is a 32-bit integer, so [step] is narrowed with `toInt()`.
     */
    override suspend fun stepProgress(id: String, step: Long): Boolean {
        val now = nowMillis()
        return query {
            jobs.update({ jobs.id eq id.toLong() }) {
                it[jobs.step] = jobs.step + step.toInt()
                it[jobs.updatedAt] = now
            } == 1
        }
    }

    /**
     * Updates status, message, retries and owner of the job, provided it is still owned by
     * [oldKjobId]. Returns `true` when exactly one row was updated.
     */
    override suspend fun update(
        id: String,
        oldKjobId: UUID?,
        kjobId: UUID?,
        status: JobStatus,
        statusMessage: String?,
        retries: Int
    ): Boolean = query {
        jobs.update({ ownedBy(id, oldKjobId) }) {
            it[jobs.status] = status.name
            it[jobs.retries] = retries
            it[jobs.updatedAt] = nowMillis()
            it[jobs.statusMessage] = statusMessage
            // A null owner must be stored as SQL NULL. Writing kjobId.toString() would store the
            // text "null", which the `kjobId IS NULL` ownership guard can never match again.
            it[jobs.kjobId] = kjobId?.toString()
        } == 1
    }

    /** Decodes the type-tagged JSON written by [stringify]; `null` yields an empty map. */
    private fun String?.parseJsonMap(): Map<String, Any> {
        this ?: return emptyMap()
        return json.parseToJsonElement(this).jsonObject.mapValues { (_, element) ->
            if (element is JsonObject) decodeList(element) else decodeScalar(element.jsonPrimitive.content)
        }
    }

    /** Decodes a list property of the form `{"t": <type tag>, "v": [ ... ]}`. */
    private fun decodeList(element: JsonObject): List<Any> {
        val type = element["t"]?.jsonPrimitive?.content ?: error("Cannot get jsonPrimitive")
        val items = element["v"]?.jsonArray ?: error("Cannot get jsonArray")
        return when (type) {
            "s" -> items.map { it.jsonPrimitive.content }
            "d" -> items.map { it.jsonPrimitive.double }
            "l" -> items.map { it.jsonPrimitive.long }
            "i" -> items.map { it.jsonPrimitive.int }
            "b" -> items.map { it.jsonPrimitive.boolean }
            else -> error("Unknown type prefix '$type'")
        }
    }

    /** Decodes a scalar property of the form `<type tag>:<value>`. */
    private fun decodeScalar(content: String): Any {
        val type = content.substringBefore(':')
        // Only the first ':' separates tag and value, so string values may contain ':' themselves.
        val raw = content.substringAfter(':')
        return when (type) {
            "s" -> raw
            "d" -> raw.toDouble()
            "l" -> raw.toLong()
            "i" -> raw.toInt()
            "b" -> raw.toBoolean()
            else -> error("Unknown type prefix '$type'")
        }
    }

    /**
     * Encodes the properties as type-tagged JSON, or returns `null` for an empty map.
     *
     * Supported values are [Double], [Long], [Int], [String], [Boolean] and lists of those;
     * anything else fails with [IllegalStateException].
     */
    private fun Map<String, Any>.stringify(): String? {
        if (isEmpty()) return null
        val encoded: Map<String, JsonElement> = mapValues { (_, value) ->
            when (value) {
                is List<*> -> encodeList(value)
                is Double -> encodeScalar("d", value)
                is Long -> encodeScalar("l", value)
                is Int -> encodeScalar("i", value)
                is String -> encodeScalar("s", value)
                is Boolean -> encodeScalar("b", value)
                else -> error("Cannot serialize unsupported property value: $value")
            }
        }
        return json.encodeToString(JsonObject(encoded))
    }

    /** Encodes a scalar as the JSON string `<type tag>:<value>`. */
    private fun encodeScalar(type: String, value: Any): JsonPrimitive = JsonPrimitive("$type:$value")

    /**
     * Encodes a list as `{"t": <type tag>, "v": [ ... ]}`. The tag is derived from the first
     * element; an empty list is tagged as a string list.
     */
    private fun encodeList(values: List<*>): JsonElement {
        if (values.isEmpty()) {
            return buildJsonObject {
                put("t", "s")
                putJsonArray("v") {}
            }
        }
        // The casts fail with ClassCastException when the list mixes element types.
        val (type, items) = when (values.first()) {
            is Double -> "d" to values.map { JsonPrimitive(it as Double) }
            is Long -> "l" to values.map { JsonPrimitive(it as Long) }
            is Int -> "i" to values.map { JsonPrimitive(it as Int) }
            is String -> "s" to values.map { JsonPrimitive(it as String) }
            is Boolean -> "b" to values.map { JsonPrimitive(it as Boolean) }
            else -> error("Cannot serialize unsupported list property value: $values")
        }
        return buildJsonObject {
            put("t", type)
            put("v", JsonArray(items))
        }
    }

    /** Maps a row of [jobs] to the kjob domain object. */
    private fun ResultRow.toScheduledJob(): ScheduledJob {
        val row = this
        val owner = row[jobs.kjobId]?.let { stored ->
            try {
                UUID.fromString(stored)
            } catch (e: IllegalArgumentException) {
                // Tolerates values that are not a UUID (e.g. a stored literal "null") by
                // treating the job as unowned.
                null
            }
        }
        return ScheduledJob(
            id = row[jobs.id].value.toString(),
            status = JobStatus.valueOf(row[jobs.status]),
            runAt = row[jobs.runAt]?.let { Instant.ofEpochMilli(it) },
            statusMessage = row[jobs.statusMessage],
            retries = row[jobs.retries],
            kjobId = owner,
            createdAt = Instant.ofEpochMilli(row[jobs.createdAt]),
            updatedAt = Instant.ofEpochMilli(row[jobs.updatedAt]),
            settings = JobSettings(
                id = row[jobs.jobId],
                name = row[jobs.name],
                properties = row[jobs.properties].parseJsonMap()
            ),
            progress = JobProgress(
                step = row[jobs.step].toLong(),
                max = row[jobs.max]?.toLong(),
                startedAt = row[jobs.startedAt]?.let { Instant.ofEpochMilli(it) },
                completedAt = row[jobs.completedAt]?.let { Instant.ofEpochMilli(it) }
            )
        )
    }
}
package dev.usbharu.kjob.exposed
import kjob.core.BaseKJob
import kjob.core.KJob
import kjob.core.KJobFactory
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.Database
import java.time.Clock
/**
 * kjob implementation that keeps its jobs and locks in a SQL database accessed through Exposed.
 *
 * The database is either supplied ready-made via [Configuration.database] or opened from
 * [Configuration.connectionString] and [Configuration.driverClassName].
 */
class ExposedKJob(config: Configuration) : BaseKJob<ExposedKJob.Configuration>(config) {

    /** Settings specific to the Exposed-backed kjob. */
    class Configuration : BaseKJob.Configuration() {
        /** JDBC connection string; only used when [database] is not set. */
        var connectionString: String? = null

        /** JDBC driver class name; only used when [database] is not set. */
        var driverClassName: String? = null

        /** Existing Exposed database to use; takes precedence over [connectionString]. */
        var database: Database? = null

        /** Name of the table holding the jobs. */
        var jobTableName = "kjobJobs"

        /** Name of the table holding the locks. */
        var lockTableName = "kjobLocks"

        /** Number of minutes after which a lock that is not refreshed expires. */
        var expireLockInMinutes = 5L
    }

    private val database: Database = config.database ?: Database.connect(
        requireNotNull(config.connectionString) {
            "Either 'database' or 'connectionString' must be configured"
        },
        requireNotNull(config.driverClassName) {
            "'driverClassName' must be configured when 'connectionString' is used"
        }
    )

    // Both repositories are created once on first use and then shared. A plain getter would
    // build a new repository (and a new table definition) on every single access.
    override val jobRepository: ExposedJobRepository by lazy {
        // Uses the same clock as the lock repository so both agree on "now".
        ExposedJobRepository(database, config.jobTableName, clock, config.json)
    }

    override val lockRepository: ExposedLockRepository by lazy {
        ExposedLockRepository(database, config, clock)
    }

    /** Creates the job and lock tables if necessary, then starts kjob. */
    override fun start(): KJob {
        jobRepository.createTable()
        lockRepository.createTable()
        return super.start()
    }

    /** Shuts kjob down and afterwards removes expired locks, blocking until that is done. */
    override fun shutdown() {
        super.shutdown()
        runBlocking { lockRepository.clearExpired() }
    }

    companion object : KJobFactory<ExposedKJob, Configuration> {
        /** Builds an [ExposedKJob] from a default [Configuration] customised by [configure]. */
        override fun create(configure: Configuration.() -> Unit): KJob {
            return ExposedKJob(Configuration().apply(configure))
        }
    }
}
package dev.usbharu.kjob.exposed
import kjob.core.job.Lock
import kjob.core.repository.LockRepository
import kotlinx.coroutines.Dispatchers
import org.jetbrains.exposed.dao.id.UUIDTable
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.greater
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Clock
import java.time.Instant
import java.util.*
import kotlin.time.Duration.Companion.minutes
/**
 * [LockRepository] implementation that stores kjob instance locks in a SQL table through Exposed.
 *
 * A lock is a row keyed by the UUID passed to [ping]. It counts as alive while its `expiresAt`
 * (epoch milliseconds) lies strictly after the current time of [clock].
 *
 * @param database database every transaction of this repository runs against
 * @param config supplies the lock table name and the lock lifetime in minutes
 * @param clock source of "now" for all timestamps
 */
class ExposedLockRepository(
    private val database: Database,
    private val config: ExposedKJob.Configuration,
    private val clock: Clock
) : LockRepository {

    /** Table definition for locks; the primary key is the lock's UUID. */
    class Locks(tableName: String) : UUIDTable(tableName) {
        /** Epoch milliseconds of the last [ping]. */
        val updatedAt = long("updatedAt")

        /** Epoch milliseconds at which the lock stops being valid. */
        val expiresAt = long("expiresAt")
    }

    val locks: Locks = Locks(config.lockTableName)

    /** Creates the lock table if it does not exist yet (blocking). */
    fun createTable() {
        transaction(database) {
            SchemaUtils.create(locks)
        }
    }

    /**
     * Runs [block] inside a suspended transaction on [Dispatchers.IO].
     *
     * The transaction is bound explicitly to [database]; without the `db` argument Exposed would
     * fall back to its default database, which may differ from the one [createTable] used.
     */
    suspend fun <T> query(block: suspend () -> T): T =
        newSuspendedTransaction(Dispatchers.IO, db = database) { block() }

    /** Returns `true` when a lock with the given [id] exists and has not expired yet. */
    override suspend fun exists(id: UUID): Boolean {
        val now = Instant.now(clock).toEpochMilli()
        return query {
            !locks.select((locks.id eq id) and (locks.expiresAt greater now)).empty()
        }
    }

    /**
     * Creates the lock with the given [id] or, when it already exists, extends it. Either way
     * the lock expires `expireLockInMinutes` from now.
     *
     * @return a [Lock] built from [id] and the current time
     */
    override suspend fun ping(id: UUID): Lock {
        val now = Instant.now(clock)
        val expiresAt = now.plusSeconds(config.expireLockInMinutes.minutes.inWholeSeconds)
        query {
            val isNew = locks.select(locks.id eq id).empty()
            if (isNew) {
                locks.insert {
                    it[locks.id] = id
                    it[locks.updatedAt] = now.toEpochMilli()
                    it[locks.expiresAt] = expiresAt.toEpochMilli()
                }
            } else {
                locks.update({ locks.id eq id }) {
                    it[locks.updatedAt] = now.toEpochMilli()
                    it[locks.expiresAt] = expiresAt.toEpochMilli()
                }
            }
        }
        return Lock(id, now)
    }

    /**
     * Deletes every lock that has expired, i.e. whose `expiresAt` is not after the current time.
     * Locks that [exists] still reports as alive are left untouched.
     */
    suspend fun clearExpired() {
        val now = Instant.now(clock).toEpochMilli()
        query {
            // Exact complement of the "alive" test in exists(): expiresAt <= now.
            // `greater` here would delete the live locks and keep the expired ones.
            locks.deleteWhere { with(SqlExpressionBuilder) { locks.expiresAt lessEq now } }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment