Skip to content

Instantly share code, notes, and snippets.

@PaulStovell
Last active January 27, 2016 17:48
Show Gist options
  • Save PaulStovell/4601772 to your computer and use it in GitHub Desktop.
Save PaulStovell/4601772 to your computer and use it in GitHub Desktop.
An example using RavenDB's changes API to implement a queue of tasks.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using Raven.Abstractions.Data;
using Raven.Client.Embedded;
using Raven.Client.Indexes;
namespace OctoQueues
{
/// <summary>
/// Process-wide counters shown in the console title by <c>Program</c>.
/// They are deliberately public static fields (not properties) because callers pass them
/// by <c>ref</c> to <c>Interlocked.Increment</c>.
/// </summary>
public class GlobalStats
{
    /// <summary>Incremented each time a new task document is stored.</summary>
    public static int TasksCreated;

    /// <summary>Incremented each time a task is handed to <c>RunTask</c>.</summary>
    public static int TasksRun;

    /// <summary>Incremented each time one of the queues issues an index query.</summary>
    public static int DatabaseQueries;
}
/// <summary>
/// Console host for the demo: starts an embedded document store and both queues, then
/// stores a new scheduled task every time a key is pressed.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Never returns: after start-up it loops forever waiting for key presses.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var documentStore = new EmbeddableDocumentStore();
        documentStore.Initialize();
        IndexCreation.CreateIndexes(Assembly.GetEntryAssembly(), documentStore);

        var runner = new TaskQueue(documentStore);
        var scheduler = new ScheduledTaskQueue(documentStore);

        Console.WriteLine("Press any key to create a task");
        runner.Start();
        scheduler.Start();

        // Write stats to the title from a background thread
        ThreadPool.QueueUserWorkItem(RefreshTitleForever);

        for (;;)
        {
            Console.ReadKey(true);
            ScheduleNewTask(documentStore);
        }
    }

    /// <summary>
    /// Thread-pool callback that rewrites the console title with the current counters
    /// every 100 ms, forever.
    /// </summary>
    /// <param name="state">Unused thread-pool state object.</param>
    private static void RefreshTitleForever(object state)
    {
        for (;;)
        {
            Thread.Sleep(100);
            Console.Title = string.Format("Created: {0}; Run: {1}; Queries: {2}", GlobalStats.TasksCreated, GlobalStats.TasksRun, GlobalStats.DatabaseQueries);
        }
    }

    /// <summary>
    /// Stores one task that becomes due ten seconds from now (UTC) and bumps the
    /// created-tasks counter once the save has gone through.
    /// </summary>
    /// <param name="documentStore">Store in which the task document is created.</param>
    private static void ScheduleNewTask(EmbeddableDocumentStore documentStore)
    {
        using (var session = documentStore.OpenSession())
        {
            var pending = new Task();
            pending.Status = TaskStatus.ScheduledForLater;
            pending.DueAfter = DateTime.UtcNow.AddSeconds(10);

            session.Store(pending);
            session.SaveChanges();
            Interlocked.Increment(ref GlobalStats.TasksCreated);
        }
    }
}
/// <summary>
/// Runs queued tasks one at a time on a thread-pool thread.
/// The worker queries <see cref="QueuedTasksIndex"/> for a task, marks it
/// <see cref="TaskStatus.Running"/> and runs it. When nothing is queued it sleeps until
/// the index subscription signals a change or the poll interval elapses.
/// </summary>
internal class TaskQueue : IObserver<IndexChangeNotification>, IDisposable
{
    /// <summary>Longest time the worker sleeps before re-querying without a notification.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(10);

    private readonly EmbeddableDocumentStore store;

    // Deliberately never disposed: the worker thread may still be inside Wait/Reset
    // when Dispose() runs, and touching a disposed event would throw on that thread.
    private readonly ManualResetEventSlim tasksSignal = new ManualResetEventSlim(false);

    // volatile: written by Dispose() on the caller's thread, read by the worker loop.
    private volatile bool running = true;

    private IDisposable subscription;

    /// <summary>Creates a queue that reads and updates tasks in <paramref name="store"/>.</summary>
    /// <param name="store">Document store holding the task documents.</param>
    public TaskQueue(EmbeddableDocumentStore store)
    {
        this.store = store;
    }

    /// <summary>Index changed: wake the worker so it re-queries immediately.</summary>
    public void OnNext(IndexChangeNotification value)
    {
        tasksSignal.Set();
    }

    /// <summary>Subscription failed: wake the worker; it keeps polling on the timeout.</summary>
    public void OnError(Exception error)
    {
        tasksSignal.Set();
    }

    /// <summary>Subscription ended: wake the worker; it keeps polling on the timeout.</summary>
    public void OnCompleted()
    {
        tasksSignal.Set();
    }

    /// <summary>
    /// Subscribes to changes of <see cref="QueuedTasksIndex"/> and starts the worker loop
    /// on a thread-pool thread. The loop runs until <see cref="Dispose"/> is called.
    /// </summary>
    public void Start()
    {
        subscription = store.Changes().ForIndex(new QueuedTasksIndex().IndexName).Subscribe(this);

        ThreadPool.QueueUserWorkItem(delegate
        {
            while (running)
            {
                var task = TryClaimNextTask();
                if (task == null)
                {
                    // Nothing to do: sleep until notified (or timed out), then clear the
                    // signal before the next query so a later notification is not lost.
                    tasksSignal.Wait(PollInterval);
                    tasksSignal.Reset();
                }
                else
                {
                    RunTask(task);
                }
            }
        });
    }

    /// <summary>
    /// Queries the index for one task and, if it is still queued, marks it as running.
    /// </summary>
    /// <returns>The claimed task, or <c>null</c> when there is nothing to run.</returns>
    private Task TryClaimNextTask()
    {
        using (var session = store.OpenSession())
        {
            Interlocked.Increment(ref GlobalStats.DatabaseQueries);
            var candidate = session.Query<Task, QueuedTasksIndex>().FirstOrDefault();

            // Re-check the status on the loaded document; presumably this guards against
            // an index entry that has not caught up with the document yet.
            if (candidate == null || candidate.Status != TaskStatus.Queued)
            {
                return null;
            }

            candidate.Status = TaskStatus.Running;
            session.SaveChanges();
            return candidate;
        }
    }

    /// <summary>
    /// Placeholder for the real work: counts the run and prints the task id.
    /// NOTE(review): the task is left in <see cref="TaskStatus.Running"/>; nothing here
    /// moves it to <see cref="TaskStatus.Completed"/> or re-queues it on failure.
    /// </summary>
    /// <param name="task">The task that was just claimed.</param>
    private void RunTask(Task task)
    {
        Interlocked.Increment(ref GlobalStats.TasksRun);
        Console.WriteLine("Running task: " + task.Id);
    }

    /// <summary>
    /// Asks the worker loop to stop and cancels the index subscription.
    /// Safe to call when <see cref="Start"/> was never called, and safe to call twice.
    /// </summary>
    public void Dispose()
    {
        running = false;

        // Wake the worker so it observes running == false now rather than after the timeout.
        tasksSignal.Set();

        if (subscription != null)
        {
            subscription.Dispose();
            subscription = null;
        }
    }
}
/// <summary>
/// Moves scheduled tasks to <see cref="TaskStatus.Queued"/> once their due time has passed.
/// One thread-pool loop keeps a local cache of scheduled tasks in sync with
/// <see cref="ScheduledTasksIndex"/>; a second loop checks that cache once a second and
/// promotes every task whose due time is in the past.
/// </summary>
internal class ScheduledTaskQueue : IObserver<IndexChangeNotification>, IDisposable
{
    /// <summary>Longest time between cache re-syncs when no index notification arrives.</summary>
    private static readonly TimeSpan ResyncInterval = TimeSpan.FromSeconds(60);

    /// <summary>How often the cached list is checked for tasks that have become due.</summary>
    private static readonly TimeSpan PromotionInterval = TimeSpan.FromSeconds(1);

    private readonly EmbeddableDocumentStore store;

    // Deliberately never disposed: the sync thread may still be inside Wait/Reset
    // when Dispose() runs, and touching a disposed event would throw on that thread.
    private readonly ManualResetEventSlim tasksSignal = new ManualResetEventSlim(false);

    // volatile: written by Dispose() on the caller's thread, read by both worker loops.
    private volatile bool running = true;

    // volatile: the sync loop publishes a fresh list by swapping this reference and the
    // promotion loop reads it; the list itself is never mutated after being published.
    private volatile List<TaskDueSoon> dueTasks = new List<TaskDueSoon>();

    private IDisposable subscription;

    /// <summary>Creates a scheduler that reads and updates tasks in <paramref name="store"/>.</summary>
    /// <param name="store">Document store holding the task documents.</param>
    public ScheduledTaskQueue(EmbeddableDocumentStore store)
    {
        this.store = store;
    }

    /// <summary>Index changed: wake the sync loop so the cache is refreshed immediately.</summary>
    public void OnNext(IndexChangeNotification value)
    {
        tasksSignal.Set();
    }

    /// <summary>Subscription failed: wake the sync loop; it keeps re-syncing on the timeout.</summary>
    public void OnError(Exception error)
    {
        tasksSignal.Set();
    }

    /// <summary>Subscription ended: wake the sync loop; it keeps re-syncing on the timeout.</summary>
    public void OnCompleted()
    {
        tasksSignal.Set();
    }

    /// <summary>
    /// Subscribes to changes of <see cref="ScheduledTasksIndex"/> and starts the two
    /// thread-pool loops. Both loops run until <see cref="Dispose"/> is called.
    /// </summary>
    public void Start()
    {
        subscription = store.Changes().ForIndex(new ScheduledTasksIndex().IndexName).Subscribe(this);

        // This thread syncs the local list of scheduled tasks
        ThreadPool.QueueUserWorkItem(delegate
        {
            while (running)
            {
                // Re-sync our local cache. No due-date filter is applied here: every
                // task the query returns is cached and filtered later by the promoter.
                // NOTE(review): RavenDB queries are paged by default, so this probably
                // sees only the first page of scheduled tasks - confirm and page if needed.
                using (var session = store.OpenSession())
                {
                    Interlocked.Increment(ref GlobalStats.DatabaseQueries);
                    var tasks = session.Query<Task, ScheduledTasksIndex>().ToList();
                    dueTasks = tasks.Where(t => t.Status == TaskStatus.ScheduledForLater)
                                    .Select(t => new TaskDueSoon { DueDate = t.DueAfter, TaskId = t.Id })
                                    .ToList();
                }

                tasksSignal.Wait(ResyncInterval);
                tasksSignal.Reset();
            }
        });

        // This thread queues the tasks from the local list once they are due
        ThreadPool.QueueUserWorkItem(delegate
        {
            // Honour Dispose(): the loop condition is the shared running flag.
            while (running)
            {
                Thread.Sleep(PromotionInterval);

                // Take one snapshot of the list reference and of the clock per pass.
                var snapshot = dueTasks;
                var now = DateTime.UtcNow;

                foreach (var item in snapshot.Where(d => d.DueDate < now))
                {
                    using (var session = store.OpenSession())
                    {
                        var task = session.Load<Task>(item.TaskId);

                        // Skip tasks that no longer exist or were already moved on since
                        // the cache was last synced.
                        if (task == null || task.Status != TaskStatus.ScheduledForLater)
                        {
                            continue;
                        }

                        Console.WriteLine("Task: {0} was scheduled for {1}, it is now queued", item.TaskId, item.DueDate.ToLongTimeString());
                        task.Status = TaskStatus.Queued;
                        session.SaveChanges();
                    }
                }
            }
        });
    }

    /// <summary>
    /// Asks both loops to stop and cancels the index subscription.
    /// Safe to call when <see cref="Start"/> was never called, and safe to call twice.
    /// The promotion loop notices the stop request after its current one-second sleep.
    /// </summary>
    public void Dispose()
    {
        running = false;

        // Wake the sync loop so it observes running == false now rather than after the timeout.
        tasksSignal.Set();

        if (subscription != null)
        {
            subscription.Dispose();
            subscription = null;
        }
    }

    /// <summary>Cache entry: the id of a scheduled task and the time after which it is due.</summary>
    public class TaskDueSoon
    {
        /// <summary>Id of the task document.</summary>
        public string TaskId { get; set; }

        /// <summary>Copied from <see cref="Task.DueAfter"/>; compared against UTC now.</summary>
        public DateTime DueDate { get; set; }
    }
}
/// <summary>
/// Index over <see cref="Task"/> documents whose map keeps only tasks with status
/// <see cref="TaskStatus.ScheduledForLater"/>. ScheduledTaskQueue queries it and
/// subscribes to its change notifications.
/// </summary>
public class ScheduledTasksIndex : AbstractIndexCreationTask<Task, ScheduledTasksIndex.Result>
{
public ScheduledTasksIndex()
{
// Only tasks still waiting for their due time are mapped; each entry carries the task id.
Map = tasks => from task in tasks
where task.Status == TaskStatus.ScheduledForLater
select new Result { Id = task.Id };
}
/// <summary>Shape of an entry produced by the map: the id of the matching task.</summary>
public class Result
{
public string Id { get; set; }
}
}
/// <summary>
/// Index over <see cref="Task"/> documents whose map keeps only tasks with status
/// <see cref="TaskStatus.Queued"/>. TaskQueue queries it and subscribes to its
/// change notifications.
/// </summary>
public class QueuedTasksIndex : AbstractIndexCreationTask<Task, QueuedTasksIndex.Result>
{
public QueuedTasksIndex()
{
// Only tasks that are ready to run are mapped; each entry carries the task id.
Map = tasks => from task in tasks
where task.Status == TaskStatus.Queued
select new Result { Id = task.Id };
}
/// <summary>Shape of an entry produced by the map: the id of the matching task.</summary>
public class Result
{
public string Id { get; set; }
}
}
/// <summary>
/// Task document stored in the document store and moved through the
/// <see cref="TaskStatus"/> states by the two queues.
/// </summary>
public class Task
{
// Document id; never set explicitly in this file - presumably assigned by the store on Store().
public string Id { get; set; }
// Current lifecycle state of the task.
public TaskStatus Status { get; set; }
// Earliest time the task may be queued; written and compared using DateTime.UtcNow.
public DateTime DueAfter { get; set; }
}
/// <summary>
/// Lifecycle states of a <see cref="Task"/>. The numeric values are spelled out so that
/// inserting or reordering members cannot silently renumber them.
/// </summary>
public enum TaskStatus
{
    /// <summary>Created, waiting for its due time to pass.</summary>
    ScheduledForLater = 0,

    /// <summary>Due time has passed; ready to be picked up and run.</summary>
    Queued = 1,

    /// <summary>Claimed by the task queue and handed to the runner.</summary>
    Running = 2,

    /// <summary>Finished. Not assigned anywhere in this file.</summary>
    Completed = 3
}
}
@BlackLight-Matt-Smith
Copy link

What happens if an exception occurs, or the connection to the database is lost, during RunTask? How would you avoid the task being forever stuck at TaskStatus.Running and not being picked up again for processing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment