Skip to content

Instantly share code, notes, and snippets.

@slabarque
Created November 18, 2013 14:31
Show Gist options
  • Save slabarque/7528713 to your computer and use it in GitHub Desktop.
Save slabarque/7528713 to your computer and use it in GitHub Desktop.
A small application that creates a number of documents in Raven and performs concurrent updates on them while also publishing a message to MSMQ. When playing with the parameters noParallelTests and noParallelTxUpdates you can see that sometimes not all updates were performed, even when DTC could successfully commit.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Raven.Abstractions.Exceptions;
using Raven.Client;
using Raven.Client.Document;
namespace TestRavenConcurrentDTC
{
internal class Program
{
// Shared RavenDB document store; assigned by InitializeDocumentStore, which Main calls before any test runs.
private static DocumentStore store;
// Monitor used by WriteDebug so that colour changes and writes from parallel workers do not interleave.
// readonly: a lock target must never be swapped for another instance while threads are synchronising on it.
private static readonly object lockObj = new object();
// When true, the retry loop in IncrementMessageCounterAndPublish sleeps with a linearly growing delay.
private static readonly bool delayRetries = true;
/// <summary>
/// Entry point: initialises the document store, runs the parallel tests, prints the elapsed
/// time of the test run in milliseconds and then waits for a key press.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
private static void Main(string[] args)
{
    // Play with these two parameters.
    const int documentCount = 3;        // number of documents that will be created and updated
    const int updatesPerDocument = 200; // number of publishes to MSMQ / updates to a document

    InitializeDocumentStore();

    // Only the test run is timed; store initialisation happens before the clock starts.
    Stopwatch timer = Stopwatch.StartNew();
    RunParallelTests(documentCount, updatesPerDocument);
    timer.Stop();

    WriteDebug(string.Format("Total test time: {0}", timer.ElapsedMilliseconds));
    Console.ReadKey();
}
/// <summary>
/// Runs <c>RunTest</c> once per requested test, in parallel.
/// </summary>
/// <param name="noParallelTests">Number of tests (one document each) to run.</param>
/// <param name="noParallelTxUpdates">Number of transactional updates passed to every test.</param>
private static void RunParallelTests(int noParallelTests, int noParallelTxUpdates)
{
    // The loop index is irrelevant: every iteration runs the same test with the same update count.
    Action<int> runSingleTest = testIndex => RunTest(noParallelTxUpdates);
    Parallel.For(0, noParallelTests, runSingleTest);
}
/// <summary>
/// Creates the shared document store, points it at the configured server and database,
/// initialises it and logs the server URL.
/// </summary>
private static void InitializeDocumentStore()
{
    var documentStore = new DocumentStore
    {
        Url = "http://dev-grg-rvnc1:8080",
        DefaultDatabase = "CustomerService"
    };
    store = documentStore;
    store.Initialize();
    WriteDebug(String.Format("Initialized documentstore {0}", store.Url));
}
/// <summary>
/// Runs one complete test: creates a counter document and a queue named after it, performs
/// the parallel transactional updates, then reports whether the queue and the counter agree.
/// </summary>
/// <param name="noParallelTxUpdates">Number of parallel update/publish operations to perform.</param>
private static void RunTest(int noParallelTxUpdates)
{
    string docId = CreateTestDocument();
    string queueName = String.Format("Messages{0}", docId);

    // The queue is disposed (and with it deleted) as soon as the evaluation is done.
    using (var msmQueue = new Queue(queueName))
    {
        RunParallelTransactionalUpdates(docId, msmQueue, noParallelTxUpdates);

        bool countsMatch = EvaluatePublishedMessages(docId, msmQueue);
        string verdict = countsMatch ? "Everything ok!" : "Failed!";

        // error == false prints green, error == true prints red.
        WriteDebug(String.Format("{0}: {1}", docId, verdict), !countsMatch);
    }
}
/// <summary>
/// Stores a new <c>MessageCounter</c> document whose id is a freshly generated GUID.
/// </summary>
/// <returns>The id of the stored document.</returns>
private static string CreateTestDocument()
{
    string docId = Guid.NewGuid().ToString();
    var counter = new MessageCounter { Id = docId };

    using (IDocumentSession session = store.OpenSession())
    {
        session.Store(counter);
        session.SaveChanges();
    }

    return docId;
}
/// <summary>
/// Performs the requested number of increment-and-publish operations in parallel, all against
/// the same document and the same queue.
/// </summary>
/// <param name="docId">Id of the counter document to update.</param>
/// <param name="msmQueue">Queue that receives one message per successful update.</param>
/// <param name="noParallelTxUpdates">Number of operations to run.</param>
private static void RunParallelTransactionalUpdates(string docId, Queue msmQueue, int noParallelTxUpdates)
{
    // The loop index is not used; every iteration performs the same operation.
    Action<int> updateOnce = updateIndex => IncrementMessageCounterAndPublish(docId, msmQueue);
    Parallel.For(0, noParallelTxUpdates, updateOnce);
}
/// <summary>
/// Compares the number of messages currently in the queue with the counter stored in the document
/// and logs both values.
/// </summary>
/// <param name="docId">Id of the counter document to load.</param>
/// <param name="queue">Queue whose messages are counted.</param>
/// <returns><c>true</c> when both counts are equal; otherwise <c>false</c>.</returns>
private static bool EvaluatePublishedMessages(string docId, Queue queue)
{
    using (IDocumentSession session = store.OpenSession())
    {
        // NOTE(review): presumably this makes the load wait for, rather than read around, documents
        // still involved in a pending DTC transaction - confirm against the RavenDB documentation.
        session.Advanced.AllowNonAuthoritativeInformation = false;
        var messageCounter = session.Load<MessageCounter>(docId);

        // Read the queue length exactly once: every read of MessageCount fetches all messages, and
        // the value that is logged must be the same value that is compared.
        int queuedMessages = queue.MessageCount;

        WriteDebug(string.Format("{0} Evaluating messages stored in MSMQ ({1}) VS MessageCounter ({2})", docId, queuedMessages, messageCounter.PublishedMessages));
        return queuedMessages == messageCounter.PublishedMessages;
    }
}
/// <summary>
/// Repeats the transactional increment-and-publish operation until one attempt succeeds.
/// </summary>
/// <remarks>
/// When <c>delayRetries</c> is set, the thread sleeps after each failed attempt, starting at
/// 10 ms and growing by 10 ms per failure. No delay is taken after the successful attempt.
/// The loop has no upper bound on the number of attempts.
/// </remarks>
/// <param name="docId">Id of the counter document to update.</param>
/// <param name="queue">Queue the message is published to.</param>
private static void IncrementMessageCounterAndPublish(string docId, Queue queue)
{
    const int sleepIncrement = 10;
    int sleep = 10;

    while (!TryIncrementCounterAndPublish(docId, queue))
    {
        // Back off only when a retry is actually needed.
        if (delayRetries)
        {
            Thread.Sleep(sleep);
            sleep += sleepIncrement;
        }
    }
}
/// <summary>
/// Makes one attempt to increment the counter document and publish a message inside a single
/// <see cref="TransactionScope"/>.
/// </summary>
/// <param name="docId">Id of the counter document to update.</param>
/// <param name="queue">Queue the message is published to.</param>
/// <returns>
/// <c>true</c> when the update, the publish and the disposal of the completed scope all finished
/// without an exception; <c>false</c> when any exception was raised.
/// </returns>
private static bool TryIncrementCounterAndPublish(string docId, Queue queue)
{
    try
    {
        using (var trans = new TransactionScope())
        {
            UpdateDocument(docId);
            PublishMessage(queue);
            trans.Complete();
            // The scope is disposed on the way out of this block; an exception raised there
            // is caught below and turns the result into false.
            return true;
        }
    }
    catch (ConcurrencyException)
    {
        // Expected when several workers update the same document; the caller simply retries.
        return false;
    }
    catch (Exception e)
    {
        // The caller retries indefinitely, so leave a trace of unexpected failures instead of
        // hiding them completely. Debug output only, to keep the console readable.
        Debug.WriteLine(string.Format("Updating doc {0} caused an {1}: {2}", docId, e.GetType(), e.Message));
        return false;
    }
}
/// <summary>
/// Publishes the fixed test message to the given queue.
/// </summary>
/// <param name="queue">Queue the message is published to.</param>
private static void PublishMessage(Queue queue)
{
    // The text has no placeholders, so it is passed directly rather than through String.Format.
    queue.Publish("Message!!!");
}
/// <summary>
/// Loads the counter document with the given id in a new session, increments its
/// <c>PublishedMessages</c> value by one and saves the change.
/// </summary>
/// <param name="id">Id of the counter document to update.</param>
private static void UpdateDocument(string id)
{
    using (IDocumentSession session = store.OpenSession())
    {
        // Optimistic concurrency is switched on for this session; the caller handles the
        // ConcurrencyException that a conflicting save is expected to raise.
        session.Advanced.UseOptimisticConcurrency = true;

        MessageCounter counter = session.Load<MessageCounter>(id);
        counter.PublishedMessages += 1;

        session.SaveChanges();
    }
}
/// <summary>
/// Writes a message to the console and to the debug output while holding the shared lock.
/// </summary>
/// <param name="message">Text to write.</param>
/// <param name="error">
/// <c>true</c> prints the console line in red, <c>false</c> in green, and <c>null</c> (the
/// default) leaves the console colour untouched.
/// </param>
private static void WriteDebug(string message, bool? error = null)
{
    lock (lockObj)
    {
        if (error.HasValue)
        {
            Console.ForegroundColor = error.Value ? ConsoleColor.Red : ConsoleColor.Green;
        }

        Console.WriteLine(message);
        // Restore the default colour so the next message starts from a clean state.
        Console.ResetColor();
        Debug.WriteLine(message);
    }
}
}
/// <summary>
/// Document stored in Raven that counts how many messages were published for one test.
/// </summary>
public class MessageCounter
{
/// <summary>Document id; the test program assigns a GUID string when it creates the document.</summary>
public string Id { get; set; }
/// <summary>
/// Number of recorded publishes. The test program increments it by one per update and
/// compares it with the number of messages found in the queue.
/// </summary>
public int PublishedMessages { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Transactions;
namespace TestRavenConcurrentDTC
{
public class Queue : IDisposable
{
// Underlying MSMQ queue that is opened or created by the constructor.
private readonly MessageQueue queue;

/// <summary>Path of the private queue, built from the configured prefix and the queue name.</summary>
public string Path { get; private set; }

/// <summary>
/// Opens the private queue with the given name, creating it when it does not exist yet.
/// </summary>
/// <param name="queueName">Name appended to the configured queue prefix to form the queue path.</param>
public Queue(string queueName)
{
    Path = String.Format(@".\PRIVATE$\{0}_{1}", Settings.Default.QueuePrefix, queueName);

    if (MessageQueue.Exists(Path))
    {
        queue = new MessageQueue(Path);
    }
    else
    {
        // The second argument asks MSMQ to create the queue as a transactional queue.
        queue = MessageQueue.Create(Path, true);
    }
}
#region IDisposable
// Serialises concurrent calls to Dispose.
private readonly object disposedLock = new object();
// Set once disposal has started; volatile because it is also checked outside the lock.
private volatile bool disposed;

/// <summary>
/// Deletes the underlying MSMQ queue and releases its handle. Calls after the first one do nothing.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

// The finalizer path only marks the instance as disposed; it neither deletes nor disposes the queue.
~Queue()
{
    Dispose(false);
}

/// <summary>
/// Performs the actual clean-up.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>: the queue is deleted and its handle
/// released. <c>false</c> when called from the finalizer: nothing is released.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (disposed)
    {
        return;
    }

    lock (disposedLock)
    {
        if (disposed)
        {
            return;
        }

        // Flag first, so a failing delete below cannot lead to a second clean-up attempt
        // against a handle that has already been released.
        disposed = true;

        if (!disposing || queue == null)
        {
            return;
        }

        try
        {
            MessageQueue.Delete(queue.Path);
        }
        finally
        {
            // Release the handle even when deleting the queue throws.
            queue.Dispose();
        }
    }
}
#endregion
/// <summary>
/// Sends a message with the given text, formatted with <see cref="XmlMessageFormatter"/>, to the queue.
/// </summary>
/// <param name="text">Body of the message.</param>
public void Publish(string text)
{
    // With an ambient transaction present the send uses the Automatic transaction type;
    // without one it uses Single.
    MessageQueueTransactionType transactionType = Transaction.Current != null
        ? MessageQueueTransactionType.Automatic
        : MessageQueueTransactionType.Single;

    // Message is disposable; release it as soon as the send call has returned.
    using (var message = new Message(text, new XmlMessageFormatter()))
    {
        queue.Send(message, transactionType);
    }
}
/// <summary>
/// Number of messages currently in the queue. Every read retrieves all messages via
/// <c>GetAllMessages</c>, so callers should read it once and keep the value.
/// </summary>
public int MessageCount
{
    get
    {
        // GetAllMessages returns an array, so its Length is used directly instead of LINQ Count().
        return queue.GetAllMessages().Length;
    }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment