Last active
January 27, 2016 17:48
-
-
Save PaulStovell/4601772 to your computer and use it in GitHub Desktop.
An example using RavenDB's changes API to implement a queue of tasks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
using System.Threading; | |
using Raven.Abstractions.Data; | |
using Raven.Client.Embedded; | |
using Raven.Client.Indexes; | |
namespace OctoQueues | |
{ | |
public class GlobalStats | |
{ | |
public static int DatabaseQueries; | |
public static int TasksCreated; | |
public static int TasksRun; | |
} | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var store = new EmbeddableDocumentStore(); | |
store.Initialize(); | |
IndexCreation.CreateIndexes(Assembly.GetEntryAssembly(), store); | |
var queue = new TaskQueue(store); | |
var scheduled = new ScheduledTaskQueue(store); | |
Console.WriteLine("Press any key to create a task"); | |
queue.Start(); | |
scheduled.Start(); | |
// Write stats to the title | |
ThreadPool.QueueUserWorkItem(delegate | |
{ | |
while (true) | |
{ | |
Thread.Sleep(100); | |
Console.Title = string.Format("Created: {0}; Run: {1}; Queries: {2}", GlobalStats.TasksCreated, GlobalStats.TasksRun, GlobalStats.DatabaseQueries); | |
} | |
}); | |
while (true) | |
{ | |
Console.ReadKey(true); | |
using (var session = store.OpenSession()) | |
{ | |
session.Store(new Task { Status = TaskStatus.ScheduledForLater, DueAfter = DateTime.UtcNow.AddSeconds(10) }); | |
session.SaveChanges(); | |
Interlocked.Increment(ref GlobalStats.TasksCreated); | |
} | |
} | |
} | |
} | |
internal class TaskQueue : IObserver<IndexChangeNotification>, IDisposable | |
{ | |
private readonly EmbeddableDocumentStore store; | |
private bool running = true; | |
private readonly ManualResetEventSlim tasksSignal = new ManualResetEventSlim(false); | |
private IDisposable subscription; | |
public TaskQueue(EmbeddableDocumentStore store) | |
{ | |
this.store = store; | |
} | |
public void OnNext(IndexChangeNotification value) | |
{ | |
tasksSignal.Set(); | |
} | |
public void OnError(Exception error) | |
{ | |
tasksSignal.Set(); | |
} | |
public void OnCompleted() | |
{ | |
tasksSignal.Set(); | |
} | |
public void Start() | |
{ | |
subscription = store.Changes().ForIndex(new QueuedTasksIndex().IndexName).Subscribe(this); | |
ThreadPool.QueueUserWorkItem(delegate | |
{ | |
while (running) | |
{ | |
Task task = null; | |
using (var session = store.OpenSession()) | |
{ | |
Interlocked.Increment(ref GlobalStats.DatabaseQueries); | |
var result = session.Query<Task, QueuedTasksIndex>().FirstOrDefault(); | |
if (result != null && result.Status == TaskStatus.Queued) | |
{ | |
result.Status = TaskStatus.Running; | |
session.SaveChanges(); | |
task = result; | |
} | |
} | |
if (task == null) | |
{ | |
tasksSignal.Wait(TimeSpan.FromSeconds(10)); | |
tasksSignal.Reset(); | |
} | |
else | |
{ | |
RunTask(task); | |
} | |
} | |
}); | |
} | |
private void RunTask(Task task) | |
{ | |
Interlocked.Increment(ref GlobalStats.TasksRun); | |
Console.WriteLine("Running task: " + task.Id); | |
} | |
public void Dispose() | |
{ | |
running = false; | |
subscription.Dispose(); | |
} | |
} | |
internal class ScheduledTaskQueue : IObserver<IndexChangeNotification>, IDisposable | |
{ | |
private readonly EmbeddableDocumentStore store; | |
private bool running = true; | |
private readonly ManualResetEventSlim tasksSignal = new ManualResetEventSlim(false); | |
private IDisposable subscription; | |
private List<TaskDueSoon> dueTasks = new List<TaskDueSoon>(); | |
public ScheduledTaskQueue(EmbeddableDocumentStore store) | |
{ | |
this.store = store; | |
} | |
public void OnNext(IndexChangeNotification value) | |
{ | |
tasksSignal.Set(); | |
} | |
public void OnError(Exception error) | |
{ | |
tasksSignal.Set(); | |
} | |
public void OnCompleted() | |
{ | |
tasksSignal.Set(); | |
} | |
public void Start() | |
{ | |
subscription = store.Changes().ForIndex(new ScheduledTasksIndex().IndexName).Subscribe(this); | |
// This thread syncs a local list of tasks due in the next hour or so | |
ThreadPool.QueueUserWorkItem(delegate | |
{ | |
while (running) | |
{ | |
// Re-sync our local cache of tasks due in the next hour | |
using (var session = store.OpenSession()) | |
{ | |
Interlocked.Increment(ref GlobalStats.DatabaseQueries); | |
var tasks = session.Query<Task, ScheduledTasksIndex>().ToList(); | |
dueTasks = tasks.Where(t => t.Status == TaskStatus.ScheduledForLater) | |
.Select(t => new TaskDueSoon {DueDate = t.DueAfter, TaskId = t.Id}) | |
.ToList(); | |
} | |
tasksSignal.Wait(TimeSpan.FromSeconds(60)); | |
tasksSignal.Reset(); | |
} | |
}); | |
// This thread runs the local list of tasks when they are due | |
ThreadPool.QueueUserWorkItem(delegate | |
{ | |
while (true) | |
{ | |
Thread.Sleep(1000); | |
var list = dueTasks; | |
foreach (var item in list.Where(d => d.DueDate < DateTime.UtcNow)) | |
{ | |
using (var session = store.OpenSession()) | |
{ | |
var task = session.Load<Task>(item.TaskId); | |
if (task.Status == TaskStatus.ScheduledForLater) | |
{ | |
Console.WriteLine(string.Format("Task: {0} was scheduled for {1}, it is now queued", item.TaskId, item.DueDate.ToLongTimeString())); | |
task.Status = TaskStatus.Queued; | |
session.SaveChanges(); | |
} | |
} | |
} | |
} | |
}); | |
} | |
private void RunTask(Task task) | |
{ | |
Interlocked.Increment(ref GlobalStats.TasksRun); | |
Console.WriteLine("Running task: " + task.Id); | |
} | |
public void Dispose() | |
{ | |
running = false; | |
subscription.Dispose(); | |
} | |
public class TaskDueSoon | |
{ | |
public string TaskId { get; set; } | |
public DateTime DueDate { get; set; } | |
} | |
} | |
public class ScheduledTasksIndex : AbstractIndexCreationTask<Task, ScheduledTasksIndex.Result> | |
{ | |
public ScheduledTasksIndex() | |
{ | |
Map = tasks => from task in tasks | |
where task.Status == TaskStatus.ScheduledForLater | |
select new Result { Id = task.Id }; | |
} | |
public class Result | |
{ | |
public string Id { get; set; } | |
} | |
} | |
public class QueuedTasksIndex : AbstractIndexCreationTask<Task, QueuedTasksIndex.Result> | |
{ | |
public QueuedTasksIndex() | |
{ | |
Map = tasks => from task in tasks | |
where task.Status == TaskStatus.Queued | |
select new Result { Id = task.Id }; | |
} | |
public class Result | |
{ | |
public string Id { get; set; } | |
} | |
} | |
public class Task | |
{ | |
public string Id { get; set; } | |
public TaskStatus Status { get; set; } | |
public DateTime DueAfter { get; set; } | |
} | |
public enum TaskStatus | |
{ | |
ScheduledForLater, | |
Queued, | |
Running, | |
Completed | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What happens if an exception occurs, or the connection to the database is lost, during RunTask? How would you avoid the task being forever stuck at TaskStatus.Running and not being picked up again for processing.