Skip to content

Instantly share code, notes, and snippets.

@ayende
Created June 9, 2020 15:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ayende/6351d6989f111f0a4f84b301611525ce to your computer and use it in GitHub Desktop.
Save ayende/6351d6989f111f0a4f84b301611525ce to your computer and use it in GitHub Desktop.
using System;
using System.Linq;
using System.Net.Sockets;
using System.Threading.Tasks;
using Nito.AsyncEx;
using Raven.Client.Documents;
using System.Reactive.Linq;
using System.Threading;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Linq.Indexing;
using Raven.Client.Documents.Operations.CompareExchange;
using Raven.Client.Documents.Session;
using Raven.Client.Documents.Session.Loaders;
using Raven.Client.Documents.Subscriptions;
using Raven.Client.Exceptions;
using Raven.Client.Exceptions.Documents.Subscriptions;
using Raven.Client.Json;
namespace Crypto
{
class Program
{
static async Task Main(string[] args)
{
using var store = new DocumentStore
{
Urls = new[] { "http://live-test.ravendb.net" },
Database = "Demo",
};
var defaultCollectionName = store.Conventions.FindCollectionName;
store.Conventions.FindCollectionName = type =>
typeof(Command).IsAssignableFrom(type) ?
"Commands" : defaultCollectionName(type);
store.Initialize();
await new ReadyCommands().ExecuteAsync(store);
await CreateCommandsSubscription(store);
// or
await CreateSubscription<SendNewEmployeeMailCommand>(store);
await CreateSubscription<SendBirthdayCardForEmployeeCommand>(store);
//var _ = SendNewEmployeeEmail(store).ContinueWith(PrintStatus);
var tasks = new[]
{
CompetingProcessors_ClusterVersion(store, "1"),
CompetingProcessors_ClusterVersion(store, "2"),
CompetingProcessors_ClusterVersion(store, "3")
};
await RegisterNewEmployee(store, "Ayende", new DateTime(1980,1,2));
await RegisterNewEmployee(store, "Oren", new DateTime(1980, 1, 2));
Console.WriteLine("Done...");
Console.ReadLine();
}
private static void PrintStatus(Task task)
{
switch (task.Status)
{
case TaskStatus.RanToCompletion:
Console.WriteLine("Done successfully");
break;
case TaskStatus.Canceled:
Console.WriteLine("Task cancelled");
break;
case TaskStatus.Faulted:
Console.WriteLine("Error!");
foreach (var exception in task.Exception.InnerExceptions)
{
var e = exception;
while (e != null)
{
Console.WriteLine(e.Message);
e = e.InnerException;
}
}
break;
default:
Console.WriteLine("No idea: " + task.Status);
break;
}
}
public static async Task CompetingProcessors(IDocumentStore store, string id)
{
var hasMsgs = new AsyncManualResetEvent(true);
store.Changes()
.ForIndex(new ReadyCommands().IndexName)
.Subscribe(x => hasMsgs.Set());
do
{
using (var session = store.OpenAsyncSession())
{
var result = await session.Query<Command, ReadyCommands>()
.OrderBy("Date")
.Include(include => include.IncludeDocuments<Locker>(x => x.Id))
.FirstOrDefaultAsync();
if (result == null)
{
await hasMsgs.WaitAsync(/*timeout*/);
continue;
}
var locker = await session.LoadAsync<Locker>("lockers/" + result.Id);
if (locker != null)
{
// index didn't catch up? we'll retry
continue;
}
locker = new Locker
{
ClientId = id
};
await session.StoreAsync(locker, "lockers/" + result.Id);
session.Advanced.GetMetadataFor(locker)["@expires"] = DateTime.UtcNow.AddMinutes(2);
session.Advanced.UseOptimisticConcurrency = true;
try
{
await session.SaveChangesAsync();
}
catch (ConcurrencyException)
{
// expected, someone else got the lock, will retry
continue;
}
await HandleCommand(session, result, id);
session.Delete(result.Id);
session.Delete(locker);
try
{
await session.SaveChangesAsync();
}
catch (ConcurrencyException)
{
// we timed out on the lock and it expired, probably
continue;
}
}
} while (true);
}
public static async Task CompetingProcessors_ClusterVersion(IDocumentStore store, string id)
{
var hasMsgs = new AsyncManualResetEvent(true);
store.Changes()
.ForDocumentsInCollection<Command>()
.Subscribe(x => hasMsgs.Set());
do
{
using (var session = store.OpenAsyncSession(new SessionOptions
{
TransactionMode = TransactionMode.ClusterWide
}))
{
var result = await session.Query<Command, ReadyCommands>()
.OrderBy("random()")
.Include(include => include.IncludeCompareExchangeValue(x => x.Id))
.FirstOrDefaultAsync();
if (result == null)
{
await hasMsgs.WaitAsync();
continue;
}
var locker = await session.Advanced.ClusterTransaction.GetCompareExchangeValueAsync<Locker>(result.Id);
if (locker != null)
{
// index didn't catch up? we'll retry
continue;
}
locker = session.Advanced.ClusterTransaction.CreateCompareExchangeValue(result.Id, new Locker
{
ClientId = id
});
locker.Metadata["@expires"] = DateTime.UtcNow.AddMinutes(2);
try
{
await session.SaveChangesAsync();
}
catch (ConcurrencyException)
{
// expected, someone else got the lock, will retry
continue;
}
await HandleCommand(session, result, id);
session.Delete(result.Id);
session.Advanced.ClusterTransaction.DeleteCompareExchangeValue(locker);
try
{
await session.SaveChangesAsync();
}
catch (ConcurrencyException)
{
// we timed out on the lock and it expired, probably
continue;
}
}
} while (true);
}
private static async Task HandleCommand(IAsyncDocumentSession session, Command cmd, string id)
{
Console.WriteLine($"Processor #{id} Handling command: {cmd.Id}");
}
public class Locker
{
public string ClientId;
}
public class ReadyCommands : AbstractIndexCreationTask<Command>
{
public ReadyCommands()
{
Map = cmds =>
from cmd in cmds
let locker = LoadDocument<Locker>("lockers/" + cmd.Id)
where locker == null
select new
{
Date = MetadataFor(cmd)["@last-modified"]
};
}
}
public static Task SendNewEmployeeEmail(IDocumentStore store)
{
var worker = store.Subscriptions.GetSubscriptionWorker<SendNewEmployeeMailCommand>(
new SubscriptionWorkerOptions("SendNewEmployeeMailCommand_Subscription")
{
Strategy = SubscriptionOpeningStrategy.WaitForFree
});
return worker.Run(async batch =>
{
using var session = batch.OpenAsyncSession();
Console.WriteLine($"Starting batch with {batch.NumberOfItemsInBatch} items");
foreach (var item in batch.Items)
{
SendNewEmployeeMailCommand cmd = item.Result;
var emp = await session.LoadAsync<Employee>(cmd.Employee);
try
{
SendEmail(emp);
session.Delete(cmd);
break;
}
catch (Exception e)
{
cmd.Error = e.ToString();
}
}
await session.SaveChangesAsync();
});
}
private static void SendEmail(Employee emp)
{
if (emp.Name.Length > 4)
throw new ArgumentException("Max emp name is 4");
Console.WriteLine($"{emp.Name}, welcome! ({emp.Id})");
}
public static async Task CreateCommandsSubscription(IDocumentStore store)
{
var taskName = "Commands_Subscription";
try
{
await store.Subscriptions.GetSubscriptionStateAsync(taskName);
return;
}
catch (SubscriptionDoesNotExistException)
{
// expected
}
await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions<Command>
{
Name = taskName,
Filter = c => c.Error == null
});
}
public static async Task CreateSubscription<T>(IDocumentStore store)
where T : Command
{
var name = typeof(T).Name;
var taskName = $"{name}_Subscription";
try
{
await store.Subscriptions.GetSubscriptionStateAsync(taskName);
return;
}
catch (SubscriptionDoesNotExistException)
{
// expected
}
await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions
{
Name = taskName,
Query = $@"
declare function filter(c){{
return c.Type == '{name}' &&
c.Error == null &&
!c['@metadata'].hasOwnProperty('@refresh');
}}
from Commands as c where filter(c)
"
});
}
public static async Task RegisterNewEmployee(IDocumentStore store, string name, DateTime birthday)
{
var nextBirthday = new DateTime(DateTime.Today.Year, birthday.Month, birthday.Day);
if (nextBirthday < DateTime.Today)
nextBirthday = nextBirthday.AddYears(1);
using var session = store.OpenAsyncSession();
var newEmployee = new Employee(name, birthday);
await session.StoreAsync(newEmployee);
await session.StoreAsync(new SendNewEmployeeMailCommand(newEmployee.Id));
var birthdayCmd = new SendBirthdayCardForEmployeeCommand
{
Employee = newEmployee.Id,
};
await session.StoreAsync(birthdayCmd);
session.Advanced.GetMetadataFor(birthdayCmd)["@refresh"] = nextBirthday;
await session.SaveChangesAsync(); // single transaction
}
}
public class Employee
{
public DateTime Birthday;
public string Name;
public string Id;
public Employee()
{
}
public Employee(string name, DateTime birthday)
{
Birthday = birthday;
Name = name;
}
}
public class Command
{
public string Id;
public string Error;
public string Type => GetType().Name;
}
public class BatchCommand : Command
{
public Command[] Commands;
}
public class SendBirthdayCardForEmployeeCommand : Command
{
public string Employee;
}
public class SendNewEmployeeMailCommand : Command
{
public string Employee;
public SendNewEmployeeMailCommand()
{
}
public SendNewEmployeeMailCommand(string employee)
{
Employee = employee;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment