Created
August 15, 2016 19:03
-
-
Save adamashton/cb3ee24c05a683176a94a94708f92f5a to your computer and use it in GitHub Desktop.
An asynchronous persistent queue implementation that uses Akavache as the persistent store.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Reactive.Linq; | |
using System.Runtime.Serialization.Formatters.Binary; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Akavache; | |
namespace AkavacheQueue | |
{ | |
public class AsyncQueue<T> | |
{ | |
private static BinaryFormatter formatter = new BinaryFormatter(); | |
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); | |
private readonly IBlobCache _blobCache; | |
private readonly string _queuePrefix; | |
private long _headIndex; | |
private long _size; | |
public string QueueName { get; private set; } | |
public AsyncQueue(IBlobCache blobCache, string queueName) | |
{ | |
_blobCache = blobCache; | |
QueueName = queueName.ToLower(); | |
_queuePrefix = $"asyncqueue-{QueueName}-"; | |
_headIndex = 0; | |
_size = 0; | |
} | |
public async Task EnsureInitializedAsync() | |
{ | |
IEnumerable<string> allKeys = await _blobCache.GetAllKeys(); | |
// all keys prefixed with: asyncqueue-queuename-{index} | |
List<long> indexes = new List<long>(); | |
foreach (var key in allKeys) | |
{ | |
if (key.StartsWith(_queuePrefix)) | |
{ | |
string indexStrValue = key.Substring(_queuePrefix.Length, key.Length - _queuePrefix.Length); | |
indexes.Add(long.Parse(indexStrValue)); | |
} | |
} | |
if (indexes.Any()) | |
{ | |
var orderedEnumerable = indexes.OrderBy(x => x).ToList(); | |
long firstIndexFound = orderedEnumerable.First(); | |
long lastIndexFound = orderedEnumerable.Last(); | |
_headIndex = firstIndexFound; | |
_size = lastIndexFound - firstIndexFound; | |
} | |
} | |
public bool IsEmpty() | |
{ | |
return _size == 0; | |
} | |
public async Task<T> PopAsync() | |
{ | |
await _semaphoreSlim.WaitAsync(); | |
try | |
{ | |
if (IsEmpty()) | |
return default(T); | |
var keyName = getKeyName(_headIndex); | |
// get data | |
byte[] bytes = await _blobCache.Get(keyName); | |
// delete from cache | |
await _blobCache.Invalidate(keyName); | |
// queue has 1 less item | |
_size--; | |
// and the head has moved | |
_headIndex++; | |
return Deserialize<T>(bytes); | |
} | |
finally | |
{ | |
_semaphoreSlim.Release(); | |
} | |
} | |
public async Task<T> PeakAsync() | |
{ | |
await _semaphoreSlim.WaitAsync(); | |
try | |
{ | |
if (IsEmpty()) | |
return default(T); | |
var keyName = getKeyName(_headIndex); | |
byte[] bytes = await _blobCache.Get(keyName); | |
return Deserialize<T>(bytes); | |
} | |
finally | |
{ | |
_semaphoreSlim.Release(); | |
} | |
} | |
public async Task EnqueueAsync(T item) | |
{ | |
await _semaphoreSlim.WaitAsync(); | |
try | |
{ | |
long index = _headIndex + _size; | |
var keyName = getKeyName(index); | |
await _blobCache.Insert(keyName, Serialize(item)); | |
_size++; | |
} | |
finally | |
{ | |
_semaphoreSlim.Release(); | |
} | |
} | |
private string getKeyName(long index) | |
{ | |
return $"{_queuePrefix}{index}"; | |
} | |
private static byte[] Serialize(object o) | |
{ | |
using (var stream = new MemoryStream()) | |
{ | |
formatter.Serialize(stream, o); | |
return stream.ToArray(); | |
} | |
} | |
private static T Deserialize<T>(byte[] data) | |
{ | |
using (var stream = new MemoryStream(data)) | |
{ | |
object deserialize = formatter.Deserialize(stream); | |
return (T)deserialize; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment