Skip to content

Instantly share code, notes, and snippets.

@adamashton
Created August 15, 2016 19:03
Show Gist options
  • Save adamashton/cb3ee24c05a683176a94a94708f92f5a to your computer and use it in GitHub Desktop.
Save adamashton/cb3ee24c05a683176a94a94708f92f5a to your computer and use it in GitHub Desktop.
An asynchronous persistent queue implementation that uses Akavache as its persistent store.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
using Akavache;
namespace AkavacheQueue
{
/// <summary>
/// A first-in/first-out queue whose items are stored in an Akavache <see cref="IBlobCache"/>.
/// </summary>
/// <remarks>
/// Every item is stored under the key <c>asyncqueue-{QueueName}-{index}</c>, where
/// <c>index</c> grows by one per enqueued item. The head index and the item count are
/// kept in memory; call <see cref="EnsureInitializedAsync"/> to rebuild them from the
/// keys already present in the cache. Pop, peek, enqueue and initialization are
/// serialized through a single <see cref="SemaphoreSlim"/>.
/// </remarks>
/// <typeparam name="T">Type of the queued items; it is serialized with <see cref="BinaryFormatter"/>.</typeparam>
public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
    private readonly IBlobCache _blobCache;
    private readonly string _queuePrefix;

    // Index of the item at the front of the queue.
    private long _headIndex;

    // Number of items currently in the queue; the next free index is _headIndex + _size.
    private long _size;

    /// <summary>Gets the lower-cased queue name that is embedded in every storage key.</summary>
    public string QueueName { get; private set; }

    /// <summary>
    /// Creates an empty in-memory view of the queue named <paramref name="queueName"/>.
    /// </summary>
    /// <param name="blobCache">Cache that holds the queued items.</param>
    /// <param name="queueName">Queue name; it is lower-cased before being used in keys.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="blobCache"/> or <paramref name="queueName"/> is <c>null</c>.
    /// </exception>
    public AsyncQueue(IBlobCache blobCache, string queueName)
    {
        if (blobCache == null)
        {
            throw new ArgumentNullException(nameof(blobCache));
        }

        if (queueName == null)
        {
            throw new ArgumentNullException(nameof(queueName));
        }

        _blobCache = blobCache;

        // Invariant lower-casing keeps the key prefix identical whatever the current
        // culture is (culture-sensitive ToLower maps 'I' differently under tr-TR).
        QueueName = queueName.ToLowerInvariant();
        _queuePrefix = $"asyncqueue-{QueueName}-";
        _headIndex = 0;
        _size = 0;
    }

    /// <summary>
    /// Rebuilds the head index and item count from the keys currently in the cache.
    /// </summary>
    /// <remarks>
    /// Only keys made of this queue's prefix followed by plain decimal digits are
    /// considered; any other key (for example one belonging to a queue whose name
    /// merely starts with this queue's name) is ignored. The indexes found are assumed
    /// to be contiguous. When no matching key exists the in-memory state is left as is.
    /// </remarks>
    public async Task EnsureInitializedAsync()
    {
        await _semaphoreSlim.WaitAsync();
        try
        {
            IEnumerable<string> allKeys = await _blobCache.GetAllKeys();

            bool foundAny = false;
            long lowest = 0;
            long highest = 0;

            foreach (string key in allKeys)
            {
                if (!key.StartsWith(_queuePrefix, StringComparison.Ordinal))
                {
                    continue;
                }

                // NumberStyles.None accepts digits only, so suffixes such as "bar-3"
                // or "-3" coming from other queues are skipped instead of throwing.
                string suffix = key.Substring(_queuePrefix.Length);
                long index;
                if (!long.TryParse(suffix, NumberStyles.None, CultureInfo.InvariantCulture, out index))
                {
                    continue;
                }

                if (!foundAny)
                {
                    lowest = index;
                    highest = index;
                    foundAny = true;
                }
                else if (index < lowest)
                {
                    lowest = index;
                }
                else if (index > highest)
                {
                    highest = index;
                }
            }

            if (foundAny)
            {
                _headIndex = lowest;

                // Both ends are inclusive, hence the + 1: a single stored item must
                // yield a size of 1, otherwise it would be hidden and later overwritten.
                _size = highest - lowest + 1;
            }
        }
        finally
        {
            _semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Reports whether the in-memory item count is zero. The value is read without
    /// taking the queue's semaphore.
    /// </summary>
    /// <returns><c>true</c> when the queue holds no items.</returns>
    public bool IsEmpty()
    {
        return _size == 0;
    }

    /// <summary>
    /// Removes the item at the front of the queue from the cache and returns it.
    /// </summary>
    /// <returns>The removed item, or <c>default(T)</c> when the queue is empty.</returns>
    public async Task<T> PopAsync()
    {
        await _semaphoreSlim.WaitAsync();
        try
        {
            if (IsEmpty())
            {
                return default(T);
            }

            string keyName = getKeyName(_headIndex);

            // Read the payload first, then delete it; the counters are only moved once
            // both cache operations have completed.
            byte[] bytes = await _blobCache.Get(keyName);
            await _blobCache.Invalidate(keyName);

            _size--;
            _headIndex++;

            return Deserialize(bytes);
        }
        finally
        {
            _semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Returns the item at the front of the queue without removing it.
    /// </summary>
    /// <returns>The front item, or <c>default(T)</c> when the queue is empty.</returns>
    public async Task<T> PeekAsync()
    {
        await _semaphoreSlim.WaitAsync();
        try
        {
            if (IsEmpty())
            {
                return default(T);
            }

            byte[] bytes = await _blobCache.Get(getKeyName(_headIndex));
            return Deserialize(bytes);
        }
        finally
        {
            _semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Misspelled original name of <see cref="PeekAsync"/>, kept so existing callers
    /// continue to compile.
    /// </summary>
    /// <returns>The front item, or <c>default(T)</c> when the queue is empty.</returns>
    public Task<T> PeakAsync()
    {
        return PeekAsync();
    }

    /// <summary>
    /// Serializes <paramref name="item"/> and stores it at the back of the queue.
    /// </summary>
    /// <param name="item">Item to append.</param>
    public async Task EnqueueAsync(T item)
    {
        await _semaphoreSlim.WaitAsync();
        try
        {
            long index = _headIndex + _size;
            await _blobCache.Insert(getKeyName(index), Serialize(item));

            // Count the item only after the insert has completed.
            _size++;
        }
        finally
        {
            _semaphoreSlim.Release();
        }
    }

    // Builds the storage key for the item at the given index.
    private string getKeyName(long index)
    {
        return _queuePrefix + index.ToString(CultureInfo.InvariantCulture);
    }

    // SECURITY NOTE(review): BinaryFormatter is documented by Microsoft as unsafe for
    // untrusted data and is removed in .NET 9. It is kept here because switching the
    // serializer would make already-persisted items unreadable; plan a migration.
    // A formatter is created per call so that no formatter instance is shared between
    // queues (the per-queue semaphore does not protect a shared static instance).
    private static byte[] Serialize(object o)
    {
        using (var stream = new MemoryStream())
        {
            new BinaryFormatter().Serialize(stream, o);
            return stream.ToArray();
        }
    }

    // Turns a payload produced by Serialize back into a T.
    private static T Deserialize(byte[] data)
    {
        using (var stream = new MemoryStream(data))
        {
            return (T)new BinaryFormatter().Deserialize(stream);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment