|
using System; |
|
using System.Collections.Generic; |
|
using System.Configuration; |
|
using System.IO; |
|
using System.Linq; |
|
using System.Diagnostics; |
|
using System.Threading.Tasks; |
|
using System.Threading; |
|
using Microsoft.Azure.Documents; |
|
using Microsoft.Azure.Documents.Client; |
|
using Microsoft.Azure.Documents.Linq; |
|
using Newtonsoft; |
|
using Newtonsoft.Json; |
|
using System.Reflection; |
|
using System.Security.Cryptography; |
|
using System.Text; |
|
|
|
namespace DocumentDBPerfScaleTest |
|
{ |
|
|
|
class Program |
|
{ |
|
|
|
//Assign a name for your database |
|
private static readonly string databaseId = ConfigurationManager.AppSettings["DatabaseId"]; |
|
private static readonly string collectionId = ConfigurationManager.AppSettings["CollectionId"]; |
|
|
|
//Read the DocumentDB endpointUrl and authorisationKeys from config |
|
private static readonly string endpointUrl = ConfigurationManager.AppSettings["EndPointUrl"]; |
|
private static readonly string authorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"]; |
|
|
|
private static readonly string collectionTier = ConfigurationManager.AppSettings["CollectionTier"]; |
|
|
|
private static readonly int Seed = Convert.ToInt32(ConfigurationManager.AppSettings["Seed"]); |
|
private static readonly int NumberOfDocuments = Convert.ToInt32(ConfigurationManager.AppSettings["NumberOfDocuments"]); |
|
private static readonly string studentName = ConfigurationManager.AppSettings["StudentName"]; |
|
|
|
private static readonly int defaultNumberOfThreads = 8; |
|
|
|
private static OperationType opType; |
|
|
|
private static DocumentClient client; |
|
private static DocumentCollection StudentsCollection; |
|
|
|
|
|
private static string colSelfLink; |
|
private static string bulkLoadSprocSelfLink; |
|
|
|
private static double totalRequestCharge = 0.0; |
|
private static double totalDurationInSeconds = 0.0; |
|
private static int bulkLoadBatchSize = Convert.ToInt32(ConfigurationManager.AppSettings["BulkLoadBatchSize"]); |
|
private static int readBatchSize = Convert.ToInt32(ConfigurationManager.AppSettings["ReadBatchSize"]); |
|
private static bool displayDebugInfo = Convert.ToBoolean(ConfigurationManager.AppSettings["DisplayDebugInfo"]); |
|
private static bool displayHttp429Error = Convert.ToBoolean(ConfigurationManager.AppSettings["DisplayHttp429Error"]); |
|
|
|
private static int consoleWindowBufferWidth = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowBufferWidth"]); |
|
private static int consoleWindowBufferHeight = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowBufferHeight"]); |
|
private static int consoleWindowWidth = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowWidth"]); |
|
private static int consoleWindowHeight = Convert.ToInt32(ConfigurationManager.AppSettings["ConsoleWindowHeight"]); |
|
|
|
|
|
private static int numberOfThreads = 0; |
|
private static List<List<Student>> allStudentsCopy = null; |
|
|
|
private static List<List<OperationReponseInfo>> allOperationResponses = null; |
|
private static Student[] allStudentsOriginal = null; |
|
|
|
|
|
|
|
/// <summary> |
|
/// ReadSingle: Read where query results a single document |
|
/// ReadBatch: Read where query results multiple documents and we read in a batch |
|
/// Insert: Single insert via create API |
|
/// BulkInsert: Bulk insert via stored procedure |
|
/// </summary> |
|
private enum OperationType |
|
{ |
|
ReadSingle, |
|
ReadBatch, |
|
Insert, |
|
BulkInsert |
|
}; |
|
|
|
static void Main(string[] args) |
|
{ |
|
|
|
try |
|
{ |
|
|
|
// Initialize, create Database, collection, stored procedure etc |
|
Init(); |
|
|
|
//Measure Request Charge for a specific Operation |
|
MeasureRequestCharge(opType); |
|
|
|
// Run Scale test of various operations - insert, read, bulkinsert and batch read |
|
RunOperationsScaleTest(opType); |
|
|
|
//Delete the Collection |
|
Cleanup(); |
|
|
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
// just a simplified example - AggregateException may be DocumentClientException or DocumentServiceQueryException |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
finally |
|
{ |
|
Console.WriteLine("End of Test, press any key to exit."); |
|
Console.ReadKey(); |
|
} |
|
|
|
|
|
} |
|
|
|
private static async Task InitAsync() |
|
{ |
|
|
|
// create DocumentDb client and get database, collection, stored procedure etc |
|
if (client == null) |
|
{ |
|
// get a client |
|
client = new DocumentClient(new Uri(endpointUrl), authorizationKey, |
|
new ConnectionPolicy |
|
{ |
|
ConnectionMode = (ConnectionMode)Enum.Parse(typeof(ConnectionMode), ConfigurationManager.AppSettings["connectionMode"]), |
|
ConnectionProtocol = (Protocol)Enum.Parse(typeof(Protocol), ConfigurationManager.AppSettings["protocol"]) |
|
}); |
|
|
|
|
|
// get database |
|
Database database = await GetOrCreateDatabaseAsync(client, databaseId); |
|
|
|
// get collection |
|
StudentsCollection = client.CreateDocumentCollectionQuery(database.SelfLink).Where(c => c.Id == collectionId).ToArray().FirstOrDefault(); |
|
if (StudentsCollection == null) |
|
{ |
|
StudentsCollection = await CreateDocumentCollectionWithRetriesAsync( |
|
client, |
|
database, |
|
new DocumentCollection { Id = collectionId }, collectionTier); |
|
} |
|
|
|
colSelfLink = StudentsCollection.SelfLink; |
|
|
|
// get or create Stored procedure |
|
StoredProcedure bulkLoadSProc = await GetOrCreateStoredProcedureAsync("BulkImport.js"); |
|
bulkLoadSprocSelfLink = bulkLoadSProc.SelfLink; |
|
|
|
|
|
} |
|
} |
|
|
|
private static void Init() |
|
{ |
|
// set console buffer size as we are going to write to console for debug logging |
|
Console.SetBufferSize(consoleWindowBufferWidth, consoleWindowBufferHeight); |
|
Console.SetWindowSize(consoleWindowWidth, consoleWindowHeight); |
|
|
|
//decide Operation Type |
|
string operationType = ConfigurationManager.AppSettings["OperationType"]; |
|
opType = (OperationType)Enum.Parse(typeof(OperationType), operationType); |
|
|
|
|
|
// client initialization |
|
if (client == null) |
|
{ |
|
try |
|
{ |
|
InitAsync().Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
// just a simplified example - AggregateException may be DocumentClientException or DocumentServiceQueryException |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
|
|
} |
|
} |
|
|
|
private static async Task CleanupAsync() |
|
{ |
|
//Database database = await GetNewDatabaseAsync(client, databaseId); |
|
await client.DeleteDocumentCollectionAsync(colSelfLink); |
|
} |
|
|
|
private static void Cleanup() |
|
{ |
|
|
|
try |
|
{ |
|
CleanupAsync().Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
|
|
|
|
} |
|
|
|
private static void MeasureRequestCharge( OperationType operation) |
|
{ |
|
|
|
Console.WriteLine("************** Measuring Request Charge **************************"); |
|
var program = new Program(); |
|
|
|
switch (operation) |
|
{ |
|
case OperationType.Insert: |
|
// Do a single single insert and measure Request charge |
|
Student student = Student.CreateStudent(studentName); |
|
program.InsertDocument(student, true); |
|
break; |
|
case OperationType.ReadSingle: |
|
// Do a single read and measure Request Charge |
|
program.ReadSingleDocument(studentName, true); |
|
break; |
|
case OperationType.ReadBatch: |
|
// Do a single batch of Read |
|
int batchSize = readBatchSize; |
|
program.ReadMultiDocuments(studentName, true, batchSize); |
|
break; |
|
case OperationType.BulkInsert: |
|
// Do a single batch of Bulk Load and measure Request charge |
|
program.MeasureRequestChargeForBulkInsert(); |
|
break; |
|
default: |
|
break; |
|
} |
|
|
|
Console.WriteLine("************** End of Measuring Request Charge **************************"); |
|
|
|
} |
|
|
|
private void MeasureRequestChargeForBulkInsert() |
|
{ |
|
// list of StudentMasters in the batch |
|
var StudentsInBatch = new List<Student>(); |
|
|
|
var numberOfDosInBatch = bulkLoadBatchSize; |
|
|
|
for (var i = 0; i < numberOfDosInBatch; i++) |
|
{ |
|
Student student = Student.CreateStudent(studentName); |
|
StudentsInBatch.Add(student); |
|
} |
|
|
|
// Insert th documents in a batch |
|
BulkInsertDocuments(StudentsInBatch, true); |
|
} |
|
|
|
|
|
|
|
private static void RunOperationsScaleTest( OperationType operation) |
|
{ |
|
try |
|
{ |
|
|
|
var program = new Program(); |
|
|
|
switch (operation) |
|
{ |
|
case OperationType.Insert: |
|
//Run insert test with collection tier specified in config |
|
Console.WriteLine("************* Insert scale test with Collection Tier: {0} **************", collectionTier); |
|
program.RunOperationInScale(OperationType.Insert); |
|
Console.WriteLine("************* End of Insert scale test with Collection Tier: {0} **************", collectionTier); |
|
break; |
|
case OperationType.ReadSingle: |
|
// Run Read test with collection tier specified in config |
|
Console.WriteLine("************ Single document Read scale test with Collection Tier: {0} **************", collectionTier); |
|
program.RunOperationInScale(OperationType.ReadSingle); |
|
Console.WriteLine("************ End of Single document Read scale test with Collection Tier: {0} **************", collectionTier); |
|
break; |
|
case OperationType.ReadBatch: |
|
// Run read batch scale test with collection tier specified in config |
|
Console.WriteLine("************ Batch Read scale test with Collection Tier: {0} ***************", collectionTier); |
|
program.RunOperationInScale(OperationType.ReadBatch); |
|
Console.WriteLine("************ End of Batch Read scale test with Collection Tier: {0} ***************", collectionTier); |
|
break; |
|
case OperationType.BulkInsert: |
|
// Run Bulk insert test with stored procedure |
|
Console.WriteLine("************ Bulk insert scale test with Collection Tier: {0} *************", collectionTier); |
|
program.RunOperationInScale(OperationType.BulkInsert); |
|
Console.WriteLine("************ End of Bulk insert scale test with Collection Tier: {0} **************", collectionTier); |
|
break; |
|
default: |
|
break; |
|
} |
|
|
|
|
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
// just a simplified example - AggregateException may be DocumentClientException or DocumentServiceQueryException |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
finally |
|
{ |
|
Console.WriteLine("Done with DocumentDB Scale Test"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
private static void HandleDocumentClientException(DocumentClientException de) |
|
{ |
|
// We can take custom actions (like Retry etc) based on the information obtained from DocumentClientException object |
|
Console.WriteLine("\nActivityID: {0}", de.ActivityId); |
|
Console.WriteLine("\nError: {0}", de.Error); |
|
Console.WriteLine("\nHTTP Status Code: {0}", de.StatusCode.ToString()); |
|
Console.WriteLine("\nRetryAfter: {0}", de.RetryAfter.TotalSeconds.ToString()); |
|
Console.WriteLine("\nStack Trace: {0}", de.StackTrace); |
|
Console.WriteLine("\nResponse Headers:\n "); |
|
if (de.ResponseHeaders != null && de.ResponseHeaders.Count > 0) |
|
{ |
|
foreach (string key in de.ResponseHeaders) |
|
{ |
|
Console.WriteLine("{0} {1}", key, de.ResponseHeaders[key]); |
|
} |
|
} |
|
Exception baseException = de.GetBaseException(); |
|
Console.WriteLine("Message: {0} \n {1}", de.Message, baseException.Message); |
|
|
|
} |
|
|
|
/// <summary> |
|
/// Create a new database if not exists. If exists, get the database |
|
/// </summary> |
|
/// <param name="client">The DocumentDB client instance.</param> |
|
/// <param name="id">The id of the Database to search for, or create.</param> |
|
/// <returns>The created Database object</returns> |
|
public static async Task<Database> GetOrCreateDatabaseAsync(DocumentClient client, string id) |
|
{ |
|
Database database = client.CreateDatabaseQuery().Where(db => db.Id == id).ToArray().FirstOrDefault(); |
|
if (database == null) |
|
{ |
|
database = await client.CreateDatabaseAsync(new Database { Id = id }); |
|
} |
|
|
|
return database; |
|
} |
|
|
|
/// <summary> |
|
/// Create a DocumentCollection, and retry if throttled. |
|
/// </summary> |
|
/// <param name="client">The DocumentDB client instance.</param> |
|
/// <param name="database">The database to use.</param> |
|
/// <param name="collectionDefinition">The collection definition to use.</param> |
|
/// <param name="offerType">The offer type for the collection.</param> |
|
/// <returns>The created DocumentCollection.</returns> |
|
private static async Task<DocumentCollection> CreateDocumentCollectionWithRetriesAsync( |
|
DocumentClient client, |
|
Database database, |
|
DocumentCollection collectionDefinition, |
|
string offerType = "S1") |
|
{ |
|
return await ExecuteWithRetries(client, () => client.CreateDocumentCollectionAsync( |
|
database.SelfLink, |
|
collectionDefinition, |
|
new RequestOptions { OfferType = offerType }), displayHttp429Error); |
|
} |
|
|
|
|
|
/// <summary> |
|
/// Create a new StoredProcedure for performance benchmarking. |
|
/// </summary> |
|
/// <param name="client">The DocumentDB client instance.</param> |
|
/// <param name="collection">The collection to create the StoredProcedure in.</param> |
|
/// <returns>The created StoredProcedure object</returns> |
|
private static async Task<StoredProcedure> GetOrCreateStoredProcedureAsync(string scriptName) |
|
{ |
|
//Use script name (minus file extension) for the procedure id |
|
string procedureId = Path.GetFileNameWithoutExtension(scriptName); |
|
|
|
StoredProcedure storedProcedure = client.CreateStoredProcedureQuery(colSelfLink).Where(s => s.Id == procedureId).AsEnumerable().FirstOrDefault(); |
|
if (storedProcedure == null) |
|
{ |
|
//Stored procedure doesn't exist. Create it ... |
|
string scriptFileFullPath = string.Format(@"{0}\{1}", ConfigurationManager.AppSettings["ScriptFilePath"], scriptName); |
|
string contents = File.ReadAllText(scriptFileFullPath); |
|
|
|
storedProcedure = new StoredProcedure |
|
{ |
|
Id = procedureId, |
|
Body = contents |
|
}; |
|
storedProcedure = await ExecuteWithRetries( |
|
client, |
|
() => client.CreateStoredProcedureAsync(colSelfLink, storedProcedure), displayHttp429Error); |
|
|
|
} |
|
|
|
return storedProcedure; |
|
} |
|
|
|
|
|
/// <summary> |
|
/// Execute the function with retries on throttle. |
|
/// </summary> |
|
/// <typeparam name="V"></typeparam> |
|
/// <param name="client"></param> |
|
/// <param name="function"></param> |
|
/// <param name="showDebugInfo"></param> |
|
/// <param name="threadNumber"></param> |
|
/// <param name="iterationNumber"></param> |
|
/// <returns></returns> |
|
private static async Task<V> ExecuteWithRetries<V>(DocumentClient client, Func<Task<V>> function, bool showHttp429Error, int threadNumber, int iterationNumber) |
|
{ |
|
TimeSpan sleepTime = TimeSpan.Zero; |
|
|
|
while (true) |
|
{ |
|
try |
|
{ |
|
return await function(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
if ((int)de.StatusCode != 429) |
|
{ |
|
throw; |
|
} |
|
if (true == showHttp429Error) |
|
{ |
|
Console.WriteLine("{0}\t{1} in Thread:{2} Iteration:{3} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name, threadNumber, iterationNumber); |
|
} |
|
sleepTime = de.RetryAfter; |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (!(ae.InnerException is DocumentClientException)) |
|
{ |
|
throw; |
|
} |
|
|
|
DocumentClientException de = (DocumentClientException)ae.InnerException; |
|
if ((int)de.StatusCode != 429) |
|
{ |
|
throw; |
|
} |
|
if (true == showHttp429Error) |
|
{ |
|
Console.WriteLine("{0}\t{1} in Thread:{2} Iteration:{3} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name, threadNumber, iterationNumber); |
|
} |
|
sleepTime = de.RetryAfter; |
|
} |
|
|
|
await Task.Delay(sleepTime); |
|
} |
|
} |
|
|
|
/// <summary> |
|
/// Execute the function with retries on throttle |
|
/// </summary> |
|
/// <typeparam name="V"></typeparam> |
|
/// <param name="client"></param> |
|
/// <param name="function"></param> |
|
/// <param name="showDebugInfo"></param> |
|
/// <param name="threadNumber"></param> |
|
/// <param name="iterationNumber"></param> |
|
/// <returns></returns> |
|
private static async Task<V> ExecuteWithRetries<V>(DocumentClient client, Func<Task<V>> function, bool showHttp429Error) |
|
{ |
|
TimeSpan sleepTime = TimeSpan.Zero; |
|
|
|
while (true) |
|
{ |
|
try |
|
{ |
|
return await function(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
if ((int)de.StatusCode != 429) |
|
{ |
|
throw; |
|
} |
|
if (true == showHttp429Error) |
|
{ |
|
Console.WriteLine("{0}\t{1} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name); |
|
} |
|
sleepTime = de.RetryAfter; |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (!(ae.InnerException is DocumentClientException)) |
|
{ |
|
throw; |
|
} |
|
|
|
DocumentClientException de = (DocumentClientException)ae.InnerException; |
|
if ((int)de.StatusCode != 429) |
|
{ |
|
throw; |
|
} |
|
if (true == showHttp429Error) |
|
{ |
|
Console.WriteLine("{0}\t{1} resulted a HTTP 429", DateTime.UtcNow, function.Method.Name); |
|
} |
|
sleepTime = de.RetryAfter; |
|
} |
|
|
|
await Task.Delay(sleepTime); |
|
} |
|
} |
|
|
|
|
|
private Student[] CreateOrGetListOfStudents() |
|
{ |
|
if (null == allStudentsOriginal) |
|
{ |
|
allStudentsOriginal = new Student[NumberOfDocuments]; |
|
for (int i = 0; i < NumberOfDocuments; i++) |
|
{ |
|
var student = Student.CreateStudent(studentName); |
|
allStudentsOriginal[i] = student; |
|
} |
|
} |
|
return allStudentsOriginal; |
|
|
|
} |
|
|
|
private void AssignListOfStudentsToThreads(OperationType operation) |
|
{ |
|
Student[] allStudentsOrig = CreateOrGetListOfStudents(); |
|
|
|
allStudentsCopy = new List<List<Student>>(); |
|
var remainingDocuments = NumberOfDocuments; |
|
var remainingThreads = numberOfThreads; |
|
|
|
int start = 0; |
|
while (remainingDocuments > 0) |
|
{ |
|
var numberOfDocumentsPerThread = Convert.ToInt32(Math.Round((double)remainingDocuments / remainingThreads, MidpointRounding.AwayFromZero)); |
|
int end = start + numberOfDocumentsPerThread; |
|
var StudentsForEachThread = new List<Student>(); |
|
for (var i = start; i < end; i++) |
|
{ |
|
StudentsForEachThread.Add(allStudentsOrig[i]); |
|
} |
|
allStudentsCopy.Add(StudentsForEachThread); |
|
remainingDocuments -= numberOfDocumentsPerThread; |
|
remainingThreads--; |
|
start = end; |
|
} |
|
} |
|
|
|
|
|
|
|
private void CreateResponseInfoListForEachThread(int NumberOfThreads) |
|
{ |
|
//create a |
|
allOperationResponses = new List<List<OperationReponseInfo>>(); |
|
for (int i = 0; i < NumberOfThreads; i++) |
|
{ |
|
var operationResponeListPerThread = new List<OperationReponseInfo>(); |
|
allOperationResponses.Add(operationResponeListPerThread); |
|
} |
|
} |
|
|
|
private void RunOperationInScale(OperationType operation) |
|
{ |
|
totalDurationInSeconds = 0; |
|
|
|
// determine number of Threads to use |
|
switch(operation) |
|
{ |
|
case OperationType.Insert: |
|
numberOfThreads = Convert.ToInt32(ConfigurationManager.AppSettings["NumberOfThreadsForInsert"]); |
|
break; |
|
case OperationType.ReadSingle: |
|
numberOfThreads = Convert.ToInt32(ConfigurationManager.AppSettings["NumberOfThreadsForReadSingle"]); |
|
break; |
|
case OperationType.ReadBatch: |
|
numberOfThreads = Convert.ToInt32(ConfigurationManager.AppSettings["NumberOfThreadsForReadMulti"]); |
|
break; |
|
case OperationType.BulkInsert: |
|
numberOfThreads = Convert.ToInt32(ConfigurationManager.AppSettings["NumberOfThreadsForBulkLoad"]); |
|
break; |
|
default: |
|
numberOfThreads = defaultNumberOfThreads; |
|
break; |
|
} |
|
|
|
// Create a list of Students and assign them into multiple threads |
|
// This is more relevant for inserts, because we will insert these Student Documents |
|
// For Read, List of documents for each thread just tells us how many reads per Thread - for simplicity, we will use same query on every read |
|
AssignListOfStudentsToThreads(operation); |
|
|
|
//Create Operation ResponseInfo List for each Thread |
|
// ResponseInfo tracks Response counters (we will use for summary) for every request |
|
CreateResponseInfoListForEachThread(numberOfThreads); |
|
|
|
var threads = new Thread[numberOfThreads]; |
|
|
|
// start a stopwatch |
|
Stopwatch totalDurationClock = new Stopwatch(); |
|
totalDurationClock.Start(); |
|
|
|
|
|
for (var i = 0; i < numberOfThreads; i++) |
|
{ |
|
//allOperationResponses.Add(new List<OperationReponseInfo>()); |
|
Thread newThread = null; |
|
switch (operation) |
|
{ |
|
case OperationType.Insert: |
|
{ |
|
newThread = new Thread(DoInsertWork); |
|
threads[i] = newThread; |
|
threads[i].Start(new DoInsertWorkArgs() { Students = allStudentsCopy[i], ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[i], ThreadNumber = i + 1 }); |
|
break; |
|
} |
|
|
|
case OperationType.ReadSingle: |
|
{ |
|
newThread = new Thread(DoSingleReadWork); |
|
threads[i] = newThread; |
|
int readsPerThread = allStudentsCopy[i].Count; |
|
threads[i].Start(new DoSingleReadWorkArgs() { NumberOfReadsPerThread = readsPerThread, ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[i], ThreadNumber = i + 1 }); |
|
break; |
|
} |
|
|
|
case OperationType.BulkInsert: |
|
{ |
|
newThread = new Thread(DoBulkInsertWork); |
|
threads[i] = newThread; |
|
threads[i].Start(new DoBulkInsertWorkArgs() { Students = allStudentsCopy[i], ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[i], ThreadNumber = i + 1 }); |
|
break; |
|
} |
|
case OperationType.ReadBatch: |
|
{ |
|
newThread = new Thread(DoMultiReadWork); |
|
threads[i] = newThread; |
|
int readsPerThread = allStudentsCopy[i].Count; |
|
threads[i].Start(new DoMultiReadWorkArgs { NumberOfReadsPerThread = readsPerThread, ShowDebugInfo = displayDebugInfo, Responses = allOperationResponses[i], ThreadNumber = i + 1 }); |
|
break; |
|
} |
|
default: |
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
for (var i = 0; i < numberOfThreads; i++) |
|
{ |
|
threads[i].Join(); |
|
} |
|
|
|
// measure the total duration of the test |
|
totalDurationClock.Stop(); |
|
long totalElapsedMilliseconds = totalDurationClock.ElapsedMilliseconds; |
|
totalDurationInSeconds = (double)totalElapsedMilliseconds / 1000; |
|
|
|
// Show Run Results summary |
|
ShowRunResults(operation); |
|
|
|
|
|
} |
|
|
|
private void ShowRunResults(OperationType operation) |
|
{ |
|
|
|
// Initialize totalRequestCharge and totalDurationInSeconds |
|
totalRequestCharge = 0; |
|
|
|
//start adding all Request Charges and Elapsed time for all operations from all threads |
|
long timeElapsedinMilliseconds = 0; |
|
for (var i = 0; i < allOperationResponses.Count; i++) |
|
{ |
|
List<OperationReponseInfo> opResponseListPerThread = allOperationResponses[i]; |
|
for (var j = 0; j < opResponseListPerThread.Count; j++) |
|
{ |
|
OperationReponseInfo opResponse = opResponseListPerThread[j]; |
|
timeElapsedinMilliseconds += opResponse.ElapsedMilliseconds; |
|
totalRequestCharge += opResponse.OpRequestCharge; |
|
} |
|
|
|
} |
|
|
|
Console.WriteLine("+++++++++++++++++++++++++++++++"); |
|
Console.WriteLine("{0}\t Result for Collection: {1} in Tier: {2}", DateTime.UtcNow, StudentsCollection.Id, collectionTier); |
|
Console.WriteLine(" Number of Threads used: {0}", numberOfThreads); |
|
if (operation == OperationType.BulkInsert) |
|
{ |
|
Console.WriteLine("Bulk Insert Batch Size = {0}", bulkLoadBatchSize); |
|
} |
|
else if (operation == OperationType.ReadBatch) |
|
{ |
|
Console.WriteLine("Read Batch Size = {0}", readBatchSize); |
|
} |
|
|
|
// |
|
string operationDone = ""; |
|
if (operation == OperationType.BulkInsert || operation == OperationType.Insert) |
|
{ |
|
operationDone = "Inserted"; |
|
} |
|
else if (operation == OperationType.ReadBatch || operation == OperationType.ReadSingle) |
|
{ |
|
operationDone = "Read"; |
|
} |
|
|
|
double operationsPerSecond = 0.0; |
|
double RequestUnits = 0.0; |
|
|
|
Console.WriteLine("++++++++++++++++++++++++++++++++"); |
|
Console.WriteLine("Result Summary (Total Duration measures the total time spent on the test for all threads):"); |
|
Console.WriteLine("{0} Documents {1} in {2} seconds", NumberOfDocuments, operationDone, totalDurationInSeconds); |
|
operationsPerSecond = (double)NumberOfDocuments / totalDurationInSeconds; |
|
RequestUnits = totalRequestCharge / totalDurationInSeconds; |
|
Console.WriteLine(" Operations per Second: {0}", operationsPerSecond); |
|
Console.WriteLine("Average Request Units: {0} per second", RequestUnits); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task<int> BulkInsertDocumentsAsync(List<Student> Students, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber) |
|
{ |
|
// Keep track of the number of documents loaded |
|
int documentsLoaded = 0; |
|
|
|
// Save original batch size in case complete batch doesn't get loaded |
|
int batchSize = Students.Count; |
|
do |
|
{ |
|
Stopwatch clock = new Stopwatch(); |
|
clock.Start(); |
|
StoredProcedureResponse<int> scriptResult = await ExecuteWithRetries(client, () => client.ExecuteStoredProcedureAsync<int>(bulkLoadSprocSelfLink, Students), displayHttp429Error, threadNumber, iterationNumber); |
|
clock.Stop(); |
|
|
|
int countOfLoadedDocs = scriptResult.Response; |
|
double lastRequestCharge = scriptResult.RequestCharge; |
|
|
|
if (opsRespInfo != null) |
|
{ |
|
opsRespInfo.ElapsedMilliseconds = clock.ElapsedMilliseconds; |
|
opsRespInfo.OpRequestCharge = lastRequestCharge; |
|
} |
|
|
|
//Keep track of the number of documents we've loaded so far |
|
documentsLoaded += countOfLoadedDocs; |
|
|
|
//Check to see if the entire batch was loaded. |
|
if (countOfLoadedDocs < Students.Count) |
|
{ |
|
//Remove elements from the Students List<Student> that have already been loaded before we re-send the documents that weren't processed ... |
|
Students.RemoveRange(0, countOfLoadedDocs); |
|
} |
|
|
|
//for debug |
|
if (true == showDebugInfo) |
|
{ |
|
Console.WriteLine("{0}\tBulk Insert of {1} documents by Thread:{2} and Iteration:{3}, # of RUs: {4}", DateTime.UtcNow, countOfLoadedDocs, threadNumber, iterationNumber, lastRequestCharge); |
|
} |
|
|
|
} while (documentsLoaded < batchSize); |
|
|
|
//Return count of documents loaded |
|
return documentsLoaded; |
|
|
|
} |
|
|
|
private async Task<int> BulkInsertDocumentsAsync(List<Student> Students, bool showDebugInfo) |
|
{ |
|
// Keep track of the number of documents loaded |
|
int documentsLoaded = 0; |
|
|
|
// Save original batch size in case complete batch doesn't get loaded |
|
int batchSize = Students.Count; |
|
do |
|
{ |
|
Stopwatch clock = new Stopwatch(); |
|
clock.Start(); |
|
StoredProcedureResponse<int> scriptResult = await ExecuteWithRetries(client, () => client.ExecuteStoredProcedureAsync<int>(bulkLoadSprocSelfLink, Students), displayHttp429Error); |
|
clock.Stop(); |
|
|
|
int countOfLoadedDocs = scriptResult.Response; |
|
double lastRequestCharge = scriptResult.RequestCharge; |
|
|
|
//Keep track of the number of documents we've loaded so far |
|
documentsLoaded += countOfLoadedDocs; |
|
|
|
//Check to see if the entire batch was loaded. |
|
if (countOfLoadedDocs < Students.Count) |
|
{ |
|
//Remove elements from the Students List<Student> that have already been loaded before we re-send the documents that weren't processed ... |
|
Students.RemoveRange(0, countOfLoadedDocs); |
|
} |
|
|
|
//for debug |
|
if (true == showDebugInfo) |
|
{ |
|
// since we already increased totalOperationsCount, iteration# will be totalOperationsCount, and NOT (totalOperationsCount+1) |
|
Console.WriteLine("{0}\tBulk Insert of {1} documents, # of RUs: {2}", DateTime.UtcNow, countOfLoadedDocs, lastRequestCharge); |
|
} |
|
|
|
} while (documentsLoaded < batchSize); |
|
|
|
//Return count of documents loaded |
|
return documentsLoaded; |
|
|
|
} |
|
|
|
|
|
private void BulkInsertDocuments(List<Student> Students, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber) |
|
{ |
|
try |
|
{ |
|
BulkInsertDocumentsAsync(Students, showDebugInfo, opsRespInfo, threadNumber, iterationNumber).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
} |
|
|
|
private void BulkInsertDocuments(List<Student> Students, bool showDebugInfo) |
|
{ |
|
try |
|
{ |
|
BulkInsertDocumentsAsync(Students, showDebugInfo).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
} |
|
|
|
|
|
public class DoBulkInsertWorkArgs |
|
{ |
|
public List<Student> Students { get; set; } |
|
public bool ShowDebugInfo { get; set; } |
|
public List<OperationReponseInfo> Responses { get; set; } |
|
public int ThreadNumber { get; set; } |
|
} |
|
|
|
public void DoBulkInsertWork(object data) |
|
{ |
|
var args = (DoBulkInsertWorkArgs)data; |
|
|
|
List<Student> StudentsPerThread = args.Students; |
|
var opsResponseInfoListPerThread = args.Responses; |
|
var threadNumber = args.ThreadNumber; |
|
List<Student> StudentsPerBatch = null; |
|
|
|
int batchStart = 0; |
|
int numStudentsPerThread = args.Students.Count; |
|
int remainingStudents = numStudentsPerThread; |
|
var effectiveBatchSize = 0; |
|
var iterationCount = 0; |
|
|
|
|
|
while (remainingStudents > 0) |
|
{ |
|
// create a list of Students to be bulkloaded in a single batch |
|
StudentsPerBatch = new List<Student>(); |
|
|
|
// Effective batchSize will be least of remainingStudents and bulkLoadBatchSize |
|
effectiveBatchSize = (remainingStudents <= bulkLoadBatchSize) ? remainingStudents : bulkLoadBatchSize; |
|
|
|
int batchEnd = batchStart + effectiveBatchSize; |
|
for (int i = batchStart; i < batchEnd; i++) |
|
{ |
|
StudentsPerBatch.Add(StudentsPerThread[i]); |
|
} |
|
|
|
//Create a new OperationReponseInfo |
|
var opsResponseInfo = new OperationReponseInfo(); |
|
opsResponseInfoListPerThread.Add(opsResponseInfo); |
|
++iterationCount; |
|
|
|
// bulkload docs equal to Effective batchSize |
|
BulkInsertDocuments(StudentsPerBatch, args.ShowDebugInfo, opsResponseInfo, threadNumber, iterationCount); |
|
|
|
remainingStudents -= effectiveBatchSize; |
|
batchStart = batchEnd; |
|
} |
|
} |
|
|
|
|
|
|
|
/// <summary> |
|
/// Async method for inserting a document, with retries, when insert is being done on multiple threads and we would like to keep track of Request charge |
|
/// and Elapsed time on each thread and iteration |
|
/// </summary> |
|
/// <param name="student"></param> |
|
/// <param name="showDebugInfo"></param> |
|
/// <param name="opsRespInfo"></param> |
|
/// <param name="threadNumber"></param> |
|
/// <param name="iterationNumber"></param> |
|
/// <returns></returns> |
|
private async Task InsertDocumentAsync(Student student, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber) |
|
{ |
|
|
|
Stopwatch clock = new Stopwatch(); |
|
clock.Start(); |
|
ResourceResponse<Document> response = await ExecuteWithRetries(client, () => client.CreateDocumentAsync(colSelfLink, student), displayHttp429Error, threadNumber, iterationNumber); |
|
clock.Stop(); |
|
|
|
if (opsRespInfo != null) |
|
{ |
|
opsRespInfo.ElapsedMilliseconds = clock.ElapsedMilliseconds; |
|
opsRespInfo.OpRequestCharge = response.RequestCharge; |
|
} |
|
|
|
|
|
if (true == showDebugInfo) |
|
{ |
|
Console.WriteLine("{0}\tInsert by Thread:{1} and Iteration:{2}, # of RUs: {3}, Elapsed time: {4} ms", DateTime.UtcNow, threadNumber, iterationNumber, response.RequestCharge, clock.ElapsedMilliseconds); |
|
} |
|
|
|
} |
|
|
|
/// <summary> |
|
/// Async method for inserting a single document, with retries, on a single thread |
|
/// </summary> |
|
/// <param name="student"></param> |
|
/// <param name="showDebugInfo"></param> |
|
/// <returns></returns> |
|
private async Task InsertDocumentAsync(Student student, bool showDebugInfo) |
|
{ |
|
|
|
ResourceResponse<Document> response = await ExecuteWithRetries(client, () => client.CreateDocumentAsync(colSelfLink, student), displayHttp429Error); |
|
|
|
if (true == showDebugInfo) |
|
{ |
|
Console.WriteLine("{0}\tInsert Operation, # of RUs: {1}", DateTime.UtcNow, response.RequestCharge); |
|
} |
|
|
|
} |
|
|
|
private void InsertDocument(Student student, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber) |
|
{ |
|
|
|
try |
|
{ |
|
InsertDocumentAsync(student, showDebugInfo, opsRespInfo, threadNumber, iterationNumber).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
//return responseDetails; |
|
} |
|
|
|
private void InsertDocument(Student student, bool showDebugInfo) |
|
{ |
|
|
|
try |
|
{ |
|
InsertDocumentAsync(student, showDebugInfo).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
//return responseDetails; |
|
} |
|
|
|
|
|
public class DoInsertWorkArgs |
|
{ |
|
public List<Student> Students { get; set; } |
|
public bool ShowDebugInfo { get; set; } |
|
public List<OperationReponseInfo> Responses { get; set; } |
|
public int ThreadNumber { get; set; } |
|
} |
|
|
|
public void DoInsertWork(object data) |
|
{ |
|
var args = (DoInsertWorkArgs)data; |
|
|
|
var opsResponseInfoListPerThread = args.Responses; |
|
|
|
var threadNumber = args.ThreadNumber; |
|
|
|
var iterationCount = 0; |
|
|
|
foreach (var student in args.Students) |
|
{ |
|
//Create a new OperationReponseInfo |
|
var opsResponseInfo = new OperationReponseInfo(); |
|
opsResponseInfoListPerThread.Add(opsResponseInfo); |
|
++iterationCount; |
|
InsertDocument(student, args.ShowDebugInfo, opsResponseInfo, threadNumber, iterationCount); |
|
} |
|
} |
|
|
|
|
|
public class DoSingleReadWorkArgs |
|
{ |
|
public int NumberOfReadsPerThread { get; set; } |
|
public bool ShowDebugInfo { get; set; } |
|
public List<OperationReponseInfo> Responses { get; set; } |
|
public int ThreadNumber { get; set; } |
|
} |
|
|
|
public void DoSingleReadWork(object data) |
|
{ |
|
var args = (DoSingleReadWorkArgs)data; |
|
var opsResponseInfoListPerThread = args.Responses; |
|
var threadNumber = args.ThreadNumber; |
|
var readsPerThread = args.NumberOfReadsPerThread; |
|
var iterationCount = 0; |
|
//string studentName = "John Doe"; |
|
|
|
while (iterationCount < readsPerThread) |
|
{ |
|
//Create a new OperationReponseInfo |
|
var opsResponseInfo = new OperationReponseInfo(); |
|
opsResponseInfoListPerThread.Add(opsResponseInfo); |
|
++iterationCount; |
|
ReadSingleDocument(studentName, args.ShowDebugInfo, opsResponseInfo, threadNumber, iterationCount); |
|
} |
|
} |
|
|
|
private async Task<FeedResponse<Student>> QuerySingleDocumentAsync(IDocumentQuery<dynamic> query) |
|
{ |
|
return await query.ExecuteNextAsync<Student>(); |
|
} |
|
|
|
private async Task<Student> ReadSingleDocumentAsync(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber) |
|
{ |
|
//tell server we only want 1 record |
|
FeedOptions options = new FeedOptions { MaxItemCount = 1 }; |
|
string queryString = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'"; |
|
IDocumentQuery<dynamic> query = client.CreateDocumentQuery(colSelfLink, queryString, options).AsDocumentQuery(); |
|
|
|
// start timer and measure time for running a query, including retries |
|
Stopwatch clock = new Stopwatch(); |
|
clock.Start(); |
|
FeedResponse<Student> queryResponse = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(query), displayHttp429Error, threadNumber, iterationNumber); |
|
clock.Stop(); |
|
|
|
// populate OperationReponseInfo counters that will use later for Summary |
|
if (opsRespInfo != null) |
|
{ |
|
opsRespInfo.ElapsedMilliseconds = clock.ElapsedMilliseconds; |
|
opsRespInfo.OpRequestCharge = queryResponse.RequestCharge; |
|
opsRespInfo.NumberOfDocumentsRead = queryResponse.Count; |
|
} |
|
|
|
// for debug |
|
if (true == showDebugInfo) |
|
{ |
|
Console.WriteLine("{0}\tSingle Document Read by Thread:{1} and Iteration:{2}, # of RUs: {3}", DateTime.UtcNow, threadNumber, iterationNumber, queryResponse.RequestCharge); |
|
} |
|
|
|
Student student = queryResponse.AsEnumerable().FirstOrDefault(); |
|
return student; |
|
} |
|
|
|
private async Task<Student> ReadSingleDocumentAsync(string studentName, bool showDebugInfo) |
|
{ |
|
//tell server we only want 1 record |
|
FeedOptions options = new FeedOptions { MaxItemCount = 1 }; |
|
string queryString = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'"; |
|
IDocumentQuery<dynamic> query = client.CreateDocumentQuery(colSelfLink, queryString, options).AsDocumentQuery(); |
|
|
|
// start timer and measure time for running a query, including retries |
|
Stopwatch clock = new Stopwatch(); |
|
clock.Start(); |
|
FeedResponse<Student> queryResponse = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(query), displayHttp429Error); |
|
clock.Stop(); |
|
|
|
|
|
// for debug |
|
if (true == showDebugInfo) |
|
{ |
|
Console.WriteLine("{0}\tSingle Document Read, # of RUs: {1}", DateTime.UtcNow, queryResponse.RequestCharge); |
|
} |
|
|
|
Student student = queryResponse.AsEnumerable().FirstOrDefault(); |
|
return student; |
|
} |
|
|
|
private void ReadSingleDocument(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int iterationNumber) |
|
{ |
|
try |
|
{ |
|
// Async query |
|
ReadSingleDocumentAsync(studentName, showDebugInfo, opsRespInfo, threadNumber, iterationNumber).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
} |
|
|
|
private void ReadSingleDocument(string studentName, bool showDebugInfo) |
|
{ |
|
try |
|
{ |
|
// Async query |
|
ReadSingleDocumentAsync(studentName, showDebugInfo).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
} |
|
|
|
|
|
public class DoMultiReadWorkArgs |
|
{ |
|
public int NumberOfReadsPerThread { get; set; } |
|
public bool ShowDebugInfo { get; set; } |
|
public List<OperationReponseInfo> Responses { get; set; } |
|
public int ThreadNumber { get; set; } |
|
} |
|
|
|
public void DoMultiReadWork(object data) |
|
{ |
|
var args = (DoMultiReadWorkArgs)data; |
|
var opsResponseInfoListPerThread = args.Responses; |
|
var threadNumber = args.ThreadNumber; |
|
var readsPerThread = args.NumberOfReadsPerThread; |
|
var remainingDocsToRead = readsPerThread; |
|
var effectiveBatchSize = 0; |
|
//string studentName = "John Doe"; |
|
|
|
while ( remainingDocsToRead > 0) |
|
{ |
|
//Create a new OperationReponseInfo |
|
var opsResponseInfo = new OperationReponseInfo(); |
|
opsResponseInfoListPerThread.Add(opsResponseInfo); |
|
|
|
// Effective batchSize will be least of remainingStudents and bulkLoadBatchSize |
|
effectiveBatchSize = (remainingDocsToRead <= readBatchSize) ? remainingDocsToRead : readBatchSize; |
|
|
|
// read a single batch |
|
ReadMultiDocuments(studentName, args.ShowDebugInfo, opsResponseInfo, threadNumber, effectiveBatchSize); |
|
remainingDocsToRead -= effectiveBatchSize; |
|
} |
|
|
|
} |
|
|
|
|
|
private async Task<List<Student>> ReadMultiDocumentAsync(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int batchSize) |
|
{ |
|
// Fetch records per batchSize |
|
FeedOptions options = new FeedOptions { MaxItemCount = batchSize }; |
|
string queryString = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'"; |
|
IDocumentQuery<dynamic> query = client.CreateDocumentQuery(colSelfLink, queryString, options).AsDocumentQuery(); |
|
|
|
FeedResponse<Student> queryResponse = null; |
|
|
|
List<Student> studentsRetrieved = new List<Student>(); |
|
|
|
Stopwatch clock = new Stopwatch(); |
|
var iterationNumber = 0; |
|
while ( true == query.HasMoreResults) |
|
{ |
|
iterationNumber++; |
|
// start timer and measure time for running a query, including retries |
|
clock.Start(); |
|
queryResponse = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(query), displayHttp429Error, threadNumber, iterationNumber); |
|
clock.Stop(); |
|
|
|
// for debug |
|
if (true == showDebugInfo) |
|
{ |
|
Console.WriteLine("{0}\t {1} Documents Read by Thread:{2} in {3} ms, # of RUs: {4}", DateTime.UtcNow, queryResponse.Count, threadNumber, clock.ElapsedMilliseconds, queryResponse.RequestCharge); |
|
} |
|
|
|
// populate OperationReponseInfo counters that will use later for Summary |
|
if (opsRespInfo != null) |
|
{ |
|
opsRespInfo.ElapsedMilliseconds += clock.ElapsedMilliseconds; |
|
opsRespInfo.OpRequestCharge += queryResponse.RequestCharge; |
|
opsRespInfo.NumberOfDocumentsRead += queryResponse.Count; |
|
} |
|
|
|
//add to retrieved Students List |
|
List<Student> students = queryResponse.AsEnumerable() as List<Student>; |
|
studentsRetrieved.AddRange(students); |
|
} |
|
|
|
|
|
return studentsRetrieved; |
|
} |
|
|
|
private async Task<List<Student>> ReadMultiDocumentAsync(string studentName, bool showDebugInfo, int batchSize) |
|
{ |
|
// Fetch records per batchSize |
|
FeedOptions options = new FeedOptions { MaxItemCount = batchSize }; |
|
string queryString = "SELECT * FROM Students s WHERE s.Name = '" + studentName + "'"; |
|
IDocumentQuery<dynamic> query = client.CreateDocumentQuery(colSelfLink, queryString, options).AsDocumentQuery(); |
|
|
|
FeedResponse<Student> queryResponse = null; |
|
|
|
List<Student> studentsRetrieved = new List<Student>(); |
|
|
|
Stopwatch clock = new Stopwatch(); |
|
while (true == query.HasMoreResults) |
|
{ |
|
// start timer and measure time for running a query, including retries |
|
clock.Start(); |
|
queryResponse = await ExecuteWithRetries(client, () => QuerySingleDocumentAsync(query), displayHttp429Error); |
|
clock.Stop(); |
|
|
|
// for debug |
|
if (true == showDebugInfo) |
|
{ |
|
Console.WriteLine("{0}\t {1} Documents Read in {2} ms, # of RUs: {3}", DateTime.UtcNow, queryResponse.Count, clock.ElapsedMilliseconds, queryResponse.RequestCharge); |
|
} |
|
|
|
//add to retrieved Students List |
|
List<Student> students = queryResponse.AsEnumerable() as List<Student>; |
|
studentsRetrieved.AddRange(students); |
|
} |
|
|
|
|
|
return studentsRetrieved; |
|
} |
|
|
|
private void ReadMultiDocuments(string studentName, bool showDebugInfo, OperationReponseInfo opsRespInfo, int threadNumber, int batchSize) |
|
{ |
|
try |
|
{ |
|
// Async query |
|
ReadMultiDocumentAsync(studentName, showDebugInfo, opsRespInfo, threadNumber, batchSize).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
} |
|
|
|
private void ReadMultiDocuments(string studentName, bool showDebugInfo, int batchSize) |
|
{ |
|
try |
|
{ |
|
// Async query |
|
ReadMultiDocumentAsync(studentName, showDebugInfo, batchSize).Wait(); |
|
} |
|
catch (DocumentClientException de) |
|
{ |
|
HandleDocumentClientException(de); |
|
} |
|
catch (AggregateException ae) |
|
{ |
|
if (ae.GetBaseException().GetType() == typeof(DocumentClientException)) |
|
{ |
|
DocumentClientException baseException = ae.GetBaseException() as DocumentClientException; |
|
HandleDocumentClientException(baseException); |
|
} |
|
} |
|
catch (Exception e) |
|
{ |
|
Exception baseException = e.GetBaseException(); |
|
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message); |
|
} |
|
} |
|
|
|
|
|
} |
|
|
|
public class OperationReponseInfo |
|
{ |
|
public List<Student> ListOfStudents { get; set; } |
|
public int NumberOfDocumentsRead { get; set; } |
|
public int NumberOfDocumentsInserted { get; set; } |
|
public double OpRequestCharge { get; set; } |
|
public long ElapsedMilliseconds { get; set; } |
|
|
|
public OperationReponseInfo() |
|
{ |
|
ListOfStudents = null; |
|
NumberOfDocumentsRead = 0; |
|
NumberOfDocumentsInserted = 0; |
|
OpRequestCharge = 0.0; |
|
ElapsedMilliseconds = 0; |
|
} |
|
} |
|
} |