Skip to content

Instantly share code, notes, and snippets.

@sharwell
Created December 3, 2013 23:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharwell/7779874 to your computer and use it in GitHub Desktop.
Save sharwell/7779874 to your computer and use it in GitHub Desktop.
Cloud Queues example using multiple programming strategies
namespace OpenStackNet.Testing.Unit.Providers.Rackspace
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using net.openstack.Core.Domain.Queues;
using net.openstack.Core.Providers;
using net.openstack.Core.Synchronous;
using Newtonsoft.Json;
using Path = System.IO.Path;
/// <summary>
/// This class demonstrates the creation, claiming, renewal, and removal of a message in Cloud Queues
/// for projects targeting the .NET Framework 4.0.
/// </summary>
public class SampleQueues
{
/// <summary>
/// The prefix to use for names of queues created during integration testing.
/// </summary>
public static readonly string TestQueuePrefix = "UnitTestQueue-";
/// <summary>
/// Runs the queue claim scenario using <c>async</c>/<c>await</c>: creates a queue, posts one message,
/// claims it, refreshes and renews the claim, disposes the claim, and finally deletes the queue.
/// </summary>
/// <param name="provider">The queueing service the scenario is executed against.</param>
/// <param name="cancellationToken">
/// The token passed to every asynchronous operation in the scenario, including the two-second delay.
/// </param>
/// <returns>A task that completes when the whole scenario has finished.</returns>
public async Task TestQueueClaimsAsyncAwait(IQueueingService provider, CancellationToken cancellationToken)
{
    QueueName queueName = CreateRandomQueueName();
    await provider.CreateQueueAsync(queueName, cancellationToken);
    await provider.PostMessagesAsync(queueName, cancellationToken, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes")));

    QueueStatistics statistics;
    using (Claim claim = await provider.ClaimMessageAsync(queueName, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1), cancellationToken))
    {
        // The claim was requested with a five-minute TTL and must contain the single posted message.
        Assert.AreEqual(TimeSpan.FromMinutes(5), claim.TimeToLive);
        Assert.IsNotNull(claim.Messages);
        Assert.AreEqual(1, claim.Messages.Count);

        statistics = await provider.GetQueueStatisticsAsync(queueName, cancellationToken);
        Assert.AreEqual(1, statistics.MessageStatistics.Claimed);

        QueuedMessage message = await provider.GetMessageAsync(queueName, claim.Messages[0].Id, cancellationToken);
        Assert.IsNotNull(message);

        // Wait so that a refreshed claim reports a measurably larger age. The delay observes the
        // cancellation token like every other awaited operation in this method.
        TimeSpan age = claim.Age;
        await TaskEx.Delay(TimeSpan.FromSeconds(2), cancellationToken);
        await claim.RefreshAsync(cancellationToken);
        Assert.IsTrue(claim.Age >= age + TimeSpan.FromSeconds(2));

        await claim.RenewAsync(TimeSpan.FromMinutes(10), cancellationToken);
        Assert.AreEqual(TimeSpan.FromMinutes(10), claim.TimeToLive);
    }

    // After the claim has been disposed, the queue is expected to report no claimed messages.
    statistics = await provider.GetQueueStatisticsAsync(queueName, cancellationToken);
    Assert.AreEqual(0, statistics.MessageStatistics.Claimed);

    await provider.DeleteQueueAsync(queueName, cancellationToken);
}
/// <summary>
/// Runs the queue claim scenario as a chain of task continuations instead of <c>async</c>/<c>await</c>.
/// </summary>
/// <remarks>
/// Every stage is attached to the previous one with <c>ContinueWith</c>; stages that start another
/// asynchronous operation return its task and are flattened with <c>Unwrap</c>. Intermediate results
/// are copied into captured locals (<c>claim</c>, <c>statistics</c>, <c>message</c>, <c>age</c>) so that
/// later stages can read them.
/// </remarks>
/// <param name="provider">The queueing service the scenario is executed against.</param>
/// <param name="cancellationToken">The token passed to each asynchronous provider and claim operation.</param>
/// <returns>The last task of the continuation chain (the queue deletion stage).</returns>
public Task TestQueueClaimsAsync(IQueueingService provider, CancellationToken cancellationToken)
{
QueueName queueName = CreateRandomQueueName();
// Stage 1: create the queue. Every later stage is chained onto 'result'.
Task result = provider.CreateQueueAsync(queueName, cancellationToken);
// Stage 2: post a single message.
// NOTE(review): PropagateExceptions is defined outside this file; it presumably rethrows a failure
// of the antecedent task so that the rest of the stage is skipped - confirm against its definition.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
return provider.PostMessagesAsync(queueName, cancellationToken, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes")));
}).Unwrap();
// Captured locals that carry results from one stage to the next.
QueueStatistics statistics = null;
Claim claim = null;
// Stage 3: claim the message (TTL of 5 minutes, grace period of 1 minute) and store the claim.
// Reading task.Result in the trailing continuation rethrows if the claim operation failed.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
return provider.ClaimMessageAsync(queueName, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1), cancellationToken);
}).Unwrap()
.ContinueWith(
task =>
{
claim = task.Result;
}, TaskContinuationOptions.ExecuteSynchronously);
// Stage 4: verify the claim, then fetch the queue statistics and store them.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
Assert.AreEqual(TimeSpan.FromMinutes(5), claim.TimeToLive);
Assert.IsNotNull(claim.Messages);
Assert.AreEqual(1, claim.Messages.Count);
return provider.GetQueueStatisticsAsync(queueName, cancellationToken);
}).Unwrap()
.ContinueWith(
task =>
{
statistics = task.Result;
}, TaskContinuationOptions.ExecuteSynchronously);
QueuedMessage message = null;
// Stage 5: verify exactly one message is claimed, then fetch that message by ID and store it.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
Assert.AreEqual(1, statistics.MessageStatistics.Claimed);
return provider.GetMessageAsync(queueName, claim.Messages[0].Id, cancellationToken);
}).Unwrap()
.ContinueWith(
task =>
{
message = task.Result;
}, TaskContinuationOptions.ExecuteSynchronously);
TimeSpan age = default(TimeSpan);
// Stage 6: record the claim's current age and block the continuation's thread for two seconds
// so the refreshed claim in the next stages reports a larger age.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
Assert.IsNotNull(message);
age = claim.Age;
Thread.Sleep(TimeSpan.FromSeconds(2));
});
// Stage 7: refresh the claim.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
return claim.RefreshAsync(cancellationToken);
}).Unwrap();
// Stage 8: verify the age grew by at least the two seconds slept, then renew with a 10-minute TTL.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
Assert.IsTrue(claim.Age >= age + TimeSpan.FromSeconds(2));
return claim.RenewAsync(TimeSpan.FromMinutes(10), cancellationToken);
}).Unwrap();
// Stage 9: verify the renewed TTL, then dispose the claim asynchronously.
// NOTE(review): DisposeAsync is only called from this stage; if an earlier stage fails and
// PropagateExceptions rethrows as assumed, the claim is not disposed by this chain - confirm.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
Assert.AreEqual(TimeSpan.FromMinutes(10), claim.TimeToLive);
return claim.DisposeAsync(cancellationToken);
}).Unwrap();
// Stage 10: fetch the statistics again now that the claim has been disposed.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
return provider.GetQueueStatisticsAsync(queueName, cancellationToken);
}).Unwrap()
.ContinueWith(
task =>
{
statistics = task.Result;
}, TaskContinuationOptions.ExecuteSynchronously);
// Stage 11: verify no message is claimed any more, then delete the queue.
result = result.ContinueWith(
task =>
{
task.PropagateExceptions();
Assert.AreEqual(0, statistics.MessageStatistics.Claimed);
return provider.DeleteQueueAsync(queueName, cancellationToken);
}).Unwrap();
return result;
}
/// <summary>
/// Runs the queue claim scenario by calling the asynchronous API and blocking on each returned task
/// with <c>Wait()</c> or <c>Result</c>.
/// </summary>
/// <remarks>
/// After each blocking call the method checks <paramref name="cancellationToken"/> and throws if
/// cancellation has been requested.
/// </remarks>
/// <param name="provider">The queueing service the scenario is executed against.</param>
/// <param name="cancellationToken">The token passed to every asynchronous operation and checked after each one.</param>
public void TestQueueClaimsManualSync(IQueueingService provider, CancellationToken cancellationToken)
{
    QueueName queueName = CreateRandomQueueName();
    provider.CreateQueueAsync(queueName, cancellationToken).Wait();
    cancellationToken.ThrowIfCancellationRequested();

    provider.PostMessagesAsync(queueName, cancellationToken, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes"))).Wait();
    cancellationToken.ThrowIfCancellationRequested();

    QueueStatistics statistics;
    using (Claim claim = provider.ClaimMessageAsync(queueName, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1), cancellationToken).Result)
    {
        cancellationToken.ThrowIfCancellationRequested();
        Assert.AreEqual(TimeSpan.FromMinutes(5), claim.TimeToLive);
        Assert.IsNotNull(claim.Messages);
        Assert.AreEqual(1, claim.Messages.Count);

        statistics = provider.GetQueueStatisticsAsync(queueName, cancellationToken).Result;
        cancellationToken.ThrowIfCancellationRequested();
        Assert.AreEqual(1, statistics.MessageStatistics.Claimed);

        QueuedMessage message = provider.GetMessageAsync(queueName, claim.Messages[0].Id, cancellationToken).Result;
        cancellationToken.ThrowIfCancellationRequested();
        Assert.IsNotNull(message);

        TimeSpan age = claim.Age;

        // Sleep for two full seconds: Thread.Sleep(int) takes milliseconds, and the assertion below
        // requires the claim's age to have advanced by at least two seconds.
        // You wouldn't want to actually do this, but it shows how the refresh and renew operations work!
        Thread.Sleep(TimeSpan.FromSeconds(2));

        claim.RefreshAsync(cancellationToken).Wait();
        cancellationToken.ThrowIfCancellationRequested();
        Assert.IsTrue(claim.Age >= age + TimeSpan.FromSeconds(2));

        claim.RenewAsync(TimeSpan.FromMinutes(10), cancellationToken).Wait();
        cancellationToken.ThrowIfCancellationRequested();
        Assert.AreEqual(TimeSpan.FromMinutes(10), claim.TimeToLive);
    }

    // After the claim has been disposed, the queue is expected to report no claimed messages.
    statistics = provider.GetQueueStatisticsAsync(queueName, cancellationToken).Result;
    cancellationToken.ThrowIfCancellationRequested();
    Assert.AreEqual(0, statistics.MessageStatistics.Claimed);

    provider.DeleteQueueAsync(queueName, cancellationToken).Wait();
    cancellationToken.ThrowIfCancellationRequested();
}
/// <summary>
/// Runs the queue claim scenario using only the synchronous (non-<c>Async</c>) provider and claim calls:
/// create a queue, post one message, claim it, refresh and renew the claim, dispose the claim, and
/// delete the queue.
/// </summary>
/// <param name="provider">The queueing service the scenario is executed against.</param>
public void SynchronousTestQueueClaims(IQueueingService provider)
{
    // Durations used by the scenario.
    TimeSpan initialTimeToLive = TimeSpan.FromMinutes(5);
    TimeSpan renewedTimeToLive = TimeSpan.FromMinutes(10);
    TimeSpan pause = TimeSpan.FromSeconds(2);

    QueueName name = CreateRandomQueueName();
    provider.CreateQueue(name);

    Message<SampleMetadata> payload = new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes"));
    provider.PostMessages(name, payload);

    using (Claim activeClaim = provider.ClaimMessage(name, null, initialTimeToLive, TimeSpan.FromMinutes(1)))
    {
        // The claim must carry the requested TTL and exactly the one posted message.
        Assert.AreEqual(initialTimeToLive, activeClaim.TimeToLive);
        Assert.IsNotNull(activeClaim.Messages);
        Assert.AreEqual(1, activeClaim.Messages.Count);

        QueueStatistics whileClaimed = provider.GetQueueStatistics(name);
        Assert.AreEqual(1, whileClaimed.MessageStatistics.Claimed);

        QueuedMessage fetched = provider.GetMessage(name, activeClaim.Messages[0].Id);
        Assert.IsNotNull(fetched);

        // Pause so that the refreshed claim reports an age at least two seconds larger.
        TimeSpan ageBeforePause = activeClaim.Age;
        Thread.Sleep(pause);
        activeClaim.Refresh();
        TimeSpan minimumExpectedAge = ageBeforePause + pause;
        Assert.IsTrue(activeClaim.Age >= minimumExpectedAge);

        activeClaim.Renew(renewedTimeToLive);
        Assert.AreEqual(renewedTimeToLive, activeClaim.TimeToLive);
    }

    // With the claim disposed, no message should be reported as claimed.
    QueueStatistics afterRelease = provider.GetQueueStatistics(name);
    Assert.AreEqual(0, afterRelease.MessageStatistics.Claimed);

    provider.DeleteQueue(name);
}
/// <summary>
/// Sample message body used by the queue scenarios. Serialization is opt-in, so each serialized
/// property is explicitly marked with <c>JsonProperty</c>.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
private class SampleMetadata
{
    /// <summary>
    /// Gets the integer sample value, serialized under the JSON name <c>valueA</c>.
    /// </summary>
    [JsonProperty("valueA")]
    public int ValueA { get; private set; }

    /// <summary>
    /// Gets the string sample value, serialized under the JSON name <c>valueB</c>.
    /// </summary>
    [JsonProperty("valueB")]
    public string ValueB { get; private set; }

    /// <summary>
    /// Initializes a new instance with the given sample values.
    /// </summary>
    /// <param name="valueA">The value stored in <see cref="ValueA"/>.</param>
    /// <param name="valueB">The value stored in <see cref="ValueB"/>.</param>
    public SampleMetadata(int valueA, string valueB)
    {
        this.ValueA = valueA;
        this.ValueB = valueB;
    }
}
/// <summary>
/// Builds a queue name for testing: <see cref="TestQueuePrefix"/> followed by a random file name
/// in which every <c>'.'</c> has been replaced by <c>'_'</c>.
/// </summary>
/// <returns>A randomly generated queue name that starts with <see cref="TestQueuePrefix"/>.</returns>
private QueueName CreateRandomQueueName()
{
    string randomSuffix = Path.GetRandomFileName().Replace('.', '_');
    string prefixedName = TestQueuePrefix + randomSuffix;
    return new QueueName(prefixedName);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment