Skip to content

Instantly share code, notes, and snippets.

@smarenich
Last active December 19, 2016 14:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smarenich/df646f34947d65b88d69cd0f52e7a7f2 to your computer and use it in GitHub Desktop.
Save smarenich/df646f34947d65b88d69cd0f52e7a7f2 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ItemImportMultiThreaded
{
/// <summary>
/// One stock item row to be imported. The properties are plain read/write
/// values with no validation; the importer copies them into the data rows
/// it submits.
/// </summary>
public class Item
{
    /// <summary>Inventory identifier of the item.</summary>
    public string InventoryID { get; set; }

    /// <summary>Free-text description of the item.</summary>
    public string Description { get; set; }

    /// <summary>Unit of measure of the item.</summary>
    public string Uom { get; set; }

    /// <summary>Current price of the item.</summary>
    public decimal CurrentPrice { get; set; }

    /// <summary>Weight of the item.</summary>
    public decimal Weight { get; set; }

    /// <summary>Name of the supplier of the item.</summary>
    public string SupplierName { get; set; }

    /// <summary>Part number the supplier uses for the item.</summary>
    public string SupplierPartNo { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ItemImportMultiThreaded
{
/// <summary>
/// Submits batches of <see cref="Item"/> records to the IN202500 screen through
/// its SOAP proxy. Each instance owns its own proxy (and therefore its own
/// session/cookies); the screen schema is retrieved once and shared by all
/// instances.
/// </summary>
public class ItemImporter
{
    // Per-instance proxy; stays null until Login succeeds and is reset by Logout.
    private IN202500.Screen _itemsScreen;

    // Guards the one-time retrieval of the shared schema.
    private static readonly object _itemsSchemaLock = new object();
    private static IN202500.Content _itemsSchema;

    /// <summary>
    /// Creates the service proxy, logs in and makes sure the shared IN202500
    /// schema has been retrieved.
    /// </summary>
    /// <param name="url">Base URL of the site; "/Soap/IN202500.asmx" is appended to it.</param>
    /// <param name="username">Login name.</param>
    /// <param name="password">Password.</param>
    /// <param name="company">
    /// Optional company (tenant) to log in to. When null or empty the bare
    /// user name is used; otherwise the login name is sent as "username@company".
    /// </param>
    /// <exception cref="InvalidOperationException">The service returned no schema.</exception>
    public void Login(string url, string username, string password, string company)
    {
        int threadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("[{0}] Logging in to {1}...", threadId, url);

        var screen = new IN202500.Screen();
        screen.Url = url + "/Soap/IN202500.asmx";
        screen.EnableDecompression = true;
        screen.CookieContainer = new System.Net.CookieContainer();
        // NOTE(review): if IN202500.Screen derives from the .NET SOAP client
        // protocol base class, Timeout is in milliseconds, so 36000 means
        // 36 seconds - confirm that 36000 seconds was not the intent.
        screen.Timeout = 36000;

        // The company argument used to be ignored; multi-company sites expect
        // the login name in "user@company" form.
        string loginName = string.IsNullOrEmpty(company) ? username : username + "@" + company;
        screen.Login(loginName, password);

        // Publish the proxy only after a successful login so that Logout and
        // Import never operate on a session that was not established.
        _itemsScreen = screen;
        Console.WriteLine("[{0}] Logged in to {1}.", threadId, url);

        lock (_itemsSchemaLock)
        {
            // Threads can share the same schema.
            if (_itemsSchema == null)
            {
                Console.WriteLine("[{0}] Retrieving IN202500 schema...", threadId);
                _itemsSchema = _itemsScreen.GetSchema();
                if (_itemsSchema == null)
                {
                    throw new InvalidOperationException("IN202500 GetSchema returned null. See AC-73433.");
                }
            }
        }
    }

    /// <summary>
    /// Ends the session opened by <see cref="Login"/>. Does nothing when no
    /// session is open, so it is safe to call from cleanup code.
    /// </summary>
    public void Logout()
    {
        if (_itemsScreen == null)
        {
            return;
        }

        try
        {
            _itemsScreen.Logout();
        }
        finally
        {
            // Drop the proxy even if the call failed so it is not reused.
            _itemsScreen = null;
        }
    }

    /// <summary>
    /// Submits the given items in a single Import call and reports any
    /// per-row errors returned by the service on the console.
    /// </summary>
    /// <param name="items">Items to submit; an empty list is a no-op.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><see cref="Login"/> has not completed successfully.</exception>
    public void Import(List<Item> items)
    {
        if (items == null)
        {
            throw new ArgumentNullException("items");
        }
        if (_itemsScreen == null || _itemsSchema == null)
        {
            throw new InvalidOperationException("Login must be called successfully before Import.");
        }
        if (items.Count == 0)
        {
            // Nothing to send; avoid a pointless round trip.
            return;
        }

        int threadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("[{0}] Submitting {1} items to Acumatica...", threadId, items.Count);

        // One command per data column, in the same order as the columns
        // filled in below, followed by the Save action.
        var commands = new IN202500.Command[]
        {
            _itemsSchema.StockItemSummary.InventoryID,
            _itemsSchema.StockItemSummary.Description,
            _itemsSchema.GeneralSettingsUnitOfMeasureBaseUnit.BaseUnit,
            _itemsSchema.PriceCostInfoPriceManagement.DefaultPrice,
            _itemsSchema.PackagingDimensions.Weight,
            _itemsSchema.VendorDetails.VendorID,
            _itemsSchema.VendorDetails.VendorInventoryID,
            _itemsSchema.Actions.Save
        };

        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        string[][] data = new string[items.Count][];
        for (int row = 0; row < items.Count; row++)
        {
            Item item = items[row];
            data[row] = new string[]
            {
                item.InventoryID,
                // A row without a description must not abort the whole batch.
                item.Description == null ? null : item.Description.Trim(),
                item.Uom,
                item.CurrentPrice.ToString(invariant),
                item.Weight.ToString(invariant),
                item.SupplierName,
                item.SupplierPartNo
            };
        }

        // The returned results carry per-row errors; they used to be discarded,
        // which hid failed rows completely.
        var results = _itemsScreen.Import(commands, null, data, false, true, true);
        int failed = 0;
        if (results != null)
        {
            // NOTE(review): presumably results[i] corresponds to data[i] - confirm.
            for (int i = 0; i < results.Length; i++)
            {
                if (results[i] != null && !string.IsNullOrEmpty(results[i].Error))
                {
                    failed++;
                    Console.WriteLine("[{0}] Import error in row {1}: {2}", threadId, i, results[i].Error);
                }
            }
        }

        Console.WriteLine("[{0}] Submitted {1} items to Acumatica.", threadId, items.Count);
        if (failed > 0)
        {
            Console.WriteLine("[{0}] {1} of {2} rows reported an error.", threadId, failed, items.Count);
        }
    }
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ItemImportMultiThreaded
{
/// <summary>
/// Console entry point: loads items from the CSV file named on the command
/// line into a shared queue and drains it with a fixed number of worker
/// threads, each submitting batches through its own <see cref="ItemImporter"/>.
/// </summary>
class Program
{
    // Number of worker threads started by Main.
    const int WorkerCount = 16;
    // Number of items a worker collects before calling Import.
    const int BatchSize = 200;
    // Maximum number of items loaded from the CSV; 0 or less disables the limit.
    const int ProcessLimit = 5000;
    const string Url = "http://ec2-54-209-21-238.compute-1.amazonaws.com";

    /// <summary>
    /// Loads the queue, runs the workers to completion and prints timing figures.
    /// </summary>
    /// <param name="args">args[0] is the path of the CSV file to import.</param>
    static void Main(string[] args)
    {
        // Fail with a usage message instead of an IndexOutOfRangeException
        // when the CSV path is missing.
        if (args == null || args.Length < 1)
        {
            Console.WriteLine("Usage: {0} <path-to-items.csv>", AppDomain.CurrentDomain.FriendlyName);
            Environment.ExitCode = 1;
            return;
        }

        System.Net.ServicePointManager.DefaultConnectionLimit = 100;
        Stopwatch sw = new Stopwatch();
        sw.Start();
        Console.WriteLine("Loading CSV to queue...");
        var queue = LoadItemsToQueue();
        int count = queue.Count;
        Console.WriteLine("Loading completed. Queue contains {0} items.", count);
        Console.WriteLine("Starting {0} worker threads", WorkerCount);

        // Note: this previously used the Task Parallel Library but was switched
        // to plain threads to make sure the work is not simply pushed to the
        // thread pool.
        List<Thread> threads = new List<Thread>();
        for (int i = 0; i < WorkerCount; i++)
        {
            var t = new Thread(() => ProcessQueue(queue, Url));
            t.Start();
            threads.Add(t);
        }
        foreach (var t in threads)
        {
            t.Join();
        }
        sw.Stop();

        // The elapsed time covers everything since start-up (CSV loading and
        // each worker's login included); nothing is subtracted, so the
        // items-per-hour figure is a conservative estimate.
        Console.WriteLine("All threads have completed. Total time elapsed: {0} seconds. Total processed: {1}. Items per hour: {2}", sw.Elapsed.TotalSeconds, count, count / sw.Elapsed.TotalHours);
        Console.WriteLine("Press any key to continue.");
        Console.ReadKey();
    }

    /// <summary>
    /// Worker body: logs in, repeatedly dequeues items and submits them in
    /// batches of <see cref="BatchSize"/>, flushes the final partial batch
    /// once the queue is empty, then logs out.
    /// </summary>
    /// <param name="queue">Shared queue of items to import.</param>
    /// <param name="url">Base URL of the site to import into.</param>
    private static void ProcessQueue(ConcurrentQueue<Item> queue, string url)
    {
        Item item;
        var itemsToImport = new List<Item>();
        var importer = new ItemImporter();
        // NOTE(review): credentials are hard-coded; move them to configuration
        // before using this outside of a test environment.
        importer.Login(url, "admin", "admin", null);
        try
        {
            while (true)
            {
                if (queue.TryDequeue(out item))
                {
                    itemsToImport.Add(item);
                    if (itemsToImport.Count == BatchSize)
                    {
                        importer.Import(itemsToImport);
                        itemsToImport.Clear();
                    }
                }
                else
                {
                    if (itemsToImport.Count > 0)
                    {
                        importer.Import(itemsToImport);
                    }
                    Console.WriteLine("[{0}] Queue is now empty - we can stop.", System.Threading.Thread.CurrentThread.ManagedThreadId);
                    break;
                }
            }
        }
        finally
        {
            // Always release the server session, even when an import fails;
            // otherwise the session stays open on the server.
            importer.Logout();
        }
    }

    /// <summary>
    /// Reads the CSV file given as the first command-line argument and
    /// enqueues one <see cref="Item"/> per record, stopping after
    /// <see cref="ProcessLimit"/> records when that limit is positive.
    /// </summary>
    /// <returns>The queue of loaded items.</returns>
    /// <exception cref="InvalidOperationException">No CSV path was supplied on the command line.</exception>
    private static ConcurrentQueue<Item> LoadItemsToQueue()
    {
        string[] commandLine = Environment.GetCommandLineArgs();
        if (commandLine.Length < 2)
        {
            throw new InvalidOperationException("The path of the CSV file must be passed as the first command-line argument.");
        }

        var queue = new ConcurrentQueue<Item>();
        // Local counter: ConcurrentQueue.Count is not a cheap property to poll
        // once per record.
        int loaded = 0;
        using (StreamReader reader = File.OpenText(commandLine[1]))
        {
            var csv = new CsvReader(reader);
            csv.Configuration.IgnoreHeaderWhiteSpace = true;
            while (csv.Read())
            {
                var item = csv.GetRecord<Item>();
                queue.Enqueue(item);
                loaded++;
                if (ProcessLimit > 0 && loaded >= ProcessLimit) break;
            }
        }
        return queue;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment