Skip to content

Instantly share code, notes, and snippets.

@AzimUddin AzimUddin/BulkImport.js

Last active Sep 18, 2015
Embed
What would you like to do?
An example of DocumentDB performance scale test with .Net SDK
This is a sample program with example for -
1. How to execute a DocumentDB request with Retry to deal with RequestRateTooLarge (HTTP status 429) errors
2. How to measure Request Charge for a request
3. How to test for scalability and throughput for Insert and Read requests
How to Build/debug the code:
===========================
1. Create a C# console application in Visual Studio 2013 (VS 2012 or higher should work)
2. Add DocumentDB .Net SDK Nuget package.
3. Add Student.cs, DocumentDBPerfScaleTest.cs and app.config to your project
4. Add BulkImport.js (included in this git, copied from https://github.com/Azure/azure-documentdb-net/blob/master/samples/code-samples/ServerSideScripts/JS/BulkImport.js) on a local path and update the app.config "ScriptFilePath" config element.
5. DocumentDBPerfScaleTest.cs contains the Main method
6. Build and debug.
How to Use the sample for testing perf/scalability:
=================================================
1. I am using a 'Student' (Student.cs) Document for testing. You can replace this with your own document.
2. The program runs scale test for a specific operation (Insert, BulkInsert, ReadSingle, ReadBatch) for a specific Collection Tier (S1, S2, S3 etc).
Configurations are defined in app.config file. Change the 'OperationType' and 'CollectionTier' configurations for testing various operations for a collection tier.
Sample usage for scale test for Insert:
---------------------------------------
<add key="NumberOfDocuments" value="10000" />
<!-- OperationType configuration. Assumption is, we will do single Operation perf test in a single run of the application.
Accepted values are same as OperationType enums, such as - ReadSingle, ReadBatch, Insert, BulkInsert.
-->
<add key="OperationType" value ="Insert"/>
<!-- Number of threads configurations for various Operations-->
<add key="NumberOfThreadsForInsert" value="8" />
Sample usage for scale test for BulkInsert:
--------------------------------------------
<add key="NumberOfDocuments" value="10000" />
<!-- OperationType configuration. Assumption is, we will do single Operation perf test in a single run of the application.
Accepted values are same as OperationType enums, such as - ReadSingle, ReadBatch, Insert, BulkInsert.
-->
<add key="OperationType" value ="BulkInsert"/>
<!-- Number of threads configurations for various Operations-->
<add key="NumberOfThreadsForBulkLoad" value="4"/>
<!-- Batch size for BulkInsert and Batched Read-->
<add key ="BulkLoadBatchSize" value="250"/>
Sample usage for scale test for ReadSingle:
===========================================
<add key="NumberOfDocuments" value="10000" />
<!-- OperationType configuration. Assumption is, we will do single Operation perf test in a single run of the application.
Accepted values are same as OperationType enums, such as - ReadSingle, ReadBatch, Insert, BulkInsert.
-->
<add key="OperationType" value ="ReadSingle"/>
<!-- Number of threads configurations for various Operations-->
<add key="NumberOfThreadsForReadSingle" value="48" />
Sample usage for scale test for Batched read:
============================================
<add key="NumberOfDocuments" value="10000" />
<!-- OperationType configuration. Assumption is, we will do single Operation perf test in a single run of the application.
Accepted values are same as OperationType enums, such as - ReadSingle, ReadBatch, Insert, BulkInsert.
-->
<add key="OperationType" value ="ReadBatch"/>
<!-- Number of threads configurations for various Operations-->
<add key="NumberOfThreadsForReadMulti" value="8" />
<add key ="ReadBatchSize" value="1000"/>
<?xml version="1.0" encoding="utf-8" ?>
<!-- Application settings for the DocumentDB perf/scale test. Program reads every key below through ConfigurationManager.AppSettings. -->
<configuration>
<appSettings>
<!-- Database and collection ids; both are looked up by id and created when they do not exist. -->
<add key="DatabaseId" value="PerfTestDb" />
<add key="CollectionId" value="Students" />
<!-- Passed as the offer type when the collection is created, and echoed in the result output. -->
<add key="CollectionTier" value="S3" />
<!-- Account endpoint and key used to construct the DocumentClient. Replace both placeholders. -->
<add key="EndPointUrl" value="https://yourDocDbAcctName.documents.azure.com:443/" />
<add key="AuthorizationKey" value="YourAuthKey" />
<!-- Parsed with Enum.Parse into the SDK's ConnectionMode and Protocol enums, so the values must match enum member names. -->
<add key="connectionMode" value="Direct"/>
<add key="protocol" value="Tcp"/>
<add key="Seed" value="79736584" />
<!-- Number of Student documents generated and split across the worker threads. -->
<add key="NumberOfDocuments" value="10000" />
<!-- Name passed to Student.CreateStudent and to the read operations. -->
<add key="StudentName" value="John Doe"/>
<!-- OperationType configuration. Assumption is, we will do single Operation perf test in a single run of the application.
Accepted values are same as OperationType enums, such as - ReadSingle, ReadBatch, Insert, BulkInsert.
-->
<add key="OperationType" value ="ReadBatch"/>
<!-- Number of threads configurations for various Operations-->
<add key="NumberOfThreadsForInsert" value="8" />
<add key="NumberOfThreadsForReadSingle" value="48" />
<add key="NumberOfThreadsForReadMulti" value="4" />
<add key="NumberOfThreadsForBulkLoad" value="4"/>
<!-- Directory that contains the stored procedure script. The program appends "\BulkImport.js" to this value, so give a folder, not a file path. -->
<add key ="ScriptFilePath" value="C:\VS_2013_Projects\Supportability\DocDB\DocumentDBPerfScaleTest\DocumentDBPerfScaleTest"/>
<!-- Batch size for BulkInsert and Batched Read-->
<add key ="BulkLoadBatchSize" value="250"/>
<add key ="ReadBatchSize" value="1000"/>
<!-- Debug configurations: DisplayDebugInfo is handed to the worker threads; DisplayHttp429Error controls whether throttled (HTTP 429) retries are logged. -->
<add key="DisplayDebugInfo" value="false"/>
<add key="DisplayHttp429Error" value="true"/>
<!-- Console settings for displaying the output; passed to Console.SetBufferSize and Console.SetWindowSize at startup. -->
<add key ="ConsoleWindowBufferWidth" value="400"/>
<add key ="ConsoleWindowBufferHeight" value="10000"/>
<add key ="ConsoleWindowWidth" value="150"/>
<add key ="ConsoleWindowHeight" value="50"/>
</appSettings>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
</startup>
</configuration>
/* Copied from: https://github.com/Azure/azure-documentdb-net/blob/master/samples/code-samples/ServerSideScripts/JS/BulkImport.js
*/
/**
 * DocumentDB stored procedure that creates the given documents one after another
 * in the current collection.
 *
 * The response body is set to the number of documents created so far. When the
 * server stops accepting requests part-way through, the caller is expected to
 * invoke the procedure again with the documents that are still outstanding.
 *
 * @param {Array} docs - documents to create; must not be undefined or null.
 * @throws {Error} when docs is undefined/null, or when a create call reports an error.
 */
function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength == 0) {
        // Nothing to import: report zero and stop. Without this return the code
        // below would try to create docs[0], which is undefined.
        getContext().getResponse().setBody(0);
        return;
    }

    // Call the CRUD API to create a document.
    tryCreate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted.
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreate(doc, callback) {
        var isAccepted = collection.createDocument(collectionLink, doc, callback);

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client,
        // which will call the script again with remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false
        if (!isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreate(docs[count], callback);
        }
    }
}
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Newtonsoft;
using Newtonsoft.Json;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
namespace DocumentDBPerfScaleTest
{
class Program
{
// Ids of the database and collection used by the test, read from app.config
private static readonly string databaseId = ConfigurationManager.AppSettings["DatabaseId"];
private static readonly string collectionId = ConfigurationManager.AppSettings["CollectionId"];
//Read the DocumentDB endpointUrl and authorisationKeys from config
private static readonly string endpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
private static readonly string authorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
// Offer type passed when the collection is created; also printed in the result output
private static readonly string collectionTier = ConfigurationManager.AppSettings["CollectionTier"];
// NOTE(review): Seed is not referenced in the portion of the file reviewed here - confirm where it is used
private static readonly int Seed = Convert.ToInt32(ConfigurationManager.AppSettings["Seed"]);
// Number of Student documents generated for a run and divided across the worker threads
private static readonly int NumberOfDocuments = Convert.ToInt32(ConfigurationManager.AppSettings["NumberOfDocuments"]);
// Name passed to Student.CreateStudent and to the read operations
private static readonly string studentName = ConfigurationManager.AppSettings["StudentName"];
// Thread count used when the operation type has no dedicated config key
private static readonly int defaultNumberOfThreads = 8;
// Operation under test, parsed from the "OperationType" setting in Init
private static OperationType opType;
// Shared DocumentDB client and the resources resolved by InitAsync
private static DocumentClient client;
private static DocumentCollection StudentsCollection;
private static string colSelfLink;
private static string bulkLoadSprocSelfLink;
// Totals for the current run; reset and recomputed by RunOperationInScale / ShowRunResults
private static double totalRequestCharge = 0.0;
private static double totalDurationInSeconds = 0.0;
// Batch sizes for bulk insert and batched read
private static int bulkLoadBatchSize = Convert.ToInt32(ConfigurationManager.AppSettings["BulkLoadBatchSize"]);
private static int readBatchSize = Convert.ToInt32(ConfigurationManager.AppSettings["ReadBatchSize"]);
// Output switches: per-request debug lines, and logging of throttled (HTTP 429) retries
private static bool displayDebugInfo = Convert.ToBoolean(ConfigurationManager.AppSettings["DisplayDebugInfo"]);
private static bool displayHttp429Error = Convert.ToBoolean(ConfigurationManager.AppSettings["DisplayHttp429Error"]);
// Console buffer/window dimensions applied in Init
private static int consoleWindowBufferWidth = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowBufferWidth"]);
private static int consoleWindowBufferHeight = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowBufferHeight"]);
private static int consoleWindowWidth = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowWidth"]);
private static int consoleWindowHeight = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowHeight"]);
// Number of worker threads for the current run; set in RunOperationInScale
private static int numberOfThreads = 0;
// One list of Students per worker thread, built by AssignListOfStudentsToThreads
private static List<List<Student>> allStudentsCopy = null;
// One list of response records per worker thread, created by CreateResponseInfoListForEachThread and summed in ShowRunResults
private static List<List<OperationReponseInfo>> allOperationResponses = null;
// Lazily created master array of Students (see CreateOrGetListOfStudents)
private static Student[] allStudentsOriginal = null;
/// <summary>
/// The operation exercised by a test run. The member names are also the accepted
/// values of the "OperationType" app setting, which is parsed with Enum.Parse.
/// ReadSingle: Read where query results a single document
/// ReadBatch: Read where query results multiple documents and we read in a batch
/// Insert: Single insert via create API
/// BulkInsert: Bulk insert via stored procedure
/// </summary>
private enum OperationType
{
ReadSingle,
ReadBatch,
Insert,
BulkInsert
};
/// <summary>
/// Program entry point. Initializes the DocumentDB resources, measures the request
/// charge of a single operation, runs the scale test for the configured operation
/// type, and finally deletes the test collection.
/// </summary>
/// <param name="args">Command line arguments (not used).</param>
static void Main(string[] args)
{
    try
    {
        // Initialize, create Database, collection, stored procedure etc
        Init();
        // Measure Request Charge for a specific Operation
        MeasureRequestCharge(opType);
        // Run Scale test of various operations - insert, read, bulkinsert and batch read
        RunOperationsScaleTest(opType);
        // Delete the Collection
        Cleanup();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // An AggregateException may wrap a DocumentClientException or something else entirely.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Previously any other wrapped failure was dropped without a trace; report it.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
    finally
    {
        Console.WriteLine("End of Test, press any key to exit.");
        Console.ReadKey();
    }
}
/// <summary>
/// Creates the shared DocumentClient and resolves the database, the collection and
/// the bulk-import stored procedure, creating each of them when it does not exist.
/// Does nothing when a client has already been created.
/// </summary>
private static async Task InitAsync()
{
    if (client != null)
    {
        return;
    }

    // Build the client from the endpoint, key and connection settings in app.config
    Uri serviceEndpoint = new Uri(endpointUrl);
    ConnectionPolicy policy = new ConnectionPolicy();
    policy.ConnectionMode = (ConnectionMode)Enum.Parse(typeof(ConnectionMode), ConfigurationManager.AppSettings["connectionMode"]);
    policy.ConnectionProtocol = (Protocol)Enum.Parse(typeof(Protocol), ConfigurationManager.AppSettings["protocol"]);
    client = new DocumentClient(serviceEndpoint, authorizationKey, policy);

    // Database: existing one or a newly created one
    Database db = await GetOrCreateDatabaseAsync(client, databaseId);

    // Collection: look it up by id first, create it in the configured tier otherwise
    DocumentCollection[] matchingCollections = client.CreateDocumentCollectionQuery(db.SelfLink)
        .Where(c => c.Id == collectionId)
        .ToArray();
    StudentsCollection = matchingCollections.FirstOrDefault();
    if (StudentsCollection == null)
    {
        DocumentCollection definition = new DocumentCollection { Id = collectionId };
        StudentsCollection = await CreateDocumentCollectionWithRetriesAsync(client, db, definition, collectionTier);
    }
    colSelfLink = StudentsCollection.SelfLink;

    // Stored procedure used by the bulk insert test
    StoredProcedure bulkImportProcedure = await GetOrCreateStoredProcedureAsync("BulkImport.js");
    bulkLoadSprocSelfLink = bulkImportProcedure.SelfLink;
}
/// <summary>
/// Prepares the console, reads the operation type from configuration and, when no
/// client exists yet, runs <c>InitAsync</c> synchronously.
/// Failures raised while initializing the client are written to the console and
/// are not rethrown.
/// </summary>
private static void Init()
{
    // set console buffer size as we are going to write to console for debug logging
    Console.SetBufferSize(consoleWindowBufferWidth, consoleWindowBufferHeight);
    Console.SetWindowSize(consoleWindowWidth, consoleWindowHeight);
    // decide Operation Type
    string operationType = ConfigurationManager.AppSettings["OperationType"];
    opType = (OperationType)Enum.Parse(typeof(OperationType), operationType);
    // client initialization
    if (client == null)
    {
        try
        {
            InitAsync().Wait();
        }
        catch (DocumentClientException de)
        {
            HandleDocumentClientException(de);
        }
        catch (AggregateException ae)
        {
            // Task.Wait wraps failures in an AggregateException; the cause may or may not be a DocumentClientException.
            Exception baseException = ae.GetBaseException();
            DocumentClientException clientException = baseException as DocumentClientException;
            if (clientException != null)
            {
                HandleDocumentClientException(clientException);
            }
            else
            {
                // Previously any other wrapped failure (bad endpoint URI, missing script file, ...) was dropped silently.
                Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
            }
        }
        catch (Exception e)
        {
            Exception baseException = e.GetBaseException();
            Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
        }
    }
}
/// <summary>
/// Deletes the test collection addressed by <c>colSelfLink</c>.
/// </summary>
private static async Task CleanupAsync()
{
    var deleteCollection = client.DeleteDocumentCollectionAsync(colSelfLink);
    await deleteCollection;
}
/// <summary>
/// Runs <c>CleanupAsync</c> synchronously. Failures are written to the console
/// and are not rethrown.
/// </summary>
private static void Cleanup()
{
    try
    {
        CleanupAsync().Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait wraps failures in an AggregateException; the cause may or may not be a DocumentClientException.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Previously any other wrapped failure was dropped silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Executes the given operation once, with debug output switched on, so that the
/// request charge of a single request is written to the console.
/// </summary>
/// <param name="operation">The operation to execute once.</param>
private static void MeasureRequestCharge(OperationType operation)
{
    Console.WriteLine("************** Measuring Request Charge **************************");
    var runner = new Program();

    if (operation == OperationType.Insert)
    {
        // One insert
        Student singleStudent = Student.CreateStudent(studentName);
        runner.InsertDocument(singleStudent, true);
    }
    else if (operation == OperationType.ReadSingle)
    {
        // One single-document read
        runner.ReadSingleDocument(studentName, true);
    }
    else if (operation == OperationType.ReadBatch)
    {
        // One batched read
        int documentsPerRead = readBatchSize;
        runner.ReadMultiDocuments(studentName, true, documentsPerRead);
    }
    else if (operation == OperationType.BulkInsert)
    {
        // One bulk-load batch
        runner.MeasureRequestChargeForBulkInsert();
    }

    Console.WriteLine("************** End of Measuring Request Charge **************************");
}
/// <summary>
/// Builds one batch of <c>bulkLoadBatchSize</c> Students and bulk-inserts it with
/// debug output switched on.
/// </summary>
private void MeasureRequestChargeForBulkInsert()
{
    var batch = new List<Student>();
    int stillToCreate = bulkLoadBatchSize;
    while (stillToCreate > 0)
    {
        batch.Add(Student.CreateStudent(studentName));
        stillToCreate--;
    }

    // Insert the whole batch in one bulk operation
    BulkInsertDocuments(batch, true);
}
/// <summary>
/// Runs the scale test for the given operation, framed by start/end banner lines
/// that name the configured collection tier. Failures are written to the console
/// and are not rethrown.
/// </summary>
/// <param name="operation">The operation to run at scale.</param>
private static void RunOperationsScaleTest(OperationType operation)
{
    try
    {
        var program = new Program();
        switch (operation)
        {
            case OperationType.Insert:
                // Run insert test with collection tier specified in config
                Console.WriteLine("************* Insert scale test with Collection Tier: {0} **************", collectionTier);
                program.RunOperationInScale(OperationType.Insert);
                Console.WriteLine("************* End of Insert scale test with Collection Tier: {0} **************", collectionTier);
                break;
            case OperationType.ReadSingle:
                // Run Read test with collection tier specified in config
                Console.WriteLine("************ Single document Read scale test with Collection Tier: {0} **************", collectionTier);
                program.RunOperationInScale(OperationType.ReadSingle);
                Console.WriteLine("************ End of Single document Read scale test with Collection Tier: {0} **************", collectionTier);
                break;
            case OperationType.ReadBatch:
                // Run read batch scale test with collection tier specified in config
                Console.WriteLine("************ Batch Read scale test with Collection Tier: {0} ***************", collectionTier);
                program.RunOperationInScale(OperationType.ReadBatch);
                Console.WriteLine("************ End of Batch Read scale test with Collection Tier: {0} ***************", collectionTier);
                break;
            case OperationType.BulkInsert:
                // Run Bulk insert test with stored procedure
                Console.WriteLine("************ Bulk insert scale test with Collection Tier: {0} *************", collectionTier);
                program.RunOperationInScale(OperationType.BulkInsert);
                Console.WriteLine("************ End of Bulk insert scale test with Collection Tier: {0} **************", collectionTier);
                break;
            default:
                break;
        }
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // An AggregateException may wrap a DocumentClientException or something else entirely.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Previously any other wrapped failure was dropped silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
    finally
    {
        Console.WriteLine("Done with DocumentDB Scale Test");
    }
}
/// <summary>
/// Writes the details of a DocumentClientException to the console: activity id,
/// error, HTTP status code, retry-after interval, stack trace, response headers
/// and the exception messages.
/// </summary>
/// <param name="de">The exception to report.</param>
private static void HandleDocumentClientException(DocumentClientException de)
{
    // Custom actions (retry etc.) could be taken here based on the exception details.
    Console.WriteLine("\nActivityID: {0}", de.ActivityId);
    Console.WriteLine("\nError: {0}", de.Error);
    Console.WriteLine("\nHTTP Status Code: {0}", de.StatusCode.ToString());
    Console.WriteLine("\nRetryAfter: {0}", de.RetryAfter.TotalSeconds.ToString());
    Console.WriteLine("\nStack Trace: {0}", de.StackTrace);
    Console.WriteLine("\nResponse Headers:\n ");

    bool hasHeaders = de.ResponseHeaders != null && de.ResponseHeaders.Count > 0;
    if (hasHeaders)
    {
        foreach (string headerName in de.ResponseHeaders)
        {
            Console.WriteLine("{0} {1}", headerName, de.ResponseHeaders[headerName]);
        }
    }

    Exception rootCause = de.GetBaseException();
    Console.WriteLine("Message: {0} \n {1}", de.Message, rootCause.Message);
}
/// <summary>
/// Looks up a database by id and creates it when no database with that id exists.
/// </summary>
/// <param name="client">The DocumentDB client instance.</param>
/// <param name="id">The id of the Database to search for, or create.</param>
/// <returns>The existing or newly created Database.</returns>
public static async Task<Database> GetOrCreateDatabaseAsync(DocumentClient client, string id)
{
    Database[] matches = client.CreateDatabaseQuery().Where(db => db.Id == id).ToArray();
    Database found = matches.FirstOrDefault();
    if (found != null)
    {
        return found;
    }

    Database created = await client.CreateDatabaseAsync(new Database { Id = id });
    return created;
}
/// <summary>
/// Creates a DocumentCollection, retrying for as long as the service throttles the request.
/// </summary>
/// <param name="client">The DocumentDB client instance.</param>
/// <param name="database">The database in which the collection is created.</param>
/// <param name="collectionDefinition">The definition of the collection to create.</param>
/// <param name="offerType">The offer type requested for the collection.</param>
/// <returns>The created DocumentCollection.</returns>
private static async Task<DocumentCollection> CreateDocumentCollectionWithRetriesAsync(
    DocumentClient client,
    Database database,
    DocumentCollection collectionDefinition,
    string offerType = "S1")
{
    var response = await ExecuteWithRetries(
        client,
        () => client.CreateDocumentCollectionAsync(
            database.SelfLink,
            collectionDefinition,
            new RequestOptions { OfferType = offerType }),
        displayHttp429Error);

    return response;
}
/// <summary>
/// Returns the stored procedure whose id is the script file name without its
/// extension, creating it in the test collection from the script file when it
/// does not exist yet.
/// </summary>
/// <param name="scriptName">
/// File name of the script (for example "BulkImport.js"). The file is read from
/// the directory given by the "ScriptFilePath" app setting; when that setting is
/// missing, the name is resolved relative to the current working directory.
/// </param>
/// <returns>The existing or newly created StoredProcedure.</returns>
private static async Task<StoredProcedure> GetOrCreateStoredProcedureAsync(string scriptName)
{
    // Use script name (minus file extension) for the procedure id
    string procedureId = Path.GetFileNameWithoutExtension(scriptName);
    StoredProcedure storedProcedure = client.CreateStoredProcedureQuery(colSelfLink).Where(s => s.Id == procedureId).AsEnumerable().FirstOrDefault();
    if (storedProcedure != null)
    {
        return storedProcedure;
    }

    // Stored procedure doesn't exist. Create it ...
    // Path.Combine copes with a configured directory that already ends in a separator,
    // which the previous "{0}\{1}" format did not.
    string scriptDirectory = ConfigurationManager.AppSettings["ScriptFilePath"] ?? string.Empty;
    string scriptFileFullPath = Path.Combine(scriptDirectory, scriptName);
    string contents = File.ReadAllText(scriptFileFullPath);

    StoredProcedure definition = new StoredProcedure
    {
        Id = procedureId,
        Body = contents
    };

    storedProcedure = await ExecuteWithRetries(
        client,
        () => client.CreateStoredProcedureAsync(colSelfLink, definition), displayHttp429Error);
    return storedProcedure;
}
/// <summary>
/// Invokes <paramref name="function"/> and keeps re-invoking it while it fails with
/// HTTP status 429 (request rate too large), waiting for the interval reported by
/// the exception's RetryAfter between attempts. Any other failure is rethrown.
/// </summary>
/// <typeparam name="V">Result type of the function.</typeparam>
/// <param name="client">The DocumentDB client instance (not used by this method).</param>
/// <param name="function">The operation to execute.</param>
/// <param name="showHttp429Error">When true, every throttled attempt is written to the console.</param>
/// <param name="threadNumber">Thread number included in the throttling message.</param>
/// <param name="iterationNumber">Iteration number included in the throttling message.</param>
/// <returns>The result of the first attempt that succeeds.</returns>
private static async Task<V> ExecuteWithRetries<V>(DocumentClient client, Func<Task<V>> function, bool showHttp429Error, int threadNumber, int iterationNumber)
{
    for (;;)
    {
        TimeSpan backoff = TimeSpan.Zero;
        try
        {
            return await function();
        }
        catch (DocumentClientException direct)
        {
            // Only throttling is retried
            if ((int)direct.StatusCode != 429)
            {
                throw;
            }
            if (showHttp429Error)
            {
                Console.WriteLine("{0}\t{1} in Thread:{2} Iteration:{3} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name, threadNumber, iterationNumber);
            }
            backoff = direct.RetryAfter;
        }
        catch (AggregateException aggregate)
        {
            // Same rule when the client exception arrives wrapped in an AggregateException
            DocumentClientException wrapped = aggregate.InnerException as DocumentClientException;
            if (wrapped == null || (int)wrapped.StatusCode != 429)
            {
                throw;
            }
            if (showHttp429Error)
            {
                Console.WriteLine("{0}\t{1} in Thread:{2} Iteration:{3} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name, threadNumber, iterationNumber);
            }
            backoff = wrapped.RetryAfter;
        }

        // The wait happens outside the catch blocks
        await Task.Delay(backoff);
    }
}
/// <summary>
/// Invokes <paramref name="function"/> and keeps re-invoking it while it fails with
/// HTTP status 429 (request rate too large), waiting for the interval reported by
/// the exception's RetryAfter between attempts. Any other failure is rethrown.
/// </summary>
/// <typeparam name="V">Result type of the function.</typeparam>
/// <param name="client">The DocumentDB client instance (not used by this method).</param>
/// <param name="function">The operation to execute.</param>
/// <param name="showHttp429Error">When true, every throttled attempt is written to the console.</param>
/// <returns>The result of the first attempt that succeeds.</returns>
private static async Task<V> ExecuteWithRetries<V>(DocumentClient client, Func<Task<V>> function, bool showHttp429Error)
{
    for (;;)
    {
        TimeSpan backoff = TimeSpan.Zero;
        try
        {
            return await function();
        }
        catch (DocumentClientException direct)
        {
            // Only throttling is retried
            if ((int)direct.StatusCode != 429)
            {
                throw;
            }
            if (showHttp429Error)
            {
                Console.WriteLine("{0}\t{1} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name);
            }
            backoff = direct.RetryAfter;
        }
        catch (AggregateException aggregate)
        {
            // Same rule when the client exception arrives wrapped in an AggregateException
            DocumentClientException wrapped = aggregate.InnerException as DocumentClientException;
            if (wrapped == null || (int)wrapped.StatusCode != 429)
            {
                throw;
            }
            if (showHttp429Error)
            {
                Console.WriteLine("{0}\t{1} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name);
            }
            backoff = wrapped.RetryAfter;
        }

        // The wait happens outside the catch blocks
        await Task.Delay(backoff);
    }
}
/// <summary>
/// Returns the shared array of Students, creating and filling it with
/// <c>NumberOfDocuments</c> entries on first use.
/// </summary>
/// <returns>The shared Student array.</returns>
private Student[] CreateOrGetListOfStudents()
{
    if (allStudentsOriginal != null)
    {
        return allStudentsOriginal;
    }

    allStudentsOriginal = new Student[NumberOfDocuments];
    int slot = 0;
    while (slot < NumberOfDocuments)
    {
        allStudentsOriginal[slot] = Student.CreateStudent(studentName);
        slot++;
    }
    return allStudentsOriginal;
}
/// <summary>
/// Splits the shared Student array into exactly <c>numberOfThreads</c> consecutive
/// slices of near-equal size and stores them in <c>allStudentsCopy</c>, one per
/// worker thread. Slices may be empty when there are fewer documents than threads.
/// </summary>
/// <param name="operation">The operation being tested (not used by the partitioning).</param>
/// <exception cref="InvalidOperationException">
/// There are documents to distribute but the configured thread count is zero or negative.
/// </exception>
private void AssignListOfStudentsToThreads(OperationType operation)
{
    Student[] allStudentsOrig = CreateOrGetListOfStudents();
    int remainingDocuments = allStudentsOrig.Length;

    // A non-positive thread count used to surface as an OverflowException from the division below.
    if (numberOfThreads <= 0 && remainingDocuments > 0)
    {
        throw new InvalidOperationException("The configured number of threads must be greater than zero.");
    }

    allStudentsCopy = new List<List<Student>>();
    int start = 0;

    // Iterate over the threads rather than over the remaining documents: the callers index
    // allStudentsCopy by thread number, so every thread needs an entry even when the
    // documents run out before the threads do.
    for (int remainingThreads = numberOfThreads; remainingThreads > 0; remainingThreads--)
    {
        // Share of this thread; the last thread (remainingThreads == 1) takes whatever is left.
        int numberOfDocumentsPerThread = Convert.ToInt32(Math.Round((double)remainingDocuments / remainingThreads, MidpointRounding.AwayFromZero));
        int end = start + numberOfDocumentsPerThread;

        var studentsForThread = new List<Student>(numberOfDocumentsPerThread);
        for (int i = start; i < end; i++)
        {
            studentsForThread.Add(allStudentsOrig[i]);
        }
        allStudentsCopy.Add(studentsForThread);

        remainingDocuments -= numberOfDocumentsPerThread;
        start = end;
    }
}
/// <summary>
/// Replaces <c>allOperationResponses</c> with a new list holding one empty
/// response list per worker thread.
/// </summary>
/// <param name="NumberOfThreads">Number of per-thread lists to create.</param>
private void CreateResponseInfoListForEachThread(int NumberOfThreads)
{
    allOperationResponses = new List<List<OperationReponseInfo>>();
    int listsToCreate = NumberOfThreads;
    while (listsToCreate > 0)
    {
        allOperationResponses.Add(new List<OperationReponseInfo>());
        listsToCreate--;
    }
}
/// <summary>
/// Runs the given operation on multiple threads: reads the thread count for the
/// operation from configuration, partitions the documents and response lists per
/// thread, starts one worker thread per partition, waits for all of them, records
/// the total wall-clock duration and prints the run summary.
/// </summary>
/// <param name="operation">The operation to run.</param>
private void RunOperationInScale(OperationType operation)
{
    totalDurationInSeconds = 0;

    // Each operation type has its own thread-count setting
    string threadCountKey;
    switch (operation)
    {
        case OperationType.Insert:
            threadCountKey = "NumberOfThreadsForInsert";
            break;
        case OperationType.ReadSingle:
            threadCountKey = "NumberOfThreadsForReadSingle";
            break;
        case OperationType.ReadBatch:
            threadCountKey = "NumberOfThreadsForReadMulti";
            break;
        case OperationType.BulkInsert:
            threadCountKey = "NumberOfThreadsForBulkLoad";
            break;
        default:
            threadCountKey = null;
            break;
    }
    if (threadCountKey == null)
    {
        numberOfThreads = defaultNumberOfThreads;
    }
    else
    {
        numberOfThreads = Convert.ToInt32(ConfigurationManager.AppSettings[threadCountKey]);
    }

    // Partition the Students across the threads. For inserts these are the documents written;
    // for reads the size of each partition only determines how many reads a thread performs.
    AssignListOfStudentsToThreads(operation);

    // One response list per thread; the entries are summed up for the summary afterwards
    CreateResponseInfoListForEachThread(numberOfThreads);

    var workers = new Thread[numberOfThreads];

    // Time the whole run, from the first thread start to the last join
    Stopwatch runClock = Stopwatch.StartNew();

    for (var index = 0; index < numberOfThreads; index++)
    {
        int workerNumber = index + 1;
        if (operation == OperationType.Insert)
        {
            workers[index] = new Thread(DoInsertWork);
            workers[index].Start(new DoInsertWorkArgs() { Students = allStudentsCopy[index], ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[index], ThreadNumber = workerNumber });
        }
        else if (operation == OperationType.ReadSingle)
        {
            workers[index] = new Thread(DoSingleReadWork);
            int singleReads = allStudentsCopy[index].Count;
            workers[index].Start(new DoSingleReadWorkArgs() { NumberOfReadsPerThread = singleReads, ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[index], ThreadNumber = workerNumber });
        }
        else if (operation == OperationType.BulkInsert)
        {
            workers[index] = new Thread(DoBulkInsertWork);
            workers[index].Start(new DoBulkInsertWorkArgs() { Students = allStudentsCopy[index], ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[index], ThreadNumber = workerNumber });
        }
        else if (operation == OperationType.ReadBatch)
        {
            workers[index] = new Thread(DoMultiReadWork);
            int batchReads = allStudentsCopy[index].Count;
            workers[index].Start(new DoMultiReadWorkArgs { NumberOfReadsPerThread = batchReads, ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[index], ThreadNumber = workerNumber });
        }
    }

    // Wait for every worker to finish
    foreach (Thread worker in workers)
    {
        worker.Join();
    }

    // Total duration of the test in seconds
    runClock.Stop();
    long elapsedMs = runClock.ElapsedMilliseconds;
    totalDurationInSeconds = elapsedMs / 1000.0;

    // Show Run Results summary
    ShowRunResults(operation);
}
/// <summary>
/// Sums the request charges recorded by all threads and prints the run summary:
/// collection, tier, thread count, batch size (for batched operations), total
/// duration, operations per second and request units per second.
/// </summary>
/// <param name="operation">The operation that was run.</param>
private void ShowRunResults(OperationType operation)
{
    // Reset the running total before adding up this run
    totalRequestCharge = 0;

    // Walk every response of every thread
    long summedLatencyMs = 0;
    foreach (List<OperationReponseInfo> responsesOfThread in allOperationResponses)
    {
        foreach (OperationReponseInfo response in responsesOfThread)
        {
            summedLatencyMs += response.ElapsedMilliseconds;
            totalRequestCharge += response.OpRequestCharge;
        }
    }

    Console.WriteLine("+++++++++++++++++++++++++++++++");
    Console.WriteLine("{0}\t Result for Collection: {1} in Tier: {2}", DateTime.UtcNow, StudentsCollection.Id, collectionTier);
    Console.WriteLine(" Number of Threads used: {0}", numberOfThreads);

    // Batch size is only relevant for the batched operations
    switch (operation)
    {
        case OperationType.BulkInsert:
            Console.WriteLine("Bulk Insert Batch Size = {0}", bulkLoadBatchSize);
            break;
        case OperationType.ReadBatch:
            Console.WriteLine("Read Batch Size = {0}", readBatchSize);
            break;
    }

    // Verb used in the summary line
    string verb;
    switch (operation)
    {
        case OperationType.BulkInsert:
        case OperationType.Insert:
            verb = "Inserted";
            break;
        case OperationType.ReadBatch:
        case OperationType.ReadSingle:
            verb = "Read";
            break;
        default:
            verb = "";
            break;
    }

    Console.WriteLine("++++++++++++++++++++++++++++++++");
    Console.WriteLine("Result Summary (Total Duration measures the total time spent on the test for all threads):");
    Console.WriteLine("{0} Documents {1} in {2} seconds", NumberOfDocuments, verb, totalDurationInSeconds);

    double throughput = (double)NumberOfDocuments / totalDurationInSeconds;
    double requestUnitsPerSecond = totalRequestCharge / totalDurationInSeconds;
    Console.WriteLine(" Operations per Second: {0}", throughput);
    Console.WriteLine("Average Request Units: {0} per second", requestUnitsPerSecond);
}
/// <summary>
/// Bulk-inserts the given Students through the bulk-import stored procedure. When
/// the procedure reports that only part of the batch was created, the created
/// documents are removed from the front of <paramref name="Students"/> and the
/// procedure is called again with the rest, until the whole batch is loaded.
/// </summary>
/// <param name="Students">The batch to insert. Already-loaded entries are removed from this list when a resume is needed.</param>
/// <param name="showDebugInfo">When true, a line is written to the console for every stored procedure call.</param>
/// <param name="opsRespInfo">Optional record that receives the elapsed time and request charge of the batch, summed over all stored procedure calls.</param>
/// <param name="threadNumber">Thread number used in console output.</param>
/// <param name="iterationNumber">Iteration number used in console output.</param>
/// <returns>The number of documents loaded.</returns>
private async Task<int> BulkInsertDocumentsAsync(List<Student> Students, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber)
{
    // Keep track of the number of documents loaded
    int documentsLoaded = 0;
    // Save original batch size in case complete batch doesn't get loaded
    int batchSize = Students.Count;
    // Totals over every stored procedure call made for this batch. Previously each call
    // overwrote opsRespInfo, so a batch that needed several calls reported only the last one.
    long totalElapsedMilliseconds = 0;
    double totalBatchRequestCharge = 0.0;
    do
    {
        Stopwatch clock = new Stopwatch();
        clock.Start();
        StoredProcedureResponse<int> scriptResult = await ExecuteWithRetries(client, () => client.ExecuteStoredProcedureAsync<int>(bulkLoadSprocSelfLink, Students), displayHttp429Error, threadNumber, iterationNumber);
        clock.Stop();
        int countOfLoadedDocs = scriptResult.Response;
        double lastRequestCharge = scriptResult.RequestCharge;

        totalElapsedMilliseconds += clock.ElapsedMilliseconds;
        totalBatchRequestCharge += lastRequestCharge;
        if (opsRespInfo != null)
        {
            opsRespInfo.ElapsedMilliseconds = totalElapsedMilliseconds;
            opsRespInfo.OpRequestCharge = totalBatchRequestCharge;
        }

        // Keep track of the number of documents we've loaded so far
        documentsLoaded += countOfLoadedDocs;
        // Check to see if the entire batch was loaded.
        if (countOfLoadedDocs < Students.Count)
        {
            // Remove the documents that have already been loaded before re-sending the ones that weren't processed
            Students.RemoveRange(0, countOfLoadedDocs);
        }
        // for debug
        if (showDebugInfo)
        {
            Console.WriteLine("{0}\tBulk Insert of {1} documents by Thread:{2} and Iteration:{3}, # of RUs: {4}", DateTime.UtcNow, countOfLoadedDocs, threadNumber, iterationNumber, lastRequestCharge);
        }
    } while (documentsLoaded < batchSize);
    // Return count of documents loaded
    return documentsLoaded;
}
/// <summary>
/// Bulk-inserts a batch of students through the bulk-load stored procedure without per-thread
/// bookkeeping, repeating the call until the whole batch has been loaded.
/// </summary>
/// <param name="Students">Batch to load. When a call loads only part of the list, the loaded entries are removed from it.</param>
/// <param name="showDebugInfo">When true, writes one console line per stored procedure call.</param>
/// <returns>Total number of documents loaded.</returns>
private async Task<int> BulkInsertDocumentsAsync(List<Student> Students, bool showDebugInfo)
{
    // The list may shrink below, so capture the target size first.
    int targetCount = Students.Count;
    int loadedSoFar = 0;
    while (true)
    {
        Stopwatch timer = Stopwatch.StartNew();
        StoredProcedureResponse<int> sprocResponse = await ExecuteWithRetries(client, () => client.ExecuteStoredProcedureAsync<int>(bulkLoadSprocSelfLink, Students), displayHttp429Error);
        timer.Stop();

        int loadedThisCall = sprocResponse.Response;
        double chargeThisCall = sprocResponse.RequestCharge;
        loadedSoFar += loadedThisCall;

        // Only part of the list was loaded: remove that part so the next call sends the rest.
        if (Students.Count > loadedThisCall)
        {
            Students.RemoveRange(0, loadedThisCall);
        }

        if (showDebugInfo)
        {
            Console.WriteLine("{0}\tBulk Insert of {1} documents, # of RUs: {2}", DateTime.UtcNow, loadedThisCall, chargeThisCall);
        }

        if (loadedSoFar >= targetCount)
        {
            break;
        }
    }
    return loadedSoFar;
}
/// <summary>
/// Blocking wrapper around the per-thread BulkInsertDocumentsAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="Students">Batch to load.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
/// <param name="opsRespInfo">Forwarded to the async method.</param>
/// <param name="threadNumber">Forwarded to the async method.</param>
/// <param name="iterationNumber">Forwarded to the async method.</param>
private void BulkInsertDocuments(List<Student> Students, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber)
{
    try
    {
        BulkInsertDocumentsAsync(Students, showDebugInfo, opsRespInfo, threadNumber, iterationNumber).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Blocking wrapper around the two-argument BulkInsertDocumentsAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="Students">Batch to load.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
private void BulkInsertDocuments(List<Student> Students, bool showDebugInfo)
{
    try
    {
        BulkInsertDocumentsAsync(Students, showDebugInfo).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Argument bundle for <c>DoBulkInsertWork</c>, which receives it as a plain object and casts it back.
/// </summary>
public class DoBulkInsertWorkArgs
{
/// <summary>Students this worker inserts; DoBulkInsertWork splits them into batches.</summary>
public List<Student> Students { get; set; }
/// <summary>Forwarded to BulkInsertDocuments as its showDebugInfo argument.</summary>
public bool ShowDebugInfo { get; set; }
/// <summary>List that DoBulkInsertWork appends one OperationReponseInfo to per batch.</summary>
public List<OperationReponseInfo> Responses { get; set; }
/// <summary>Number identifying the worker; forwarded to BulkInsertDocuments.</summary>
public int ThreadNumber { get; set; }
}
/// <summary>
/// Worker entry point for bulk inserts: walks the worker's students in slices of at most
/// bulkLoadBatchSize and bulk-loads each slice, recording one OperationReponseInfo per slice.
/// </summary>
/// <param name="data">A <c>DoBulkInsertWorkArgs</c> instance passed as object.</param>
public void DoBulkInsertWork(object data)
{
    var workArgs = (DoBulkInsertWorkArgs)data;
    List<Student> allStudents = workArgs.Students;
    List<OperationReponseInfo> responses = workArgs.Responses;
    int worker = workArgs.ThreadNumber;
    int total = workArgs.Students.Count;

    int offset = 0;
    int iteration = 0;
    while (total - offset > 0)
    {
        // The last slice may be shorter than the configured batch size.
        int outstanding = total - offset;
        int sliceSize = (outstanding > bulkLoadBatchSize) ? bulkLoadBatchSize : outstanding;

        // Copy the slice into its own list; the bulk insert may remove entries from it.
        var slice = new List<Student>();
        for (int k = 0; k < sliceSize; k++)
        {
            slice.Add(allStudents[offset + k]);
        }

        var info = new OperationReponseInfo();
        responses.Add(info);
        iteration++;

        BulkInsertDocuments(slice, workArgs.ShowDebugInfo, info, worker, iteration);
        offset += sliceSize;
    }
}
/// <summary>
/// Creates one document in the collection (with retries) on behalf of a worker thread and records
/// how long the call took and what it cost.
/// </summary>
/// <param name="student">Document to insert.</param>
/// <param name="showDebugInfo">When true, writes a console line describing the insert.</param>
/// <param name="opsRespInfo">Receives the elapsed milliseconds and request charge; may be null.</param>
/// <param name="threadNumber">Worker thread number, passed to ExecuteWithRetries and shown in debug output.</param>
/// <param name="iterationNumber">Iteration on that thread, passed to ExecuteWithRetries and shown in debug output.</param>
/// <returns>A task that completes when the insert has finished.</returns>
private async Task InsertDocumentAsync(Student student, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber)
{
    // The measured time covers the create call including any retries.
    Stopwatch timer = Stopwatch.StartNew();
    ResourceResponse<Document> created = await ExecuteWithRetries(client, () => client.CreateDocumentAsync(colSelfLink, student), displayHttp429Error, threadNumber, iterationNumber);
    timer.Stop();

    if (opsRespInfo != null)
    {
        opsRespInfo.OpRequestCharge = created.RequestCharge;
        opsRespInfo.ElapsedMilliseconds = timer.ElapsedMilliseconds;
    }

    if (!showDebugInfo)
    {
        return;
    }
    Console.WriteLine("{0}\tInsert by Thread:{1} and Iteration:{2}, # of RUs: {3}, Elapsed time: {4} ms", DateTime.UtcNow, threadNumber, iterationNumber, created.RequestCharge, timer.ElapsedMilliseconds);
}
/// <summary>
/// Creates one document in the collection (with retries) without any per-thread bookkeeping.
/// </summary>
/// <param name="student">Document to insert.</param>
/// <param name="showDebugInfo">When true, writes the request charge of the insert to the console.</param>
/// <returns>A task that completes when the insert has finished.</returns>
private async Task InsertDocumentAsync(Student student, bool showDebugInfo)
{
    ResourceResponse<Document> created = await ExecuteWithRetries(client, () => client.CreateDocumentAsync(colSelfLink, student), displayHttp429Error);
    if (!showDebugInfo)
    {
        return;
    }
    Console.WriteLine("{0}\tInsert Operation, # of RUs: {1}", DateTime.UtcNow, created.RequestCharge);
}
/// <summary>
/// Blocking wrapper around the per-thread InsertDocumentAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="student">Document to insert.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
/// <param name="opsRespInfo">Forwarded to the async method.</param>
/// <param name="threadNumber">Forwarded to the async method.</param>
/// <param name="iterationNumber">Forwarded to the async method.</param>
private void InsertDocument(Student student, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber)
{
    try
    {
        InsertDocumentAsync(student, showDebugInfo, opsRespInfo, threadNumber, iterationNumber).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Blocking wrapper around the two-argument InsertDocumentAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="student">Document to insert.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
private void InsertDocument(Student student, bool showDebugInfo)
{
    try
    {
        InsertDocumentAsync(student, showDebugInfo).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Argument bundle for <c>DoInsertWork</c>, which receives it as a plain object and casts it back.
/// </summary>
public class DoInsertWorkArgs
{
/// <summary>Students this worker inserts, one document per insert call.</summary>
public List<Student> Students { get; set; }
/// <summary>Forwarded to InsertDocument as its showDebugInfo argument.</summary>
public bool ShowDebugInfo { get; set; }
/// <summary>List that DoInsertWork appends one OperationReponseInfo to per student.</summary>
public List<OperationReponseInfo> Responses { get; set; }
/// <summary>Number identifying the worker; forwarded to InsertDocument.</summary>
public int ThreadNumber { get; set; }
}
/// <summary>
/// Worker entry point for single-document inserts: inserts every student assigned to this
/// worker, recording one OperationReponseInfo per insert.
/// </summary>
/// <param name="data">A <c>DoInsertWorkArgs</c> instance passed as object.</param>
public void DoInsertWork(object data)
{
    var workArgs = (DoInsertWorkArgs)data;
    List<OperationReponseInfo> responses = workArgs.Responses;
    int worker = workArgs.ThreadNumber;

    // Iterations are numbered from 1 in the order the students are inserted.
    int iteration = 0;
    foreach (Student current in workArgs.Students)
    {
        iteration += 1;
        var info = new OperationReponseInfo();
        responses.Add(info);
        InsertDocument(current, workArgs.ShowDebugInfo, info, worker, iteration);
    }
}
/// <summary>
/// Argument bundle for <c>DoSingleReadWork</c>, which receives it as a plain object and casts it back.
/// </summary>
public class DoSingleReadWorkArgs
{
/// <summary>How many single-document reads the worker performs.</summary>
public int NumberOfReadsPerThread { get; set; }
/// <summary>Forwarded to ReadSingleDocument as its showDebugInfo argument.</summary>
public bool ShowDebugInfo { get; set; }
/// <summary>List that DoSingleReadWork appends one OperationReponseInfo to per read.</summary>
public List<OperationReponseInfo> Responses { get; set; }
/// <summary>Number identifying the worker; forwarded to ReadSingleDocument.</summary>
public int ThreadNumber { get; set; }
}
/// <summary>
/// Worker entry point for single-document reads: performs the requested number of reads for
/// the class-level studentName, recording one OperationReponseInfo per read.
/// </summary>
/// <param name="data">A <c>DoSingleReadWorkArgs</c> instance passed as object.</param>
public void DoSingleReadWork(object data)
{
    var workArgs = (DoSingleReadWorkArgs)data;
    List<OperationReponseInfo> responses = workArgs.Responses;
    int worker = workArgs.ThreadNumber;
    int totalReads = workArgs.NumberOfReadsPerThread;

    // Iteration numbers start at 1 and are passed through to the read for reporting.
    for (int iteration = 1; iteration <= totalReads; iteration++)
    {
        var info = new OperationReponseInfo();
        responses.Add(info);
        ReadSingleDocument(studentName, workArgs.ShowDebugInfo, info, worker, iteration);
    }
}
/// <summary>
/// Fetches the next page of results from a document query, typed as Student.
/// </summary>
/// <param name="query">Query to advance by one page.</param>
/// <returns>The page returned by ExecuteNextAsync.</returns>
private async Task<FeedResponse<Student>> QuerySingleDocumentAsync(IDocumentQuery<dynamic> query)
{
    FeedResponse<Student> page = await query.ExecuteNextAsync<Student>();
    return page;
}
/// <summary>
/// Queries the collection for one student by name on behalf of a worker thread and records the
/// elapsed time, request charge and number of documents returned.
/// </summary>
/// <param name="studentName">Name to match against the Name property of the documents.</param>
/// <param name="showDebugInfo">When true, writes a console line describing the read.</param>
/// <param name="opsRespInfo">Receives elapsed milliseconds, request charge and document count; may be null.</param>
/// <param name="threadNumber">Worker thread number, passed to ExecuteWithRetries and shown in debug output.</param>
/// <param name="iterationNumber">Iteration on that thread, passed to ExecuteWithRetries and shown in debug output.</param>
/// <returns>The first student of the returned page, or null when the page is empty.</returns>
private async Task<Student> ReadSingleDocumentAsync(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber)
{
    // NOTE(review): the name is concatenated into the query text, so a name containing a
    // single quote would change the query.
    string sql = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'";
    // A page size of 1 asks the server for a single record.
    var singleItem = new FeedOptions { MaxItemCount = 1 };
    IDocumentQuery<dynamic> docQuery = client.CreateDocumentQuery(colSelfLink, sql, singleItem).AsDocumentQuery();

    // Time the query execution, retries included.
    Stopwatch timer = Stopwatch.StartNew();
    FeedResponse<Student> page = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(docQuery), displayHttp429Error, threadNumber, iterationNumber);
    timer.Stop();

    // Counters consumed later by the summary.
    if (opsRespInfo != null)
    {
        opsRespInfo.NumberOfDocumentsRead = page.Count;
        opsRespInfo.OpRequestCharge = page.RequestCharge;
        opsRespInfo.ElapsedMilliseconds = timer.ElapsedMilliseconds;
    }

    if (showDebugInfo)
    {
        Console.WriteLine("{0}\tSingle Document Read by Thread:{1} and Iteration:{2}, # of RUs: {3}", DateTime.UtcNow, threadNumber, iterationNumber, page.RequestCharge);
    }

    return page.AsEnumerable().FirstOrDefault();
}
/// <summary>
/// Queries the collection for one student by name without any per-thread bookkeeping.
/// </summary>
/// <param name="studentName">Name to match against the Name property of the documents.</param>
/// <param name="showDebugInfo">When true, writes the request charge of the read to the console.</param>
/// <returns>The first student of the returned page, or null when the page is empty.</returns>
private async Task<Student> ReadSingleDocumentAsync(string studentName, bool showDebugInfo)
{
    // NOTE(review): the name is concatenated into the query text, so a name containing a
    // single quote would change the query.
    string sql = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'";
    // A page size of 1 asks the server for a single record.
    var singleItem = new FeedOptions { MaxItemCount = 1 };
    IDocumentQuery<dynamic> docQuery = client.CreateDocumentQuery(colSelfLink, sql, singleItem).AsDocumentQuery();

    // The timer is started and stopped around the query, but its value is not used afterwards.
    Stopwatch timer = Stopwatch.StartNew();
    FeedResponse<Student> page = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(docQuery), displayHttp429Error);
    timer.Stop();

    if (showDebugInfo)
    {
        Console.WriteLine("{0}\tSingle Document Read, # of RUs: {1}", DateTime.UtcNow, page.RequestCharge);
    }

    return page.AsEnumerable().FirstOrDefault();
}
/// <summary>
/// Blocking wrapper around the per-thread ReadSingleDocumentAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="studentName">Forwarded to the async method.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
/// <param name="opsRespInfo">Forwarded to the async method.</param>
/// <param name="threadNumber">Forwarded to the async method.</param>
/// <param name="iterationNumber">Forwarded to the async method.</param>
private void ReadSingleDocument(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber)
{
    try
    {
        // Async query
        ReadSingleDocumentAsync(studentName, showDebugInfo, opsRespInfo, threadNumber, iterationNumber).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Blocking wrapper around the two-argument ReadSingleDocumentAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="studentName">Forwarded to the async method.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
private void ReadSingleDocument(string studentName, bool showDebugInfo)
{
    try
    {
        // Async query
        ReadSingleDocumentAsync(studentName, showDebugInfo).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Argument bundle for <c>DoMultiReadWork</c>, which receives it as a plain object and casts it back.
/// </summary>
public class DoMultiReadWorkArgs
{
/// <summary>Number of documents the worker is asked to read; DoMultiReadWork consumes it in batches.</summary>
public int NumberOfReadsPerThread { get; set; }
/// <summary>Forwarded to ReadMultiDocuments as its showDebugInfo argument.</summary>
public bool ShowDebugInfo { get; set; }
/// <summary>List that DoMultiReadWork appends one OperationReponseInfo to per batch.</summary>
public List<OperationReponseInfo> Responses { get; set; }
/// <summary>Number identifying the worker; forwarded to ReadMultiDocuments.</summary>
public int ThreadNumber { get; set; }
}
/// <summary>
/// Worker entry point for batched reads: issues batch reads for the class-level studentName
/// until the worker's read quota is used up, recording one OperationReponseInfo per batch.
/// </summary>
/// <param name="data">A <c>DoMultiReadWorkArgs</c> instance passed as object.</param>
public void DoMultiReadWork(object data)
{
    var workArgs = (DoMultiReadWorkArgs)data;
    List<OperationReponseInfo> responses = workArgs.Responses;
    int worker = workArgs.ThreadNumber;

    int outstanding = workArgs.NumberOfReadsPerThread;
    while (outstanding > 0)
    {
        var info = new OperationReponseInfo();
        responses.Add(info);

        // The final batch may be smaller than the configured read batch size.
        int batch = (outstanding > readBatchSize) ? readBatchSize : outstanding;

        ReadMultiDocuments(studentName, workArgs.ShowDebugInfo, info, worker, batch);
        outstanding -= batch;
    }
}
/// <summary>
/// Reads one batch of students by name on behalf of a worker thread: fetches result pages of up to
/// <paramref name="batchSize"/> documents until at least that many documents have been retrieved
/// or the query has no more results, accumulating time, request charge and document count.
/// </summary>
/// <param name="studentName">Name to match against the Name property of the documents.</param>
/// <param name="showDebugInfo">When true, writes one console line per page read.</param>
/// <param name="opsRespInfo">Accumulates elapsed milliseconds, request charge and document count over all pages; may be null.</param>
/// <param name="threadNumber">Worker thread number, passed to ExecuteWithRetries and shown in debug output.</param>
/// <param name="batchSize">Page size for the query and the number of documents after which reading stops. When not positive, all pages are read.</param>
/// <returns>The students retrieved; may hold more than batchSize entries when an earlier page came back partially filled.</returns>
private async Task<List<Student>> ReadMultiDocumentAsync(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int batchSize)
{
    // Fetch records per batchSize
    FeedOptions options = new FeedOptions { MaxItemCount = batchSize };
    string queryString = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'";
    IDocumentQuery<dynamic> query = client.CreateDocumentQuery(colSelfLink, queryString, options).AsDocumentQuery();
    List<Student> studentsRetrieved = new List<Student>();
    Stopwatch clock = new Stopwatch();
    int iterationNumber = 0;
    while (query.HasMoreResults)
    {
        iterationNumber++;
        // Restart (not Start) so each page is timed on its own; otherwise the running total
        // would be added to the counters again for every page.
        clock.Restart();
        FeedResponse<Student> queryResponse = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(query), displayHttp429Error, threadNumber, iterationNumber);
        clock.Stop();
        // for debug
        if (showDebugInfo)
        {
            Console.WriteLine("{0}\t {1} Documents Read by Thread:{2} in {3} ms, # of RUs: {4}", DateTime.UtcNow, queryResponse.Count, threadNumber, clock.ElapsedMilliseconds, queryResponse.RequestCharge);
        }
        // populate OperationReponseInfo counters that will be used later for the summary
        if (opsRespInfo != null)
        {
            opsRespInfo.ElapsedMilliseconds += clock.ElapsedMilliseconds;
            opsRespInfo.OpRequestCharge += queryResponse.RequestCharge;
            opsRespInfo.NumberOfDocumentsRead += queryResponse.Count;
        }
        // The response is itself the sequence of students; casting it to List<Student> yields
        // null and AddRange(null) throws, so add the response directly.
        studentsRetrieved.AddRange(queryResponse);
        // DoMultiReadWork counts one batch per call, so stop once a batch worth of documents
        // has been read instead of draining every matching document.
        if (batchSize > 0 && studentsRetrieved.Count >= batchSize)
        {
            break;
        }
    }
    return studentsRetrieved;
}
/// <summary>
/// Reads one batch of students by name without per-thread bookkeeping: fetches result pages of up to
/// <paramref name="batchSize"/> documents until at least that many documents have been retrieved
/// or the query has no more results.
/// </summary>
/// <param name="studentName">Name to match against the Name property of the documents.</param>
/// <param name="showDebugInfo">When true, writes one console line per page read.</param>
/// <param name="batchSize">Page size for the query and the number of documents after which reading stops. When not positive, all pages are read.</param>
/// <returns>The students retrieved; may hold more than batchSize entries when an earlier page came back partially filled.</returns>
private async Task<List<Student>> ReadMultiDocumentAsync(string studentName, bool showDebugInfo, int batchSize)
{
    // Fetch records per batchSize
    FeedOptions options = new FeedOptions { MaxItemCount = batchSize };
    string queryString = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'";
    IDocumentQuery<dynamic> query = client.CreateDocumentQuery(colSelfLink, queryString, options).AsDocumentQuery();
    List<Student> studentsRetrieved = new List<Student>();
    Stopwatch clock = new Stopwatch();
    while (query.HasMoreResults)
    {
        // Restart (not Start) so the debug line shows the time of this page only.
        clock.Restart();
        FeedResponse<Student> queryResponse = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(query), displayHttp429Error);
        clock.Stop();
        // for debug
        if (showDebugInfo)
        {
            Console.WriteLine("{0}\t {1} Documents Read in {2} ms, # of RUs: {3}", DateTime.UtcNow, queryResponse.Count, clock.ElapsedMilliseconds, queryResponse.RequestCharge);
        }
        // The response is itself the sequence of students; casting it to List<Student> yields
        // null and AddRange(null) throws, so add the response directly.
        studentsRetrieved.AddRange(queryResponse);
        // One call reads one batch: stop once a batch worth of documents has been retrieved.
        if (batchSize > 0 && studentsRetrieved.Count >= batchSize)
        {
            break;
        }
    }
    return studentsRetrieved;
}
/// <summary>
/// Blocking wrapper around the per-thread ReadMultiDocumentAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="studentName">Forwarded to the async method.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
/// <param name="opsRespInfo">Forwarded to the async method.</param>
/// <param name="threadNumber">Forwarded to the async method.</param>
/// <param name="batchSize">Forwarded to the async method.</param>
private void ReadMultiDocuments(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int batchSize)
{
    try
    {
        // Async query
        ReadMultiDocumentAsync(studentName, showDebugInfo, opsRespInfo, threadNumber, batchSize).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
/// <summary>
/// Blocking wrapper around the three-argument ReadMultiDocumentAsync overload. Failures are reported
/// to the console (or to HandleDocumentClientException) instead of being propagated to the caller.
/// </summary>
/// <param name="studentName">Forwarded to the async method.</param>
/// <param name="showDebugInfo">Forwarded to the async method.</param>
/// <param name="batchSize">Forwarded to the async method.</param>
private void ReadMultiDocuments(string studentName, bool showDebugInfo, int batchSize)
{
    try
    {
        // Async query
        ReadMultiDocumentAsync(studentName, showDebugInfo, batchSize).Wait();
    }
    catch (DocumentClientException de)
    {
        HandleDocumentClientException(de);
    }
    catch (AggregateException ae)
    {
        // Task.Wait() reports task failures wrapped in an AggregateException, so unwrap first.
        Exception baseException = ae.GetBaseException();
        DocumentClientException clientException = baseException as DocumentClientException;
        if (clientException != null)
        {
            HandleDocumentClientException(clientException);
        }
        else
        {
            // Any other failure must still be reported; otherwise the operation fails silently.
            Console.WriteLine("Error: {0}, Message: {1}", ae.Message, baseException.Message);
        }
    }
    catch (Exception e)
    {
        Exception baseException = e.GetBaseException();
        Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
    }
}
}
/// <summary>
/// Per-operation measurement record. The worker methods create one instance per operation
/// (or per batch) and the async insert/read methods fill in the counters.
/// </summary>
public class OperationReponseInfo
{
/// <summary>NOTE(review): initialised to null and not assigned by any code visible in this section.</summary>
public List<Student> ListOfStudents { get; set; }
/// <summary>Number of documents returned by the read; set from the query response's Count.</summary>
public int NumberOfDocumentsRead { get; set; }
/// <summary>NOTE(review): initialised to 0 and not assigned by any code visible in this section.</summary>
public int NumberOfDocumentsInserted { get; set; }
/// <summary>Request charge reported by the response of the operation.</summary>
public double OpRequestCharge { get; set; }
/// <summary>Stopwatch time, in milliseconds, measured around the operation.</summary>
public long ElapsedMilliseconds { get; set; }
/// <summary>
/// Creates a record with every counter at zero and no student list. The assignments restate
/// the default values of the property types.
/// </summary>
public OperationReponseInfo()
{
ListOfStudents = null;
NumberOfDocumentsRead = 0;
NumberOfDocumentsInserted = 0;
OpRequestCharge = 0.0;
ElapsedMilliseconds = 0;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
namespace DocumentDBPerfScaleTest
{
/// <summary>
/// Sample document used by the performance test: a student with an id, a name, a department,
/// year/semester numbers and a list of scores.
/// </summary>
public class Student
{
    // NOTE(review): not referenced anywhere inside this class.
    private static readonly int Seed = 79736584;

    /// <summary>Identifier of the student; CreateStudent assigns a new GUID.</summary>
    public Guid StudentId { get; set; }

    /// <summary>Student name.</summary>
    public string Name { get; set; }

    /// <summary>Scores of the student.</summary>
    public IList<StudentScore> Scores { get; set; }

    /// <summary>Department name.</summary>
    public string Department { get; set; }

    /// <summary>Year number.</summary>
    public int Year { get; set; }

    /// <summary>Semester number.</summary>
    public int Semester { get; set; }

    /// <summary>
    /// Builds a student with a fresh id, the given name and a fixed set of sample values
    /// (department, year, semester and three scores).
    /// </summary>
    /// <param name="StudentName">Name assigned to the new student.</param>
    /// <returns>The newly created student.</returns>
    public static Student CreateStudent( string StudentName)
    {
        // Same three sample scores for every student; stored as an array.
        IList<StudentScore> sampleScores = new[]
        {
            new StudentScore { Type = "Exam", Score = 44.871863 },
            new StudentScore { Type = "Homework", Score = 10.530585 },
            new StudentScore { Type = "Quiz", Score = 19.218864 }
        };

        var created = new Student();
        created.StudentId = Guid.NewGuid();
        created.Name = StudentName;
        created.Department = "Language";
        created.Year = 1;
        created.Semester = 2;
        created.Scores = sampleScores;
        return created;
    }
}
/// <summary>
/// One score entry of a student.
/// </summary>
public class StudentScore
{
/// <summary>Kind of score as text; Student.CreateStudent uses "Exam", "Homework" and "Quiz".</summary>
public string Type { get; set; }
/// <summary>Numeric value of the score.</summary>
public double Score { get; set; }
}
/// <summary>
/// Kinds of score. NOTE(review): not referenced by the visible code; StudentScore.Type is a string.
/// </summary>
public enum ScoreType
{
Exam,
Quiz,
Homework
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.