Skip to content

Instantly share code, notes, and snippets.

@jonatbergn
Last active November 30, 2023 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jonatbergn/8974889261b54d0856c360e86b70440a to your computer and use it in GitHub Desktop.
Save jonatbergn/8974889261b54d0856c360e86b70440a to your computer and use it in GitHub Desktop.
package com.jonathanbergen
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
/**
 * A single entry point for reading a value that lives in a local source and can be
 * refreshed from a remote source.
 *
 * Consumers choose a retrieval strategy (local only, local with remote fallback, or
 * local followed by a remote refresh) without knowing how the value is fetched or stored.
 *
 * Note: a caller that needs remote data while a fetch is already in progress waits for
 * that fetch to finish instead of starting a second one, which avoids duplicate fetches.
 */
interface DataAccess<T> {

    /**
     * Reads the value from the local source only.
     *
     * @return the locally stored value, or `null` when nothing is stored locally.
     */
    suspend fun get(): T?

    /**
     * Reads the value from the local source and falls back to the remote source when the
     * local source has nothing. A value fetched remotely is stored locally for later use.
     *
     * @return the local value when present, otherwise the value obtained through a fetch.
     */
    suspend fun getOrFetch(): T

    /**
     * A flow that emits the local value first (when there is one), then fetches from the
     * remote source, stores the result locally and emits the refreshed value.
     *
     * Collectors therefore get an immediate answer from local data, followed by an update
     * when the remote source has newer data.
     */
    val flowGetAndFetch: Flow<T>

    companion object {

        /**
         * Creates a [DataAccess] from explicit local and remote operations, allowing the
         * remote type to differ from the local type.
         *
         * @param get reads the value from the local source; returns `null` when absent.
         * @param save stores a remotely fetched value in the local source.
         * @param fetch fetches the value from the remote source.
         * @return a [DataAccess] exposing values of the local type.
         */
        operator fun <Remote, Local> invoke(
            get: suspend () -> Local?,
            save: suspend (Remote?) -> Unit,
            fetch: suspend () -> Remote,
        ): DataAccess<Local> = DataAccessImpl(get, save, fetch)

        /**
         * Creates a [DataAccess] that keeps the fetched value in memory instead of a
         * persistent local store. Nothing is held until the first fetch has been saved.
         *
         * @param fetch fetches the value from the remote source.
         * @return a [DataAccess] whose local source is an in-memory variable.
         */
        operator fun <Remote> invoke(
            fetch: suspend () -> Remote,
        ): DataAccess<Remote> {
            // Captured by both lambdas below; acts as the "local source".
            var cached: Remote? = null
            return invoke(
                get = { cached },
                save = { cached = it },
                fetch = fetch,
            )
        }
    }
}
// DataAccess implementation built from three caller-supplied suspend functions.
// Remote is the type produced by the remote source; Local is the type read back from
// the local source and exposed to consumers.
private class DataAccessImpl<Remote, Local>(
    // Reads the current value from the local source; null means nothing is stored.
    val getLocal: suspend () -> Local?,
    // Stores a value obtained from the remote source in the local source.
    val saveRemote: suspend (Remote) -> Unit,
    // Fetches a value from the remote source.
    val fetchRemote: suspend () -> Remote,
) : DataAccess<Local> {

    // The most recently launched fetch-and-save job, or null before the first fetch.
    // NOTE(review): this is a plain field with no synchronization, so the
    // "check, then launch" sequence below is only atomic when all callers run on a
    // single thread - confirm how callers dispatch.
    private var fetchJob: Job? = null

    // Emits the current local value when there is one and returns it (or null).
    private suspend fun FlowCollector<Local>.emitIfNotNull(): Local? {
        val current = get()
        if (current != null) emit(current)
        return current
    }

    // Makes sure one fetch-and-save has completed successfully before returning.
    // If a fetch is already running, waits for it instead of starting another one.
    private suspend fun fetchAndSave() {
        while (true) {
            val inFlight = fetchJob?.takeIf { it.isActive }
            if (inFlight == null) {
                // No fetch is running: run one as a child of this caller. coroutineScope
                // waits for the child and rethrows its failure to this caller.
                coroutineScope {
                    fetchJob = launch { saveRemote(fetchRemote()) }
                }
                return
            }
            // join() resumes normally even when the joined job failed or was cancelled,
            // so the outcome has to be checked explicitly afterwards.
            inFlight.join()
            if (!inFlight.isCancelled) return
            // The fetch we waited for did not complete successfully (it failed, or the
            // caller that owned it was cancelled). Loop and try again in our own scope
            // rather than continuing as if fresh data had been saved.
        }
    }

    // Fetches and saves remote data (sharing an in-flight fetch when there is one),
    // then emits whatever the local source now holds.
    private suspend fun FlowCollector<Local>.fetchSaveEmit(): Local? {
        fetchAndSave()
        return emitIfNotNull()
    }

    // Emits the local value first (if any), then the local value as it is after a
    // fetch-and-save. Consecutive equal values are emitted only once.
    override val flowGetAndFetch: Flow<Local> =
        flow<Local> { fetchSaveEmit() }
            .onStart { emitIfNotNull() }
            .distinctUntilChanged()

    // Reads the value from the local source.
    override suspend fun get(): Local? = getLocal()

    // Returns the local value when present; otherwise fetches, saves and returns the
    // local value read back afterwards. When the local source is still empty after the
    // fetch, first() throws NoSuchElementException.
    override suspend fun getOrFetch(): Local =
        flow<Local> { emitIfNotNull() ?: fetchSaveEmit() }.first()
}
@jonatbergn
Copy link
Author

Test

import junit.framework.TestCase.assertSame
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.fail
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.runTest

/** Behavioral tests for the [DataAccess] factories and retrieval strategies. */
class DataAccessTest {

    @Test
    fun `get returns local data if available`() = runTest {
        val localData = "localData"
        // save and fetch fail the test when invoked: get() must only touch local data.
        val dataAccess = DataAccess(
            get = { localData },
            save = ::fail,
            fetch = ::fail,
        )
        val result = dataAccess.get()
        assertSame(localData, result)
    }

    @Test
    fun `getOrFetch returns local data when available`() = runTest {
        val localData = "localData"
        // save and fetch fail the test when invoked: local data must short-circuit the fetch.
        val dataAccess = DataAccess(
            get = { localData },
            save = ::fail,
            fetch = ::fail,
        )
        val result = dataAccess.getOrFetch()
        assertSame(localData, result)
    }

    @Test
    fun `getOrFetch fetches remote data when local data is not available`() = runTest {
        val remoteData = "remoteData"
        // In-memory factory: nothing is stored until the first fetch has been saved.
        val dataAccess = DataAccess(
            fetch = { remoteData },
        )
        val result = dataAccess.getOrFetch()
        assertSame(remoteData, result)
    }

    @Test
    fun `flowGetAndFetch emits local then remote data`() = runTest {
        val localData = "localData"
        val remoteData = "remoteData"
        // Neither factory has a `current` parameter, so the pre-populated local value is
        // modeled with a variable that the get/save lambdas read and overwrite.
        var stored: String? = localData
        val dataAccess = DataAccess(
            get = { stored },
            save = { stored = it },
            fetch = { remoteData },
        )
        val result = dataAccess.flowGetAndFetch.toList()
        assertContentEquals(listOf(localData, remoteData), result)
    }

    @Test
    fun `concurrent data access calls fetch only once`() = runTest {
        var fetches = 0
        // Starts locked so that a fetch cannot finish until the test releases it.
        val mutex = Mutex(locked = true)
        val dataAccess = DataAccess(
            get = { null },
            save = { },
            fetch = { mutex.withLock { fetches++ } },
        )
        val jobs = (0..9).map { async { dataAccess.flowGetAndFetch.collect() } }
        mutex.unlock()
        jobs.awaitAll()
        // All ten collectors must have shared a single fetch.
        assertEquals(1, fetches)
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment