Skip to content

Instantly share code, notes, and snippets.

@slavanap
Last active May 11, 2019 20:41
Show Gist options
  • Save slavanap/3b9c436c2defae8b1d305ad30fdfe889 to your computer and use it in GitHub Desktop.
Save slavanap/3b9c436c2defae8b1d305ad30fdfe889 to your computer and use it in GitHub Desktop.
Populate cache from single thread.
/*
class UsageSample {
public DistributedCacheUpdater _cache;
async Task<JObject> GetResponse(HttpRequestMessage httpRequest, DistributedCacheEntryOptions cacheOptions, CancellationToken ct) {
using (var proxy = await DistributedCacheUpdater.RequestAsync(_cache, httpRequest.RequestUri.ToString(), ct)) {
string responseData;
if (proxy.Value == null) {
_metrics.Measure.Meter.Mark(_cacheHits, "nohit");
using (var httpResponse = await _client.SendAsync(httpRequest, HttpCompletionOption.ResponseContentRead, ct)) {
httpResponse.EnsureSuccessStatusCode();
responseData = await httpResponse.Content.ReadAsStringAsync();
}
await proxy.Update(Encoding.UTF8.GetBytes(responseData), cacheOptions, ct);
}
else {
_metrics.Measure.Meter.Mark(_cacheHits, "hit");
responseData = Encoding.UTF8.GetString(proxy.Value);
}
return JObject.Parse(responseData);
}
}
// ...
}
*/
using Microsoft.Extensions.Caching.Distributed;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
// MIT license. Vyacheslav Napadovsky
namespace CoreApp.Extensions {
public class DistributedCacheUpdater {
public class ValueProxy: IDisposable {
private readonly DistributedCacheUpdater _helper;
private TaskCompletionSource<byte[]> _notifyCreated;
public readonly string Key;
public byte[] Value { get; private set; }
public ValueProxy(string key) {
_helper = null;
_notifyCreated = null;
Key = key;
Value = null;
}
internal ValueProxy(DistributedCacheUpdater helper, string key, byte[] value) {
_helper = helper;
_notifyCreated = null;
Key = key;
Value = value;
}
internal ValueProxy(DistributedCacheUpdater helper, string key, TaskCompletionSource<byte[]> notifyCreated) {
_helper = helper;
_notifyCreated = notifyCreated;
Key = key;
Value = null;
}
private void Notify(byte[] newValue) {
if (_notifyCreated != null) {
TaskCompletionSource<byte[]> notifyCreated;
if (!_helper._pendingUpdates.TryRemove(Key, out notifyCreated))
throw new InvalidOperationException("Key should exist in the dictionary");
notifyCreated.SetResult(newValue);
if (notifyCreated != _notifyCreated)
throw new InvalidOperationException("Expected correct notifyCreated value");
_notifyCreated = null;
}
}
public async Task Update(byte[] value, DistributedCacheEntryOptions options, CancellationToken ct) {
if (_helper != null) {
await _helper._cache.SetAsync(Key, value, options, ct);
Value = value;
Notify(value);
}
}
public void Dispose() {
if (_helper != null)
Notify(null);
}
}
readonly IDistributedCache _cache;
readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _pendingUpdates =
new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>();
public DistributedCacheUpdater(IDistributedCache cache) {
_cache = cache;
}
public async Task<ValueProxy> RequestAsync(string key, CancellationToken ct) {
while (true) {
var result = await _cache.GetAsync(key, ct);
if (result != null)
return new ValueProxy(this, key, result);
var newTaskCompletionSource = new TaskCompletionSource<byte[]>();
var notifyCreated = _pendingUpdates.GetOrAdd(key, newTaskCompletionSource);
if (notifyCreated == newTaskCompletionSource) // == if added
return new ValueProxy(this, key, notifyCreated);
result = await notifyCreated.Task.ConfigureAwait(false);
if (result != null)
return new ValueProxy(this, key, result);
}
}
public static Task<ValueProxy> RequestAsync(DistributedCacheUpdater updater, string key, CancellationToken ct) {
if (updater != null)
return updater.RequestAsync(key, ct);
return Task.FromResult(new ValueProxy(key));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment