Skip to content

Instantly share code, notes, and snippets.

@ayende
Created October 11, 2019 18:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ayende/fce0ff17bf8feae40cb28daf384e4f55 to your computer and use it in GitHub Desktop.
Save ayende/fce0ff17bf8feae40cb28daf384e4f55 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MimeKit;
using MimeKit.Text;
using MimeKit.Utils;
using Newtonsoft.Json;
namespace ConsoleApp25
{
/// <summary>
/// Builds an in-memory inverted index over a directory tree of MIME messages.
/// One producer task enumerates the files and several worker tasks parse them
/// in parallel, recording for every field term the numbers of the documents
/// that contain it.
/// </summary>
class Program
{
    // Characters that end a token when free text is split into terms.
    private static readonly char[] Separators = { ' ', '\t', ',', '!', '\r', '(', ')', '?', '-', '"', '\n', '/' };

    // Characters stripped from both ends of every token.
    private static readonly char[] TrimChars = { '.' };

    // Number of parallel parse/index workers.
    private const int WorkerCount = 16;

    // Maximum number of file paths queued ahead of the workers.
    private const int QueueCapacity = 2048;

    // Directory scanned when none is given on the command line.
    private const string DefaultMailDirectory = @"C:\Users\ayende\Downloads\maildir\";

    /// <summary>Maps a document number back to the message it was assigned to.</summary>
    public class DocRef
    {
        // Message-Id reported by the parser for this message.
        public string Id;
        // Name of the file stream the message was read from.
        public string Path;
        // Sequential number assigned to the message while indexing.
        public long Number;
    }

    /// <summary>
    /// Indexed message fields. The numeric value of each member is the
    /// position of that field's dictionary in the index array built by Main.
    /// </summary>
    public enum Fields
    {
        Body,
        Subject,
        Date,
        From,
        To
    }

    /// <summary>
    /// Indexes every file under the mail directory and prints progress every
    /// five seconds, followed by the total managed memory and elapsed time.
    /// </summary>
    /// <param name="args">
    /// Optional: args[0] is the directory to scan. When omitted, the built-in
    /// default directory is used.
    /// </param>
    static void Main(string[] args)
    {
        var dir = args != null && args.Length > 0 ? args[0] : DefaultMailDirectory;
        var docs = new ConcurrentDictionary<long, DocRef>();
        long index = 0;

        // One term -> document-numbers map per member of Fields, in enum order.
        var fields = new ConcurrentDictionary<string, ConcurrentQueue<long>>[]
        {
            new ConcurrentDictionary<string, ConcurrentQueue<long>>(), // Body
            new ConcurrentDictionary<string, ConcurrentQueue<long>>(), // Subject
            new ConcurrentDictionary<string, ConcurrentQueue<long>>(), // Date
            new ConcurrentDictionary<string, ConcurrentQueue<long>>(), // From
            new ConcurrentDictionary<string, ConcurrentQueue<long>>(), // To
        };

        var blockingCollection = new BlockingCollection<string>(QueueCapacity);
        var tasks = new List<Task>();

        for (int i = 0; i < WorkerCount; i++)
        {
            tasks.Add(Task.Run(() =>
            {
                // GetConsumingEnumerable ends cleanly once the queue is completed
                // and drained. Checking IsCompleted and then calling Take() races
                // with the other workers: one of them can remove the last item in
                // between, making Take() throw InvalidOperationException.
                foreach (var path in blockingCollection.GetConsumingEnumerable())
                {
                    IndexFile(path, fields, docs, ref index);
                }
            }));
        }

        var stopwatch = Stopwatch.StartNew();
        tasks.Add(Task.Run(() =>
        {
            try
            {
                foreach (var file in Directory.EnumerateFiles(dir, "*", SearchOption.AllDirectories))
                {
                    blockingCollection.Add(file);
                }
            }
            finally
            {
                // Always release the workers, even when enumeration fails;
                // otherwise they would wait on the queue forever.
                blockingCollection.CompleteAdding();
            }
        }));

        var final = Task.WhenAll(tasks.ToArray());
        while (final.Wait(5000) == false)
        {
            Console.WriteLine(docs.Count);
        }

        Console.WriteLine(GC.GetTotalMemory(true));
        Console.WriteLine(stopwatch.Elapsed);
    }

    /// <summary>
    /// Parses every message in one file and adds its terms to the index.
    /// </summary>
    /// <param name="path">File to read.</param>
    /// <param name="fields">Per-field term maps, positioned by <see cref="Fields"/> value.</param>
    /// <param name="docs">Receives one <see cref="DocRef"/> per parsed message.</param>
    /// <param name="nextNumber">Shared counter from which document numbers are drawn atomically.</param>
    private static void IndexFile(
        string path,
        ConcurrentDictionary<string, ConcurrentQueue<long>>[] fields,
        ConcurrentDictionary<long, DocRef> docs,
        ref long nextNumber)
    {
        using var stream = File.OpenRead(path);
        var parser = new MimeParser(stream, MimeFormat.Entity);
        while (parser.IsEndOfStream == false)
        {
            var entity = parser.ParseMessage();
            var cur = Interlocked.Increment(ref nextNumber);

            foreach (var item in entity.To)
            {
                AddTerm(fields[(int)Fields.To], item.ToString().ToLowerInvariant(), cur);
            }
            foreach (var item in entity.From)
            {
                AddTerm(fields[(int)Fields.From], item.ToString().ToLowerInvariant(), cur);
            }
            foreach (var item in Process(entity.Subject))
            {
                AddTerm(fields[(int)Fields.Subject], item, cur);
            }
            // Body tokens belong in the Body index; they were previously
            // written to the Subject index by mistake.
            foreach (var item in Process(entity.GetTextBody(TextFormat.Plain)))
            {
                AddTerm(fields[(int)Fields.Body], item, cur);
            }
            AddTerm(fields[(int)Fields.Date], entity.Date.ToString("r"), cur);

            docs.TryAdd(cur, new DocRef
            {
                Id = entity.MessageId,
                Path = stream.Name,
                Number = cur
            });
        }
    }

    /// <summary>
    /// Records that document <paramref name="doc"/> contains <paramref name="term"/>,
    /// creating the term's posting queue on first use.
    /// </summary>
    private static void AddTerm(ConcurrentDictionary<string, ConcurrentQueue<long>> field, string term, long doc)
    {
        field.GetOrAdd(term, _ => new ConcurrentQueue<long>()).Enqueue(doc);
    }

    /// <summary>
    /// Splits text into distinct, lower-cased terms. Tokens are trimmed of
    /// surrounding '.' characters; a token is kept when it is longer than three
    /// characters or when it starts with a digit.
    /// </summary>
    /// <param name="text">
    /// Text to tokenize. May be null or empty (NOTE(review): presumably the
    /// case for messages without a subject or plain-text body - confirm
    /// against MimeKit), in which case an empty set is returned.
    /// </param>
    /// <returns>The set of distinct terms found in <paramref name="text"/>.</returns>
    private static HashSet<string> Process(string text)
    {
        if (string.IsNullOrEmpty(text))
        {
            return new HashSet<string>();
        }

        return text
            .Split(Separators, StringSplitOptions.RemoveEmptyEntries)
            // Invariant casing keeps index keys independent of the machine's culture.
            .Select(token => token.Trim(TrimChars).ToLowerInvariant())
            .Where(token => token.Length > 3 || (token.Length > 0 && char.IsDigit(token[0])))
            .ToHashSet();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment