Created
February 27, 2018 06:17
-
-
Save kekyo/11b5da1548f82658dbdf63a0678a895b to your computer and use it in GitHub Desktop.
elixir_agg_csv performance test code (https://github.com/enpedasi/elixir_agg_csv/blob/master/cs/Program.cs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace csharp_agg_csv2
{
    /// <summary>
    /// Console benchmark that counts how often each value of the second CSV column
    /// occurs in a file, then prints the number of distinct values, the ten most
    /// frequent values and the elapsed time.
    /// </summary>
    class Program
    {
        /// <summary>Number of lines bundled into one queue item by the reader.</summary>
        private const int BatchSize = 1000;

        /// <summary>
        /// Program entry point.
        /// </summary>
        /// <param name="args">Command line arguments; <c>args[0]</c> is the CSV file path.</param>
        static void Main(string[] args)
        {
            if (args.Length < 1)
            {
                Console.WriteLine("usage : ConsoleApp1.exe filename.csv");
                // Nothing to process; falling through would fail on args[0] below.
                return;
            }

            var readFilePath = args[0];
            var sw = Stopwatch.StartNew();
            Console.WriteLine("test start");

            // GetAwaiter().GetResult() surfaces failures without the extra
            // AggregateException layer that Task.Result adds.
            var results = ProcessReadAsync(readFilePath).GetAwaiter().GetResult();
            Console.WriteLine(results.Count);

            // PLINQ queries are deferred. Materialize the top ten before stopping the
            // stopwatch so the sort is part of the measured time rather than being
            // executed later inside the print loop.
            var sorted = results
                .AsParallel()
                .WithDegreeOfParallelism(Environment.ProcessorCount)
                .OrderByDescending((x) => x.Value)
                .Take(10)
                .ToList();
            sw.Stop();

            foreach (var rec in sorted)
            {
                Console.WriteLine(rec.Key + ":" + rec.Value.ToString());
            }

            var ts = sw.Elapsed;
            Console.WriteLine($" {ts.Hours}時間 {ts.Minutes}分 {ts.Seconds}秒 {ts.Milliseconds}ミリ秒");

            // Console.ReadKey throws InvalidOperationException when standard input is
            // redirected (pipes, CI), so only wait for a key on an interactive console.
            if (!Console.IsInputRedirected)
            {
                Console.ReadKey(true);
            }
        }

        /// <summary>
        /// Reads the CSV file at <paramref name="path"/> and counts the occurrences of
        /// each distinct value found in the second column.
        /// </summary>
        /// <remarks>
        /// The aggregation itself runs synchronously on the calling thread (PLINQ);
        /// only the file reader runs as a separate task. Surrounding double quotes are
        /// trimmed from the column value. Rows without a second column are skipped.
        /// </remarks>
        /// <param name="path">Path of the CSV file to aggregate.</param>
        /// <returns>A dictionary mapping each second-column value to its row count.</returns>
        private static async Task<Dictionary<string, int>> ProcessReadAsync(string path)
        {
            char[] separators = { ',' };

            // Unbounded queue of line batches. It is backed by a ConcurrentBag, so
            // batches are consumed in no particular order, which is fine for counting.
            var queue = new BlockingCollection<List<string>>(new ConcurrentBag<List<string>>());
            var readerTask = ReadFromCSVAsync(path, queue);

            var results = queue
                .GetConsumingEnumerable()
                .AsParallel()
                .WithDegreeOfParallelism(Environment.ProcessorCount)
                .SelectMany(lines => lines)
                // Only the second field is needed; limiting the split to three parts
                // avoids allocating strings for the rest of the row.
                .Select(line => line.Split(separators, 3))
                // A row without a second column would otherwise abort the whole run.
                .Where(fields => fields.Length > 1)
                .Select(fields => fields[1].Trim('"'))
                .GroupBy(column1 => column1)
                .ToDictionary(g => g.Key, g => g.Count());

            // Observe the reader so that I/O failures (e.g. missing file) propagate.
            await readerTask;
            return results;
        }

        /// <summary>
        /// Starts a background task that reads the file line by line and adds the
        /// non-blank lines to <paramref name="queue"/> in batches of up to
        /// <see cref="BatchSize"/> lines.
        /// </summary>
        /// <param name="path">Path of the file to read.</param>
        /// <param name="queue">
        /// Destination queue. It is always marked as complete for adding when the task
        /// ends, even on failure, so consumers never block forever.
        /// </param>
        /// <returns>The task performing the read; it faults if the file cannot be read.</returns>
        private static Task ReadFromCSVAsync(
            string path,
            BlockingCollection<List<string>> queue)
        {
            return Task.Run(() =>
            {
                try
                {
                    using (var fs = new FileStream(path,
                        FileMode.Open, FileAccess.Read, FileShare.Read,
                        4096, FileOptions.SequentialScan))
                    using (var tr = new StreamReader(fs))
                    {
                        var lines = new List<string>(BatchSize);
                        string line;
                        while ((line = tr.ReadLine()) != null)
                        {
                            if (string.IsNullOrWhiteSpace(line))
                            {
                                continue;
                            }

                            lines.Add(line);
                            if (lines.Count >= BatchSize)
                            {
                                queue.Add(lines);
                                lines = new List<string>(BatchSize);
                            }
                        }

                        // Flush the final, partially filled batch.
                        if (lines.Count >= 1)
                        {
                            queue.Add(lines);
                        }
                    }
                }
                finally
                {
                    queue.CompleteAdding();
                }
            });
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment