Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Last active October 14, 2020 12:04
Show Gist options
  • Save fluidsonic/e5e12e3940e89639719d4f390e41a3ca to your computer and use it in GitHub Desktop.
Save fluidsonic/e5e12e3940e89639719d4f390e41a3ca to your computer and use it in GitHub Desktop.
"mapLatest" must only be executed if the upstream has sent a NEW value. Unfortunately, that doesn't work if the Flow was temporarily cold (i.e. it had no subscribers for a while).
package edis
import eve.*
import io.fluidsonic.stdlib.*
import java.io.*
import java.nio.file.*
import kotlin.coroutines.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.slf4j.*
/** Absolute location of the serialized navigation system cache, resolved against the working directory. */
// FIXME make configurable
private val cachePath: Path = Paths.get("cache/navigationSystem").toAbsolutePath()
/**
 * Exposes a [NavigationSystem] flow derived from the given [stargates] and [systems] flows.
 *
 * All work is launched in a [CoroutineScope] created from [parentContext] with the coroutine name
 * `NavigationComputer` added to it.
 *
 * @param parentContext context the internal coroutine scope is derived from.
 * @param logger receives the log output of the underlying flow.
 * @param stargates upstream flow of stargate sets.
 * @param systems upstream flow of system sets.
 */
class NavigationComputer(
    parentContext: CoroutineContext,
    logger: Logger,
    stargates: Flow<EveEntitySet<EveStargate, EveStargateId>>,
    systems: Flow<EveEntitySet<EveSystem, EveSystemId>>,
) {

    // Scope that owns every coroutine started on behalf of this computer.
    private val scope = CoroutineScope(parentContext + CoroutineName("NavigationComputer"))

    /** The navigation system flow produced by `navigationSystemFlowIn` for the given inputs. */
    val navigationSystem: Flow<NavigationSystem> =
        navigationSystemFlowIn(scope = scope, logger = logger, stargates = stargates, systems = systems)
}
/**
 * Creates a shared flow that emits a [NavigationSystem] for the latest combination of [systems] and [stargates].
 *
 * For the first combination that is processed, the navigation system is loaded from the cache file at
 * `cachePath` if possible. In all other cases (no usable cache, or any later combination) it is built from
 * scratch and then handed to a background coroutine in [scope] which writes it to the cache file.
 *
 * The most recent input is remembered together with its result. When the upstream is re-collected after the
 * shared flow temporarily had no subscribers, the retained input is replayed; in that case the remembered result
 * is returned instead of rebuilding the navigation system for an unchanged input.
 *
 * Reading and writing the cache is best-effort: I/O failures are logged as warnings and do not fail the flow.
 *
 * @param scope scope in which the upstream is shared and the cache writer runs.
 * @param logger receives progress and failure messages.
 * @param stargates upstream flow of stargate sets.
 * @param systems upstream flow of system sets.
 * @return a flow shared in [scope] while subscribed, replaying the latest navigation system.
 */
@Suppress("NAME_SHADOWING")
private fun navigationSystemFlowIn(
    scope: CoroutineScope,
    logger: Logger,
    stargates: Flow<EveEntitySet<EveStargate, EveStargateId>>,
    systems: Flow<EveEntitySet<EveSystem, EveSystemId>>,
): Flow<NavigationSystem> {
    val isLoaded = atomic(false)
    val latestSnapshot = atomic<NavigationSystemSnapshot?>(null)
    val saveChannel = Channel<NavigationSystem>(Channel.RENDEZVOUS)

    // Cache writer. `collectLatest` cancels a save that is still running as soon as a newer system arrives.
    scope.launch {
        saveChannel.consumeAsFlow().collectLatest { navigationSystem ->
            saveNavigationSystem(navigationSystem = navigationSystem, logger = logger)
        }
    }

    return systems
        .combine(stargates) { systems, stargates -> systems to stargates }
        .stateIn(scope, SharingStarted.WhileSubscribed(), initialValue = null)
        .filterNotNull()
        .distinctUntilChanged()
        .mapLatest { input ->
            // A restarted collection receives the retained input again, and `distinctUntilChanged` starts without
            // history. Reuse the previous result rather than rebuilding for an input that has not changed.
            val cached = latestSnapshot.value
            if (cached != null && cached.input == input)
                return@mapLatest cached.navigationSystem

            val (systems, stargates) = input
            val startMark = kotlin.time.TimeSource.Monotonic.markNow() // FIXME https://youtrack.jetbrains.com/issue/KT-42625

            // Only the very first input attempts to use the cache file.
            if (isLoaded.compareAndSet(expect = false, update = true)) {
                val loaded = loadNavigationSystem(logger = logger, stargates = stargates, systems = systems)
                if (loaded != null) {
                    latestSnapshot.value = NavigationSystemSnapshot(input = input, navigationSystem = loaded)
                    return@mapLatest loaded
                }
            }

            coroutineContext.ensureActive()

            logger.debug("Building navigation system…")
            val navigationSystem = try {
                NavigationSystem.build(stargates = stargates, systems = systems)
            }
            catch (e: CancellationException) {
                logger.debug("Navigation system build canceled after ${startMark.elapsedNow()}.")
                throw e
            }
            logger.debug("Navigation system built in ${startMark.elapsedNow()}.")

            saveChannel.send(navigationSystem)

            // Remember the result only after it was handed to the cache writer, so that a cancellation before
            // the hand-over leads to a rebuild (and another save attempt) instead of a result that is never saved.
            latestSnapshot.value = NavigationSystemSnapshot(input = input, navigationSystem = navigationSystem)

            navigationSystem
        }
        .shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)
}


/** An input of the navigation system flow together with the navigation system that was derived from it. */
private class NavigationSystemSnapshot(
    val input: Pair<EveEntitySet<EveSystem, EveSystemId>, EveEntitySet<EveStargate, EveStargateId>>,
    val navigationSystem: NavigationSystem,
)


/**
 * Tries to read the navigation system from the cache file at `cachePath` on [Dispatchers.IO].
 *
 * Returns `null` if there is no cache file, if `NavigationSystem.readFrom` returns `null`, or if reading fails
 * with an [IOException] (which is logged as a warning), so that the caller can build the system from scratch.
 */
private suspend fun loadNavigationSystem(
    logger: Logger,
    stargates: EveEntitySet<EveStargate, EveStargateId>,
    systems: EveEntitySet<EveSystem, EveSystemId>,
): NavigationSystem? = withContext(Dispatchers.IO) {
    if (!Files.exists(cachePath))
        return@withContext null

    val startMark = kotlin.time.TimeSource.Monotonic.markNow() // FIXME https://youtrack.jetbrains.com/issue/KT-42625
    logger.debug("Loading navigation system from '$cachePath'…")

    val navigationSystem = try {
        ObjectInputStream(Files.newInputStream(cachePath)).use { stream ->
            NavigationSystem.readFrom(stream = stream, stargates = stargates, systems = EveEntitySet.of(systems))
        }
    }
    catch (e: CancellationException) {
        logger.debug("Loading of navigation system canceled after ${startMark.elapsedNow()}.")
        throw e
    }
    catch (e: IOException) {
        // An unreadable cache must not take the whole flow down; fall back to building from scratch.
        logger.warn("Loading of navigation system from '$cachePath' failed after ${startMark.elapsedNow()}.", e)
        return@withContext null
    }

    if (navigationSystem != null)
        logger.debug("Navigation system loaded in ${startMark.elapsedNow()}.")
    else
        logger.debug("Loading of navigation system aborted after ${startMark.elapsedNow()} because it is outdated.")

    navigationSystem
}


/**
 * Writes [navigationSystem] to the cache file at `cachePath` on [Dispatchers.IO].
 *
 * The data is first written to a temporary file and then moved over the cache file. The temporary file is
 * removed again if writing or moving fails or is canceled. An [IOException] is logged as a warning and
 * otherwise ignored; cancellation is logged and rethrown.
 */
private suspend fun saveNavigationSystem(navigationSystem: NavigationSystem, logger: Logger) {
    withContext(Dispatchers.IO) {
        val startMark = kotlin.time.TimeSource.Monotonic.markNow() // FIXME https://youtrack.jetbrains.com/issue/KT-42625
        logger.debug("Saving navigation system to '$cachePath'…")

        try {
            // The temporary file lives next to the target: ATOMIC_MOVE fails across different file stores,
            // which is likely if the file is created in the system's default temporary directory.
            val directory = cachePath.parent
            Files.createDirectories(directory)
            val temporaryFile = Files.createTempFile(directory, "edis", ".navigationSystem")

            try {
                ObjectOutputStream(Files.newOutputStream(temporaryFile)).use { navigationSystem.writeTo(it) }
                ensureActive()
                Files.move(temporaryFile, cachePath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
            }
            catch (e: Throwable) {
                // Clean up without hiding the original failure.
                try {
                    Files.deleteIfExists(temporaryFile)
                }
                catch (deleteError: IOException) {
                    e.addSuppressed(deleteError)
                }
                throw e
            }

            logger.debug("Navigation system saved in ${startMark.elapsedNow()}.")
        }
        catch (e: CancellationException) {
            logger.debug("Saving of navigation system canceled after ${startMark.elapsedNow()}.")
            throw e
        }
        catch (e: IOException) {
            // The cache is only an optimization, so a failed save must not cancel the surrounding scope.
            logger.warn("Saving of navigation system to '$cachePath' failed after ${startMark.elapsedNow()}.", e)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment