Skip to content

Instantly share code, notes, and snippets.

@plentysmart
Created November 8, 2014 14:23
Show Gist options
  • Save plentysmart/71254e57acea24e23ee6 to your computer and use it in GitHub Desktop.
Save plentysmart/71254e57acea24e23ee6 to your computer and use it in GitHub Desktop.
DocumentDB - Optimistic Concurrency
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Newtonsoft.Json;
using Xunit;
namespace DocumentDB.Concurrency
{
/// <summary>
/// Integration tests demonstrating optimistic concurrency (ETag / If-Match) against a
/// DocumentDB account. Every test starts from a clean slate: the constructor removes the
/// test database if it exists, and each test re-creates what it needs.
/// </summary>
/// <remarks>
/// The tests stay synchronous (<c>.Result</c> / <c>.Wait()</c>) so the public test
/// signatures are unchanged; failures from the client therefore surface wrapped in an
/// <see cref="AggregateException"/>.
/// </remarks>
public class DocumentDbConcurrencyTest : IDisposable
{
    // NOTE(review): an account key is committed here in clear text. It is kept so the
    // tests keep running as before, but it should be rotated and loaded from
    // configuration or an environment variable instead of living in source control.
    private readonly string endpointUrl = "https://doctest.documents.azure.com:443/";
    private readonly string authorizationKey =
        "+6O/+LV/17F+FkkQy4zsUNqHhaKlklhN3nNLweD87bX6/wT6jw3xe66NDXaOePCf8U0GlrVR+qpSLzSNcDRU+w==";
    private readonly string databaseId = "test-db";
    private readonly string collectionId = "test-collection";
    private readonly DocumentClient client;

    /// <summary>
    /// Creates the client and drops any database left over from a previous test.
    /// </summary>
    public DocumentDbConcurrencyTest()
    {
        this.client = new DocumentClient(new Uri(endpointUrl), authorizationKey);

        // Only delete when something exists (no create-then-delete round trip), and block
        // until the delete has finished so it cannot race with the test's own setup.
        var existing = FindDatabase();
        if (existing != null)
        {
            client.DeleteDatabaseAsync(existing.SelfLink).Wait();
        }
    }

    /// <summary>
    /// Releases the client; xUnit calls this after each test.
    /// </summary>
    public void Dispose()
    {
        client.Dispose();
    }

    /// <summary>
    /// Without an access condition the second write silently overwrites the first,
    /// so one of the two increments is lost.
    /// </summary>
    [Fact]
    public void UpdateDocument_ConcurrentWritesNoPrecondtion_ShouldReplaceDocument()
    {
        // Create document
        var selfLink = CreateCounterDocument().SelfLink;

        // Read document twice
        Document firstRead = ReadDocument(selfLink);
        Document secondRead = ReadDocument(selfLink);
        Item firstCopy = (dynamic)firstRead;
        Item secondCopy = (dynamic)secondRead;

        // Update both
        firstCopy.Counter += 1;
        secondCopy.Counter += 1;

        // Save both; the second save is based on an ETag that is no longer current
        client.ReplaceDocumentAsync(firstRead.SelfLink, firstCopy).Wait();
        client.ReplaceDocumentAsync(secondRead.SelfLink, secondCopy).Wait();

        // No exception thrown, document replaced
        Item updatedItem = (dynamic)ReadDocument(selfLink);
        Assert.Equal(1, updatedItem.Counter); // Counter has been incremented only once
    }

    /// <summary>
    /// With an If-Match access condition the write based on a stale ETag is rejected
    /// with HTTP 412 (Precondition Failed).
    /// </summary>
    [Fact]
    public void UpdateDocument_ConcurrentWrites_ShouldThrowException()
    {
        var selfLink = CreateCounterDocument().SelfLink;

        Document firstRead = ReadDocument(selfLink);
        Item firstCopy = (dynamic)firstRead;
        Document secondRead = ReadDocument(selfLink);
        Item secondCopy = (dynamic)secondRead;

        firstCopy.Counter += 1;
        secondCopy.Counter += 1;

        // Both writes are conditioned on the ETag seen by the first read.
        var requestOptions = IfMatch(firstRead.ETag);

        // The first write matches the stored ETag and succeeds (changing the ETag).
        client.ReplaceDocumentAsync(firstRead.SelfLink, firstCopy, requestOptions).Wait();

        // The second write carries the now-stale ETag and must be rejected.
        var failure = Assert.Throws<AggregateException>(
            () => client.ReplaceDocumentAsync(secondRead.SelfLink, secondCopy, requestOptions).Wait());

        var clientException = failure.InnerException as DocumentClientException;
        Assert.NotNull(clientException);
        Assert.Equal<HttpStatusCode?>(HttpStatusCode.PreconditionFailed, clientException.StatusCode);
    }

    /// <summary>
    /// Several writers increment the same counter concurrently, each retrying in a
    /// compare-and-swap loop; no increment may be lost.
    /// </summary>
    [Fact]
    public void UpdateDocument_ConcurrentWritesInCasLoop_ShouldUpdateDocument()
    {
        var selfLink = CreateCounterDocument().SelfLink;

        const int incrementCounter = 5;
        var taskList = new List<Task>();
        for (int i = 0; i < incrementCounter; i++)
        {
            taskList.Add(Task.Run(() => IncrementCounterWithRetry(selfLink)));
        }

        // Rethrows (as AggregateException) if any writer failed for a reason other
        // than a precondition conflict.
        Task.WaitAll(taskList.ToArray());

        // Every writer incremented exactly once, so the counter equals the writer count.
        Item finalItem = (dynamic)ReadDocument(selfLink);
        Assert.Equal(incrementCounter, finalItem.Counter);
    }

    /// <summary>
    /// Payload stored in the collection: an id plus the counter the tests increment.
    /// </summary>
    public class Item
    {
        [JsonProperty(PropertyName = "id")]
        public string Id { get; set; }
        public int Counter { get; set; }
    }

    /// <summary>
    /// Reads the document, increments its counter and writes it back conditioned on the
    /// ETag that was read; repeats until the conditional write succeeds.
    /// </summary>
    /// <param name="selfLink">Self link of the document to update.</param>
    private void IncrementCounterWithRetry(string selfLink)
    {
        while (true)
        {
            Document current = ReadDocument(selfLink);
            Item item = (dynamic)current;
            item.Counter += 1;

            try
            {
                client.ReplaceDocumentAsync(current.SelfLink, item, IfMatch(current.ETag)).Wait();
                Debug.WriteLine("Counter: {0}, Success", item.Counter);
                return;
            }
            catch (AggregateException ex)
            {
                var clientException = ex.InnerException as DocumentClientException;

                // Only a precondition conflict means "someone else won, try again".
                // Anything else (including non-client failures) must not be swallowed,
                // otherwise this loop would spin forever.
                if (clientException == null ||
                    clientException.StatusCode != HttpStatusCode.PreconditionFailed)
                {
                    throw;
                }

                Debug.WriteLine("Counter: {0}, ERROR:Document modified", item.Counter);
            }
        }
    }

    /// <summary>
    /// Ensures the database and collection exist and creates the counter document
    /// (id "1-id", Counter 0) that every test works on.
    /// </summary>
    /// <returns>The created document as returned by the service.</returns>
    private Document CreateCounterDocument()
    {
        var db = ReadOrCreateDatabase();
        var collection = ReadOrCreateCollection(db);
        var item = new Item() {Id = "1-id", Counter = 0};
        return client.CreateDocumentAsync(collection.DocumentsLink, item).Result.Resource;
    }

    /// <summary>
    /// Reads the current version of a document.
    /// </summary>
    /// <param name="selfLink">Self link of the document to read.</param>
    private Document ReadDocument(string selfLink)
    {
        return client.ReadDocumentAsync(selfLink).Result.Resource;
    }

    /// <summary>
    /// Builds request options that make a write conditional on the given ETag.
    /// </summary>
    /// <param name="etag">ETag the write is conditioned on.</param>
    private static RequestOptions IfMatch(string etag)
    {
        return new RequestOptions()
        {
            AccessCondition = new AccessCondition()
            {
                Type = AccessConditionType.IfMatch,
                Condition = etag
            }
        };
    }

    /// <summary>
    /// Returns the test collection inside <paramref name="db"/>, creating it if missing.
    /// </summary>
    private DocumentCollection ReadOrCreateCollection(Database db)
    {
        var col = client.CreateDocumentCollectionQuery(db.SelfLink)
            .Where(c => c.Id == collectionId)
            .AsEnumerable()
            .FirstOrDefault();
        if (col == null)
        {
            col = client.CreateDocumentCollectionAsync(db.SelfLink, new DocumentCollection {Id = collectionId})
                .Result
                .Resource;
        }
        return col;
    }

    /// <summary>
    /// Returns the test database, creating it if missing.
    /// </summary>
    private Database ReadOrCreateDatabase()
    {
        var db = FindDatabase();
        if (db == null)
        {
            db = client.CreateDatabaseAsync(new Database {Id = databaseId}).Result.Resource;
        }
        return db;
    }

    /// <summary>
    /// Looks up the test database by id.
    /// </summary>
    /// <returns>The database, or <c>null</c> when the query returns no match.</returns>
    private Database FindDatabase()
    {
        return client.CreateDatabaseQuery()
            .Where(d => d.Id == databaseId)
            .AsEnumerable()
            .FirstOrDefault();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment