Instantly share code, notes, and snippets.
fluidsonic/NavigationComputer.kt Secret
Last active
October 14, 2020 12:04
-
Star
(0)
0
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save fluidsonic/e5e12e3940e89639719d4f390e41a3ca to your computer and use it in GitHub Desktop.
"mapLatest" must only be executed if the upstream has sent a NEW value. Unfortunately that doesn't work if the Flow was temporarily cold
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package edis | |
import eve.* | |
import io.fluidsonic.stdlib.* | |
import java.io.* | |
import java.nio.file.* | |
import kotlin.coroutines.* | |
import kotlinx.atomicfu.* | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.* | |
import kotlinx.coroutines.flow.* | |
import org.slf4j.* | |
private val cachePath = Path.of("cache/navigationSystem").toAbsolutePath() // FIXME make configurable | |
class NavigationComputer( | |
parentContext: CoroutineContext, | |
logger: Logger, | |
stargates: Flow<EveEntitySet<EveStargate, EveStargateId>>, | |
systems: Flow<EveEntitySet<EveSystem, EveSystemId>>, | |
) { | |
private val coroutineContext = parentContext + CoroutineName("NavigationComputer") | |
val navigationSystem = navigationSystemFlowIn( | |
scope = CoroutineScope(coroutineContext), | |
logger = logger, | |
stargates = stargates, | |
systems = systems | |
) | |
} | |
@Suppress("NAME_SHADOWING") | |
private fun navigationSystemFlowIn( | |
scope: CoroutineScope, | |
logger: Logger, | |
stargates: Flow<EveEntitySet<EveStargate, EveStargateId>>, | |
systems: Flow<EveEntitySet<EveSystem, EveSystemId>>, | |
): Flow<NavigationSystem> { | |
val isLoaded = atomic(false) | |
val saveChannel = Channel<NavigationSystem>(Channel.RENDEZVOUS) | |
scope.launch { | |
saveChannel.consumeAsFlow().collectLatest { navigationSystem -> | |
withContext(Dispatchers.IO) { | |
val startMark = kotlin.time.TimeSource.Monotonic.markNow() // FIXME https://youtrack.jetbrains.com/issue/KT-42625 | |
logger.debug("Saving navigation system to '$cachePath'…") | |
val temporaryFile = Files.createTempFile("edis", ".navigationSystem") | |
try { | |
ObjectOutputStream(Files.newOutputStream(temporaryFile)).use { navigationSystem.writeTo(it) } | |
ensureActive() | |
} | |
catch (e: Throwable) { | |
if (e is CancellationException) | |
logger.debug("Saving of navigation system canceled after ${startMark.elapsedNow()}.") | |
Files.delete(temporaryFile) | |
throw e | |
} | |
Files.move(temporaryFile, cachePath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) | |
logger.debug("Navigation system saved in ${startMark.elapsedNow()}.") | |
} | |
} | |
} | |
return systems | |
.combine(stargates) { systems, stargates -> systems to stargates } | |
.stateIn(scope, SharingStarted.WhileSubscribed(), initialValue = null) | |
.filterNotNull() | |
.distinctUntilChanged() | |
.mapLatest { (systems, stargates) -> | |
val startMark = kotlin.time.TimeSource.Monotonic.markNow() // FIXME https://youtrack.jetbrains.com/issue/KT-42625 | |
if (isLoaded.compareAndSet(expect = false, update = true)) | |
withContext(Dispatchers.IO) { | |
Files.exists(cachePath).thenTake { | |
logger.debug("Loading navigation system from '$cachePath'…") | |
try { | |
ObjectInputStream(Files.newInputStream(cachePath)).use { stream -> | |
NavigationSystem.readFrom(stream = stream, stargates = stargates, systems = EveEntitySet.of(systems)) | |
} | |
} | |
catch (e: CancellationException) { | |
logger.debug("Loading of navigation system canceled after ${startMark.elapsedNow()}.") | |
throw e | |
}.also { navigationSystem -> | |
if (navigationSystem != null) | |
logger.debug("Navigation system loaded in ${startMark.elapsedNow()}.") | |
else | |
logger.debug("Loading of navigation system aborted after ${startMark.elapsedNow()} because it is outdated.") | |
} | |
} | |
}?.let { return@mapLatest it } | |
coroutineContext.ensureActive() | |
logger.debug("Building navigation system…") | |
val navigationSystem = try { | |
NavigationSystem.build(stargates = stargates, systems = systems) | |
} | |
catch (e: CancellationException) { | |
logger.debug("Navigation system build canceled after ${startMark.elapsedNow()}.") | |
throw e | |
} | |
logger.debug("Navigation system built in ${startMark.elapsedNow()}.") | |
saveChannel.send(navigationSystem) | |
navigationSystem | |
} | |
.shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment