Last active
December 19, 2016 14:33
-
-
Save smarenich/df646f34947d65b88d69cd0f52e7a7f2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace ItemImportMultiThreaded | |
{ | |
public class Item | |
{ | |
public string InventoryID { get; set; } | |
public string Description { get; set; } | |
public string Uom { get; set; } | |
public decimal CurrentPrice { get; set; } | |
public decimal Weight { get; set; } | |
public string SupplierName { get; set; } | |
public string SupplierPartNo { get; set; } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace ItemImportMultiThreaded | |
{ | |
public class ItemImporter | |
{ | |
private IN202500.Screen _itemsScreen; | |
private static object _itemsSchemaLock = new object(); | |
private static IN202500.Content _itemsSchema; | |
public void Login(string url, string username, string password, string company) | |
{ | |
Console.WriteLine("[{0}] Logging in to {1}...", System.Threading.Thread.CurrentThread.ManagedThreadId, url); | |
_itemsScreen = new IN202500.Screen(); | |
_itemsScreen.Url = url + "/Soap/IN202500.asmx"; | |
_itemsScreen.EnableDecompression = true; | |
_itemsScreen.CookieContainer = new System.Net.CookieContainer(); | |
_itemsScreen.Timeout = 36000; | |
_itemsScreen.Login(username, password); | |
Console.WriteLine("[{0}] Logged in to {1}.", System.Threading.Thread.CurrentThread.ManagedThreadId, url); | |
lock (_itemsSchemaLock) | |
{ | |
// Threads can share the same schema. | |
if (_itemsSchema == null) | |
{ | |
Console.WriteLine("[{0}] Retrieving IN202500 schema...", System.Threading.Thread.CurrentThread.ManagedThreadId); | |
_itemsSchema = _itemsScreen.GetSchema(); | |
if (_itemsSchema == null) throw new Exception("IN202500 GetSchema returned null. See AC-73433."); | |
} | |
} | |
} | |
public void Logout() | |
{ | |
_itemsScreen.Logout(); | |
} | |
public void Import(List<Item> items) | |
{ | |
Console.WriteLine("[{0}] Submitting {1} items to Acumatica...", System.Threading.Thread.CurrentThread.ManagedThreadId, items.Count); | |
var commands = new IN202500.Command[] | |
{ | |
_itemsSchema.StockItemSummary.InventoryID, | |
_itemsSchema.StockItemSummary.Description, | |
_itemsSchema.GeneralSettingsUnitOfMeasureBaseUnit.BaseUnit, | |
_itemsSchema.PriceCostInfoPriceManagement.DefaultPrice, | |
_itemsSchema.PackagingDimensions.Weight, | |
_itemsSchema.VendorDetails.VendorID, | |
_itemsSchema.VendorDetails.VendorInventoryID, | |
_itemsSchema.Actions.Save | |
}; | |
string[][] data = new string[items.Count][]; | |
int count = 0; | |
foreach(Item item in items) | |
{ | |
data[count] = new string[7]; | |
data[count][0] = item.InventoryID; | |
data[count][1] = item.Description.Trim(); | |
data[count][2] = item.Uom; | |
data[count][3] = item.CurrentPrice.ToString(System.Globalization.CultureInfo.InvariantCulture); | |
data[count][4] = item.Weight.ToString(System.Globalization.CultureInfo.InvariantCulture); | |
data[count][5] = item.SupplierName; | |
data[count][6] = item.SupplierPartNo; | |
count++; | |
} | |
_itemsScreen.Import(commands, null, data, false, true, true); | |
Console.WriteLine("[{0}] Submitted {1} items to Acumatica.", System.Threading.Thread.CurrentThread.ManagedThreadId, items.Count); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace ItemImportMultiThreaded | |
{ | |
class Program | |
{ | |
const int WorkerCount = 16; | |
const int BatchSize = 200; | |
const int ProcessLimit = 5000; | |
const string Url = "http://ec2-54-209-21-238.compute-1.amazonaws.com"; | |
static void Main(string[] args) | |
{ | |
System.Net.ServicePointManager.DefaultConnectionLimit = 100; | |
Stopwatch sw = new Stopwatch(); | |
sw.Start(); | |
Console.WriteLine("Loading CSV to queue..."); | |
var queue = LoadItemsToQueue(); | |
int count = queue.Count; | |
Console.WriteLine("Loading completed. Queue contains {0} items.", count); | |
Console.WriteLine("Starting {0} worker threads", WorkerCount); | |
//Note: i was previously using the Task Parallel Library but switched to pure threads to make sure the tasks are not simply pushed to the thread-pool. | |
List<Thread> threads = new List<Thread>(); | |
for (int i = 0; i < WorkerCount; i++) | |
{ | |
var t = new Thread(() => ProcessQueue(queue, Url)); | |
t.Start(); | |
threads.Add(t); | |
} | |
foreach (var t in threads) | |
{ | |
t.Join(); | |
} | |
sw.Stop(); | |
//Initialization takes about 20 seconds which we substract from the time to get a more accurate measurement of orders per hour | |
Console.WriteLine("All threads have completed. Total time elapsed: {0} seconds. Total processed: {1}. Items per hour: {2}", sw.Elapsed.TotalMinutes * 60, count, count / sw.Elapsed.TotalMinutes * 60); | |
Console.WriteLine("Press any key to continue."); | |
Console.ReadKey(); | |
} | |
private static void ProcessQueue(ConcurrentQueue<Item> queue, string url) | |
{ | |
Item item; | |
var itemsToImport = new List<Item>(); | |
var importer = new ItemImporter(); | |
importer.Login(url, "admin", "admin", null); | |
while (true) | |
{ | |
if (queue.TryDequeue(out item)) | |
{ | |
itemsToImport.Add(item); | |
if (itemsToImport.Count == BatchSize) | |
{ | |
importer.Import(itemsToImport); | |
itemsToImport.Clear(); | |
} | |
} | |
else | |
{ | |
if (itemsToImport.Count > 0) | |
{ | |
importer.Import(itemsToImport); | |
} | |
Console.WriteLine("[{0}] Queue is now empty - we can stop.", System.Threading.Thread.CurrentThread.ManagedThreadId); | |
break; | |
} | |
} | |
importer.Logout(); | |
} | |
private static ConcurrentQueue<Item> LoadItemsToQueue() | |
{ | |
var queue = new ConcurrentQueue<Item>(); | |
using (StreamReader reader = File.OpenText(Environment.GetCommandLineArgs()[1])) | |
{ | |
var csv = new CsvReader(reader); | |
csv.Configuration.IgnoreHeaderWhiteSpace = true; | |
while (csv.Read()) | |
{ | |
var item = csv.GetRecord<Item>(); | |
queue.Enqueue(item); | |
if (ProcessLimit > 0 && queue.Count >= ProcessLimit) break; | |
} | |
} | |
return queue; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment