Skip to content

Instantly share code, notes, and snippets.

@abdullin
Last active April 19, 2017 11:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abdullin/bae1150fb8a09d15d14cdc208e4558e4 to your computer and use it in GitHub Desktop.
Save abdullin/bae1150fb8a09d15d14cdc208e4558e4 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
namespace SearchBench {
public struct SkuEntry {
public readonly long TenantId;
public readonly long ProductId;
public readonly string Sku;
public SkuEntry(long tenantId, long productId, string sku) {
TenantId = tenantId;
ProductId = productId;
Sku = sku;
}
}
public interface ISkuSearch {
void AddSkus(IList<SkuEntry> listOfProductIdsAndSkus);
void DeleteSkus(IList<SkuEntry> listOfSkusToRemove);
IList<long> FindProducts(long tenantId, string sku, int limit);
}
class Program {
static void Main(string[] args) {
var file = args[0];
ISkuSearch search = new RinatsSearch();
Console.WriteLine("Basic unit test");
BasicUnitTest(search);
const int batchSize = 1000;
using (var loader = new Loader(file)) {
Console.WriteLine("Warm-up to avoid JIT overhead");
for (var i = 0; i < 10; i++) {
search.AddSkus(loader.GetNext(batchSize));
}
Console.WriteLine("Replay scenario");
var total = 0L;
var randomTenants = new HashSet<long>();
var randomSamples = new List<SkuEntry>();
for (var i = 10; i < 40; i++) {
var increase = (long)Math.Pow(2, i);
Console.Write("Adding {0}... ", increase);
var watch = Stopwatch.StartNew();
long j;
for (j = 0; j < increase; j += batchSize) {
var list = loader.GetNext(batchSize);
search.AddSkus(list);
// sampling
for (var k = 0; k < 10; k++) {
randomTenants.Add(list[k].TenantId);
randomSamples.Add(list[k]);
}
}
Console.WriteLine("Speed: {0:0.00}r/s", increase / watch.Elapsed.TotalSeconds);
total += j;
Console.Write("Seq search (fail) ");
watch.Restart();
var ts = randomTenants.OrderBy(t => t).ToList();
var searchSize = 1000;
for (var k = 0; k < searchSize; k++) {
var found =
search.FindProducts(ts[k % ts.Count], Guid.NewGuid().ToString(), int.MaxValue).Count;
if (found > 0) {
throw new ApplicationException("Should never find this");
}
}
Console.WriteLine("Speed {0:0.00}r/s", 1D * searchSize / watch.Elapsed.TotalSeconds);
Console.Write("Seq search (pass) ");
watch.Restart();
for (var k = 0; k < searchSize; k++) {
var s = randomSamples[k % randomSamples.Count];
var sku = s.Sku;
if (sku.Length > 6) {
sku = sku.Substring(3);
}
var found = search.FindProducts(s.TenantId, sku, int.MaxValue).Count;
if (found == 0) {
throw new ApplicationException("Should find this");
}
}
Console.WriteLine("Speed {0:0.00}r/s", 1D * searchSize / watch.Elapsed.TotalSeconds);
Console.Write("Parallel {0} read + write...", Environment.ProcessorCount);
long parallel = 0;
using (var cts = new CancellationTokenSource()) {
for (var k = 0; k < Environment.ProcessorCount; k++) {
new Thread(() => {
while (!cts.IsCancellationRequested) {
var pos = (long)((uint)Environment.TickCount);
var sku = randomSamples[(int)(pos % randomSamples.Count)];
search.FindProducts(sku.TenantId, sku.Sku, int.MaxValue);
Interlocked.Increment(ref parallel);
}
}).Start();
}
new Thread(() => {
while (!cts.IsCancellationRequested) {
search.AddSkus(loader.GetNext(1));
}
}).Start();
Thread.Sleep(500);
Interlocked.Exchange(ref parallel, 0);
Thread.Sleep(1000);
var count = Interlocked.Read(ref parallel);
cts.Cancel();
Console.WriteLine("{0}r/s", count);
Thread.Sleep(500);
}
}
}
Console.ReadLine();
// starting
}
sealed class Loader : IDisposable {
readonly StreamReader _reader;
static readonly char[] Separator = {'\t'};
public Loader(string file) {
if (file.Contains("gz")) {
_reader = new StreamReader(new GZipStream(File.OpenRead(file), CompressionMode.Decompress));
} else {
_reader = new StreamReader(File.OpenRead(file));
}
_reader.ReadLine();
}
readonly List<SkuEntry> _buffer = new List<SkuEntry>();
public IList<SkuEntry> GetNext(int count) {
_buffer.Clear();
while (_buffer.Count < count) {
var s = _reader.ReadLine();
if (s == null) {
throw new ApplicationException("End of line");
}
var tabs = s.Split(Separator, 4);
if (tabs.Length < 4) {
continue;
}
var tenant = long.Parse(tabs[0]);
var key = byte.Parse(tabs[1]);
var productId = long.Parse(tabs[2]);
if (key != 1 && key != 5) {
continue;
}
_buffer.Add(new SkuEntry(tenant, productId, tabs[3]));
}
return _buffer;
}
public void Dispose() {
_reader.Dispose();
}
}
static void BasicUnitTest(ISkuSearch search) {
// using tenant 0 for testing
const long t = 0;
ShouldFind(search, t, "test", 0);
search.AddSkus(new List<SkuEntry> {
new SkuEntry(t, 1, "test1"),
new SkuEntry(t, 2, "test2")
});
ShouldFind(search, t, "test", 2);
ShouldFind(search, t, "est", 2);
ShouldFind(search, t, "test1", 1);
ShouldFind(search, t, "test2", 1);
ShouldFind(search, t, "est2", 1);
ShouldFind(search, t, "est3", 0);
search.DeleteSkus(new List<SkuEntry> {
new SkuEntry(t, 2, "test2")
});
ShouldFind(search, t, "test2", 0);
ShouldFind(search, t, "test1", 1);
}
static void ShouldFind(ISkuSearch search, long tenantId, string query, int shouldFind) {
var count = search.FindProducts(tenantId, query, shouldFind + 1).Count;
if (count == shouldFind) {
return;
}
var msg = string.Format("Expected to find {0} of {1}, got {2}", shouldFind, query, count);
throw new InvalidOperationException(msg);
}
}
sealed class RinatsSearch : ISkuSearch {
readonly ConcurrentDictionary<long, List<SkuEntry>> _dict =
new ConcurrentDictionary<long, List<SkuEntry>>();
public void AddSkus(IList<SkuEntry> listOfProductIdsAndSkus) {
foreach (var sku in listOfProductIdsAndSkus) {
_dict.AddOrUpdate(sku.TenantId, l => new List<SkuEntry> { sku },
(l, list) => new List<SkuEntry>(list) {sku});
}
}
public void DeleteSkus(IList<SkuEntry> listOfSkusToRemove) {
foreach (var sku in listOfSkusToRemove) {
_dict.AddOrUpdate(sku.TenantId, l => new List<SkuEntry>(), (l, list) => {
var clone = new List<SkuEntry>(list);
clone.RemoveAll(s => s.ProductId == sku.ProductId);
return clone;
});
}
}
public IList<long> FindProducts(long tenantId, string sku, int limit) {
List<SkuEntry> value;
if (!_dict.TryGetValue(tenantId, out value)) {
return new long[0];
}
return value
.Where(s => s.Sku.IndexOf(sku, StringComparison.OrdinalIgnoreCase) >= 0).Take(limit)
.Select(s => s.ProductId)
.ToList();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment