Skip to content

Instantly share code, notes, and snippets.

@NaikSoftware
Last active May 12, 2022 14:17
Show Gist options
  • Save NaikSoftware/3105f60cf4cd0e7dbc7ef053169916ea to your computer and use it in GitHub Desktop.
Save NaikSoftware/3105f60cf4cd0e7dbc7ef053169916ea to your computer and use it in GitHub Desktop.
Repository design pattern for Dart
/*
* 2020-2021 NaikSoftware, drstranges, MagTuxGit
*/
/// Contract for a keyed cache holding values of type [V] under keys of
/// type [K].
///
/// Each stored value is wrapped in a [CacheEntry] that also records when the
/// value was stored.
abstract class CacheStorage<K, V> {
/// Prepares the storage for use; the returned future completes when it is
/// ready.
Future<dynamic> ensureInitialized();
/// Removes every entry from this storage.
Future<void> clear();
/// Returns the entry stored under [cacheKey], or `null` when there is none.
Future<CacheEntry<V>?> get(K cacheKey);
/// Stores [data] under [cacheKey].
///
/// [storeTime] is the timestamp to record with the entry. Callers in this
/// code base pass milliseconds since the Unix epoch; what happens when it is
/// omitted is up to the implementation.
Future<void> put(K cacheKey, V data, {int? storeTime});
/// Removes the entry stored under [cacheKey].
Future<void> delete(K cacheKey);
/// Returns a stream of the stored values.
///
/// NOTE(review): presumably emits a fresh list whenever the storage changes;
/// confirm against the concrete implementations.
Stream<List<V>> watch();
/// Returns all values currently held by this storage.
Future<List<V>> getAll();
}
/// A cached value together with the time it was stored.
class CacheEntry<V> {
/// The cached value.
V data;
/// When [data] was stored. Code in this file compares it against
/// `DateTime.now().millisecondsSinceEpoch`, so it is expected to be in
/// milliseconds since the Unix epoch.
int storeTime;
CacheEntry(this.data, {required this.storeTime});
}
/*
* 2020-2021 NaikSoftware, drstranges, MagTuxGit
*/
import 'package:app/data/service/logger.dart';
import 'package:app/data/service/resource/cache_storage.dart';
import 'package:synchronized/synchronized.dart';
/// In-memory [CacheStorage] backed by plain maps.
///
/// Backing maps ("boxes") live in a static registry keyed by the box key
/// passed to the constructor, so every instance created with the same key
/// shares the same entries. Nothing is persisted and [watch] is unsupported.
class SimpleMemoryCacheStorage<K, V> implements CacheStorage<K, V> {
  /// Serialises lookup and creation of boxes across all instances.
  static final _lock = Lock();

  /// Registry of backing maps, one per box key.
  static final Map<String, Map> _boxes = {};

  final _logger = const Logger('MemoryCacheStorage');

  /// Key of the box this instance reads from and writes to.
  final String _boxKey;

  SimpleMemoryCacheStorage(this._boxKey);

  /// Drops every box from the registry, for all box keys.
  static Future<void> clearAll() async {
    _boxes.clear();
  }

  /// Nothing to initialise for an in-memory store; completes immediately.
  @override
  Future<void> ensureInitialized() => Future.value();

  /// Returns the box registered under [_boxKey], creating it on first use.
  Future<Map<K, CacheEntry<V>>> _ensureBox() {
    return _lock.synchronized(() async {
      final existing = _boxes[_boxKey];
      if (existing != null) {
        // The registry is untyped, so the box has to be cast back.
        return existing as Map<K, CacheEntry<V>>;
      }
      final created = <K, CacheEntry<V>>{};
      _boxes[_boxKey] = created;
      return created;
    });
  }

  /// Removes every entry from this instance's box.
  @override
  Future<void> clear() async {
    final box = await _ensureBox();
    box.clear();
  }

  /// Returns the entry stored under [cacheKey], or `null` if absent.
  @override
  Future<CacheEntry<V>?> get(K cacheKey) async {
    final box = await _ensureBox();
    return box[cacheKey];
  }

  /// Stores [data] under [cacheKey].
  ///
  /// When [storeTime] is omitted the current time in milliseconds since the
  /// Unix epoch is recorded.
  @override
  Future<void> put(K cacheKey, V data, {int? storeTime}) async {
    final box = await _ensureBox();
    final timestamp = storeTime ?? DateTime.now().millisecondsSinceEpoch;
    box[cacheKey] = CacheEntry(data, storeTime: timestamp);
  }

  /// Removes the entry stored under [cacheKey], if any.
  @override
  Future<void> delete(K cacheKey) async {
    final box = await _ensureBox();
    box.remove(cacheKey);
  }

  /// Always throws [UnsupportedError]; this storage cannot be observed.
  @override
  Stream<List<V>> watch() => throw UnsupportedError('Not supported!');

  /// Returns the values of all entries currently in this instance's box.
  @override
  Future<List<V>> getAll() async {
    final box = await _ensureBox();
    return [for (final entry in box.values) entry.data];
  }

  @override
  String toString() => 'MemoryCacheStorage($_boxKey)';
}
/*
* 2020-2021 NaikSoftware, drstranges, MagTuxGit
*/
import 'package:flutter/foundation.dart';
/// A value of type [T] paired with the state of the operation producing it.
///
/// A resource is in exactly one [ResourceState] and may carry [data] in any
/// of them (for example stale data while loading, or the last known data
/// alongside an error).
class Resource<T> {
  /// Current state of the resource.
  final ResourceState state;

  /// The payload, if any.
  final T? data;

  /// Human-readable description; set by [Resource.error].
  final String? message;

  /// The underlying error object; set by [Resource.error].
  final dynamic error;

  /// Whether [state] is [ResourceState.loading].
  bool get isLoading => state == ResourceState.loading;

  /// Whether [state] is anything other than [ResourceState.loading].
  bool get isNotLoading => state != ResourceState.loading;

  /// Whether [state] is [ResourceState.error].
  bool get isError => state == ResourceState.error;

  /// Whether [state] is anything other than [ResourceState.error].
  bool get isNotError => state != ResourceState.error;

  /// Whether [state] is [ResourceState.success].
  bool get isSuccess => state == ResourceState.success;

  /// Whether [data] is non-null, regardless of [state].
  bool get hasData => data != null;

  Resource._(this.state, {this.data, this.message, this.error});

  /// Creates a loading resource, optionally carrying already known [data].
  factory Resource.loading([T? data]) =>
      Resource._(ResourceState.loading, data: data);

  /// Creates a successful resource with optional [data].
  factory Resource.success([T? data]) =>
      Resource._(ResourceState.success, data: data);

  /// Creates a failed resource described by [message], optionally carrying
  /// the causing [error] and any [data] that is still available.
  factory Resource.error(String message, {dynamic error, T? data}) =>
      Resource._(ResourceState.error,
          message: message, error: error, data: data);

  /// Returns a resource with the same [state], [message] and [error] whose
  /// data is the result of applying [transform] to this resource's [data].
  ///
  /// [transform] is called even when [data] is `null`.
  Resource<NewType> map<NewType>(NewType? Function(T?) transform) =>
      Resource._(state, data: transform(data), message: message, error: error);

  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      other is Resource &&
          runtimeType == other.runtimeType &&
          state == other.state &&
          _compareData(data, other.data) &&
          message == other.message &&
          error == other.error;

  /// Compares payloads; two lists are equal when their elements are equal.
  bool _compareData(dynamic v1, dynamic v2) =>
      v1 is List && v2 is List ? listEquals(v1, v2) : v1 == v2;

  // Lists are hashed by their elements so that hashCode agrees with
  // operator ==, which compares list data element-wise. Hashing the list
  // object itself would give equal resources different hash codes.
  @override
  int get hashCode =>
      state.hashCode ^ _hashData(data) ^ message.hashCode ^ error.hashCode;

  /// Hash counterpart of [_compareData]: lists hash by length and elements,
  /// everything else by its own `hashCode`.
  int _hashData(dynamic value) {
    if (value is! List) return value.hashCode;
    var hash = value.length;
    for (final element in value) {
      // Masked so the running value stays a small non-negative integer.
      hash = 0x1fffffff & (hash * 31 + element.hashCode);
    }
    return hash;
  }

  @override
  String toString() {
    return 'State : $state \n Message : $message \n Data : $data';
  }
}
/// The states a [Resource] can be in.
enum ResourceState {
// The value is being loaded; the resource may still carry earlier data.
loading,
// The value was obtained successfully.
success,
// Obtaining the value failed.
error,
}
/*
* Copyright 2020-2021 NaikSoftware, drstranges, MagTuxGit
*/
import 'dart:async';
import 'package:app/data/service/resource/cache_storage.dart';
import 'package:app/data/service/resource/resource.dart';
import 'package:app/data/service/resource/storage/hive_cache_storage.dart';
import 'package:app/data/service/resource/storage/objectbox/objectbox_cache_storage_stub.dart'
if (dart.library.io) 'package:eated/data/service/resource/storage/objectbox/objectbox_cache_storage.dart';
import 'package:app/data/service/resource/stream_resource.dart';
import 'package:flutter/foundation.dart';
import 'package:rxdart/rxdart.dart';
import 'package:synchronized/synchronized.dart';
/// Repository that exposes cached, reloadable values of type [V] by key.
///
/// One [StreamResource] is created lazily per key and reused for later
/// requests. Values are loaded through [fetch] (when provided) and cached in
/// [storage]; [cacheDurationResolver] decides how long a cached value stays
/// fresh.
class StreamRepository<K, V> {
  /// Live resources, one per requested key.
  final Map<K, StreamResource<K, V>> _resources = {};

  /// Loads the value for a key from its source; may be `null`.
  final Future<V> Function(K key, ResourceFetchArguments? arguments)? fetch;

  /// Returns how long a given cached value is considered fresh.
  final CacheDurationResolver<K, V> cacheDurationResolver;

  /// Keys that have been requested through [stream] at least once.
  final Map<K, bool> _firstLoad = {};

  /// Serialises creation and removal of entries in [_resources].
  final _lock = Lock();

  /// Backing cache shared by all resources of this repository.
  final CacheStorage<K, V> storage;

  /// Creates a repository with the platform's default storage: Hive on the
  /// web, ObjectBox elsewhere.
  ///
  /// [cacheDurationResolver] wins over [cacheDuration]; when neither is
  /// given, cached values expire immediately ([Duration.zero]).
  StreamRepository({
    this.fetch,
    required String storageKey,
    required V Function(dynamic json) decode,
    Duration? cacheDuration,
    CacheDurationResolver<K, V>? cacheDurationResolver,
  })  : storage = kIsWeb
            ? HiveCacheStorage(storageKey, decode: decode)
            : createObjectBoxCacheStorage(storageKey, decode: decode),
        cacheDurationResolver =
            (cacheDurationResolver ?? (k, v) => cacheDuration ?? Duration.zero);

  /// Creates a repository on top of a caller-supplied [storage].
  ///
  /// Cache duration parameters behave as in the default constructor.
  StreamRepository.create({
    this.fetch,
    required this.storage,
    Duration? cacheDuration,
    CacheDurationResolver<K, V>? cacheDurationResolver,
  }) : cacheDurationResolver =
            (cacheDurationResolver ?? (k, v) => cacheDuration ?? Duration.zero);

  /// Returns the stream of [Resource]s for [key] and triggers a load.
  ///
  /// When [forceReload] is omitted, only the first request for a key is
  /// forced; later ones rely on the cache.
  Stream<Resource<V>> stream(
    K key, {
    bool? forceReload,
    void Function(V)? doOnStore,
    ResourceFetchArguments? fetchArguments,
    bool allowEmptyLoading = false,
  }) {
    final shouldForce = forceReload ?? _firstLoad[key] ?? true;
    _firstLoad[key] = false;
    final pendingResource = _ensureResource(key);
    return pendingResource.asStream().switchMap((resource) {
      return resource.load(
        forceReload: shouldForce,
        doOnStore: doOnStore,
        allowEmptyLoading: allowEmptyLoading,
        fetchArguments: fetchArguments,
      );
    });
  }

  /// Like [stream], but completes with the first resource that is not in the
  /// loading state.
  Future<Resource<V>> load(
    K key, {
    bool? forceReload,
    void Function(V)? doOnStore,
    ResourceFetchArguments? fetchArguments,
    bool allowEmptyLoading = false,
  }) {
    final updates = stream(
      key,
      forceReload: forceReload,
      doOnStore: doOnStore,
      fetchArguments: fetchArguments,
      allowEmptyLoading: allowEmptyLoading,
    );
    return updates.where((resource) => resource.isNotLoading).first;
  }

  /// Invalidates the resource for [key]; does nothing if that key has not
  /// been requested yet.
  Future<void> invalidate(K key) {
    final resource = _resources[key];
    if (resource == null) return Future.value();
    return resource.invalidate();
  }

  /// Invalidates every resource created so far.
  Future<void> invalidateAll() => Future.wait([
        for (final resource in _resources.values) resource.invalidate(),
      ]);

  /// Applies [changeValue] to the cached value for [key], creating the
  /// resource if necessary.
  Future<void> updateValue(K key, V? Function(V? value) changeValue,
      {bool notifyOnNull = false}) async {
    final resource = await _ensureResource(key);
    await resource.updateValue(changeValue, notifyOnNull: notifyOnNull);
  }

  /// Stores [value] for [key], creating the resource if necessary.
  Future<void> putValue(K key, V value) async {
    final resource = await _ensureResource(key);
    await resource.putValue(value);
  }

  /// Forwards to [CacheStorage.watch] of the backing [storage].
  Stream<List<V>> watch() => storage.watch();

  /// Forwards to [CacheStorage.getAll] of the backing [storage].
  Future<List<V>> getAll() => storage.getAll();

  /// Clears the resource for [key], or everything when [key] is omitted.
  ///
  /// With a key, that resource is closed and forgotten. Without one, all
  /// resources are forgotten (not closed) and the whole storage is cleared.
  Future<void> clear([K? key]) {
    return _lock.synchronized(() async {
      if (key == null) {
        _resources.clear();
        await storage.clear();
        return;
      }
      await _resources[key]?.close();
      _resources.remove(key);
    });
  }

  /// Returns the resource for [key], creating and registering it on first
  /// use.
  Future<StreamResource<K, V>> _ensureResource(K key) {
    return _lock.synchronized(() async {
      return _resources.putIfAbsent(
        key,
        () => StreamResource<K, V>(
          fetch,
          key,
          cacheDurationResolver,
          storage,
        ),
      );
    });
  }
}
/*
* 2020-2021 NaikSoftware, drstranges, MagTuxGit
*/
import 'dart:async';
import 'package:app/data/service/logger.dart';
import 'package:app/data/service/resource/cache_storage.dart';
import 'package:app/data/service/resource/resource.dart';
import 'package:rxdart/rxdart.dart';
import 'package:synchronized/synchronized.dart';
/// A single cached value, identified by [resourceKey], exposed as a stream of
/// [Resource] snapshots.
///
/// All storage access and emissions happen under one [Lock], so loads,
/// updates and invalidations for the same key are serialised.
///
/// Because mutating methods wait for that lock, do not await them from code
/// that already runs under it (for example from inside [fetch] for the same
/// key). NOTE(review): the default `Lock` of `package:synchronized` is
/// documented as non-reentrant, so such a call would never complete — confirm
/// no caller relies on the previous fire-and-forget behaviour.
class StreamResource<K, V> {
  final _logger = const Logger('StreamResource');

  /// Replays the latest snapshot to new listeners.
  final BehaviorSubject<Resource<V>> _subject = BehaviorSubject<Resource<V>>();

  final CacheStorage<K, V> _storage;

  /// Loads the value from its source; when `null` only the cache is used.
  final Future<V> Function(K key, ResourceFetchArguments? arguments)? fetch;

  final K resourceKey;

  /// Returns how long a given cached value is considered fresh.
  final CacheDurationResolver<K, V> cacheDurationResolver;

  final _lock = Lock();

  /// Whether a load has been scheduled and has not finished yet.
  bool _isLoading = false;

  /// Whether a forced reload was requested while a load was in progress.
  bool _shouldReload = false;

  StreamResource(
    this.fetch,
    this.resourceKey,
    this.cacheDurationResolver,
    this._storage,
  );

  /// Starts a load (unless one is already running) and returns the stream of
  /// snapshots for this resource.
  ///
  /// forceReload - reload even if cache is valid
  /// allowEmptyLoading - put empty loading to prevent previous SUCCESS to return
  ///
  /// A forced request that arrives while a load is running is remembered and
  /// executed once after the current load finishes.
  Stream<Resource<V>> load({
    bool forceReload = false,
    void Function(V)? doOnStore,
    bool allowEmptyLoading = false,
    final ResourceFetchArguments? fetchArguments,
  }) {
    if (!_isLoading) {
      _isLoading = true;
      _lock.synchronized(() async {
        _shouldReload = false;
        // try always starting with loading value
        if (allowEmptyLoading || _subject.hasValue) {
          // prevent previous SUCCESS to return
          _subject.add(Resource.loading(_subject.valueOrNull?.data));
        }
        await _loadProcess(forceReload, doOnStore, fetchArguments);
      }).whenComplete(() {
        // Runs on failure too, so a throwing load cannot leave the resource
        // permanently marked as loading.
        _isLoading = false;
        if (_shouldReload) {
          // _shouldReload is only ever set by a forced request, so the
          // follow-up load must be forced as well; keep the fetch arguments
          // instead of silently dropping them.
          load(
            forceReload: true,
            doOnStore: doOnStore,
            allowEmptyLoading: allowEmptyLoading,
            fetchArguments: fetchArguments,
          );
        }
      });
    } else if (forceReload) {
      // don't need to call load many times
      // perform another load only once
      _shouldReload = true;
    }
    return _subject;
  }

  /// Emits the cached value and, when needed, fetches a fresh one.
  ///
  /// A fetch happens when there is no cached value, the cached value is older
  /// than its cache duration, or [forceReload] is set — provided [fetch] is
  /// available. Fetch failures are logged and emitted as [Resource.error]
  /// carrying the cached data.
  Future<void> _loadProcess(
    bool forceReload,
    void Function(V)? doOnStore,
    ResourceFetchArguments? fetchArguments,
  ) async {
    // fetch value from DB
    final cached = await _storage.get(resourceKey);
    if (cached != null) {
      final cacheDuration =
          cacheDurationResolver(resourceKey, cached.data).inMilliseconds;
      // An expired entry is treated like a forced reload.
      if (cached.storeTime <
          DateTime.now().millisecondsSinceEpoch - cacheDuration) {
        forceReload = true;
      }
    }
    if (cached != null || fetch == null) {
      if (forceReload && fetch != null) {
        // Show the stale value as "loading" while the fetch below runs.
        final resource = Resource.loading(cached?.data);
        if (_subject.valueOrNull != resource) {
          _subject.add(resource);
        }
      } else {
        // Cache is fresh, or there is nothing to fetch with: done.
        final resource = Resource.success(cached?.data);
        if (_subject.valueOrNull != resource) {
          _subject.add(resource);
        }
        return;
      }
    }
    // no need to perform another load while fetch not called yet
    _shouldReload = false;
    // fetch value from network
    // `fetch` is non-null here: the branch above returns whenever it is null.
    return _subject.addStream(fetch!(resourceKey, fetchArguments)
        .asStream()
        .asyncMap((data) async {
          if (doOnStore != null) {
            doOnStore(data);
          }
          await _storage.put(resourceKey, data);
          return data;
        })
        .map((data) => Resource.success(data))
        .doOnError((error, trace) => _logger.e(
            'Error loading resource by id $resourceKey with storage $_storage',
            error: error,
            stackTrace: trace))
        .onErrorReturnWith((error, trace) => Resource.error(
            'Resource $resourceKey loading error',
            error: error,
            data: cached?.data)));
  }

  /// Replaces the cached value with the result of [changeValue].
  ///
  /// A non-null result is stored (keeping the previous store time, or `0` if
  /// there was none) and emitted as success. A `null` result deletes an
  /// existing entry and, when [notifyOnNull] is set, emits an empty success.
  ///
  /// The returned future completes once the change has been applied.
  Future<void> updateValue(V? Function(V? value) changeValue,
      {bool notifyOnNull = false}) async {
    await _lock.synchronized(() async {
      final cached = await _storage.get(resourceKey);
      final newValue = changeValue.call(cached?.data);
      if (newValue != null) {
        await _storage.put(
          resourceKey,
          newValue,
          storeTime: cached?.storeTime ?? 0,
        );
        _subject.add(Resource.success(newValue));
      } else if (cached != null) {
        await _storage.delete(resourceKey);
        if (notifyOnNull) _subject.add(Resource.success(null));
      }
    });
  }

  /// Stores [value] and emits it as success.
  ///
  /// The returned future completes once the value has been stored.
  Future<void> putValue(V value) async {
    assert(value != null);
    await _lock.synchronized(() async {
      await _storage.put(resourceKey, value);
      _subject.add(Resource.success(value));
    });
  }

  /// Deletes this resource's entry from the storage.
  Future<void> _clearCache() async {
    await _storage.ensureInitialized();
    await _storage.delete(resourceKey);
  }

  /// Marks the cached entry as expired by rewriting it with store time `0`.
  Future<void> _resetStoreTime() async {
    await _lock.synchronized(() async {
      var cached = await _storage.get(resourceKey);
      if (cached != null) {
        await _storage.put(resourceKey, cached.data, storeTime: 0);
      } else {
        await _storage.delete(resourceKey);
      }
    });
  }

  /// Expires the cached value and reloads it, completing with the first
  /// non-loading snapshot.
  Future<void> invalidate() async {
    // don't clear cache for offline usage
    //await _clearCache();
    await _resetStoreTime();
    await load(forceReload: true).where((event) => event.isNotLoading).first;
  }

  /// Deletes the cached entry and closes the snapshot stream.
  Future<void> close() async {
    await _clearCache();
    _subject.close(); // TODO: Maybe leave not closed? Need tests
  }
}
/// Optional arguments forwarded to a repository's fetch callback.
///
/// NOTE(review): equality and [hashCode] consider [permissionToken] only;
/// [limit] and [payload] are ignored. Confirm that this is intentional.
class ResourceFetchArguments {
  /// Optional limit passed through to the fetch callback.
  final int? limit;

  /// Optional token; the only field that takes part in equality.
  final String? permissionToken;

  /// Arbitrary extra data for the fetch callback.
  final dynamic payload;

  const ResourceFetchArguments({
    this.limit,
    this.permissionToken,
    this.payload,
  });

  @override
  bool operator ==(Object other) {
    if (identical(this, other)) return true;
    if (other is! ResourceFetchArguments) return false;
    if (other.runtimeType != runtimeType) return false;
    return other.permissionToken == permissionToken;
  }

  @override
  int get hashCode {
    final token = permissionToken;
    return token.hashCode;
  }
}
/// Returns how long [value], cached under [key], should be considered fresh.
///
/// StreamResource treats an entry as expired once its store time is older
/// than the returned duration.
typedef CacheDurationResolver<K, V> = Duration Function(K key, V value);
// Usage example (fragments, not a complete compilation unit).
//
// Set up a repository with its parameters: `fetch` loads a user from the API
// using the key as the user id, `storageKey` names the cache storage,
// `decode` turns stored JSON back into a UserDto, and cached values are
// treated as fresh for one day.
_userRepository = StreamRepository(
fetch: (key, arguments) => _api.getUser(userId: key),
storageKey: 'users',
decode: (json) => UserDto.fromJson(json),
cacheDuration: const Duration(days: 1),
);
// Get the data stream: expose the repository stream for a user id. Passing
// `forceReload: false` explicitly means even the first request for a key is
// served from the cache when it is still fresh.
Stream<Resource<UserDto>> getUser({
String userId = 'me',
bool forceReload = false,
}) =>
_userRepository.stream(
userId,
forceReload: forceReload,
);
// Subscribe to the data: react to every snapshot that carries data, whatever
// its state (loading, success or error).
// NOTE(review): `getProfile` is not defined in this snippet; presumably it
// wraps a repository stream the same way `getUser` does — confirm.
getIt.get<UserService>().getProfile().listen((resource) {
if (resource.hasData) {
setState(currentState.copyWith(profile: resource.data));
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment