Skip to content

Instantly share code, notes, and snippets.

@kekyo
Created February 27, 2018 06:17
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kekyo/11b5da1548f82658dbdf63a0678a895b to your computer and use it in GitHub Desktop.
Save kekyo/11b5da1548f82658dbdf63a0678a895b to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace csharp_agg_csv2
{
/// <summary>
/// Reads a CSV file, counts how often each value occurs in its second column,
/// and prints the number of distinct values, the most frequent values, and the elapsed time.
/// </summary>
class Program
{
    // Number of lines handed to the consumers in one queued batch.
    private const int BatchSize = 1000;

    // Upper bound on batches waiting in the queue (about 1M lines with BatchSize = 1000).
    // When the queue is full the reader blocks instead of buffering the whole file in memory.
    private const int MaxQueuedBatches = 1024;

    // Number of most frequent values printed by Main.
    private const int TopCount = 10;

    /// <summary>
    /// Entry point: aggregates the CSV file named by the first argument and prints the result.
    /// </summary>
    /// <param name="args">Command-line arguments; <c>args[0]</c> is the CSV path.</param>
    static void Main(string[] args)
    {
        if (args.Length < 1)
        {
            Console.WriteLine("usage : ConsoleApp1.exe filename.csv");
            // Without returning here, args[0] below would throw IndexOutOfRangeException.
            Environment.ExitCode = 1;
            return;
        }

        var readFilePath = args[0];

        var sw = Stopwatch.StartNew();
        Console.WriteLine("test start");

        // GetAwaiter().GetResult() rethrows the original exception instead of an AggregateException.
        var results = ProcessReadAsync(readFilePath).GetAwaiter().GetResult();
        Console.WriteLine(results.Count);

        // PLINQ queries are deferred: materialize before stopping the stopwatch so the
        // sort is included in the measured time. Ties are broken by key for stable output.
        var sorted = results
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .OrderByDescending(x => x.Value)
            .ThenBy(x => x.Key, StringComparer.Ordinal)
            .Take(TopCount)
            .ToList();

        sw.Stop();

        foreach (var rec in sorted)
        {
            Console.WriteLine($"{rec.Key}:{rec.Value}");
        }

        var ts = sw.Elapsed;
        Console.WriteLine($" {ts.Hours}時間 {ts.Minutes}分 {ts.Seconds}秒 {ts.Milliseconds}ミリ秒");

        // ReadKey throws InvalidOperationException when stdin is redirected (piped input, CI).
        if (!Console.IsInputRedirected)
        {
            Console.ReadKey(true);
        }
    }

    /// <summary>
    /// Counts the occurrences of each distinct value in the second comma-separated
    /// column of the file at <paramref name="path"/>.
    /// </summary>
    /// <param name="path">Path of the CSV file to read.</param>
    /// <returns>
    /// Map from second-column value (surrounding double quotes trimmed) to its count.
    /// Blank lines and lines without a second column are not counted.
    /// </returns>
    /// <remarks>
    /// A background task reads the file in batches while PLINQ workers parse and group them.
    /// If the reader fails (e.g. file not found) it still completes the queue, the consumers
    /// finish, and the reader's exception is rethrown by the final await.
    /// </remarks>
    private static async Task<Dictionary<string, int>> ProcessReadAsync(string path)
    {
        var queue = new BlockingCollection<List<string>>(
            new ConcurrentBag<List<string>>(), MaxQueuedBatches);
        var readerTask = ReadFromCSVAsync(path, queue);

        Dictionary<string, int> results;
        try
        {
            results = queue
                .GetConsumingEnumerable()
                .AsParallel()
                .WithDegreeOfParallelism(Environment.ProcessorCount)
                .SelectMany(lines => lines)
                .Select(line => GetSecondColumn(line))
                .Where(column1 => column1 != null)
                .GroupBy(column1 => column1)
                .ToDictionary(g => g.Key, g => g.Count());
        }
        catch
        {
            // The consumers stopped early; completing the queue releases a reader that is
            // blocked on a full queue instead of leaving it waiting forever.
            queue.CompleteAdding();
            throw;
        }

        await readerTask;
        return results;
    }

    /// <summary>
    /// Returns the second comma-separated field of <paramref name="line"/> with leading and
    /// trailing double quotes trimmed, or <c>null</c> when the line contains no comma.
    /// </summary>
    /// <remarks>
    /// Same result as <c>line.Split(',')[1].Trim('"')</c> without allocating the other fields.
    /// Like that split, it does not understand quoted fields that themselves contain commas.
    /// </remarks>
    private static string GetSecondColumn(string line)
    {
        var start = line.IndexOf(',');
        if (start < 0)
        {
            return null;
        }

        start++;
        var end = line.IndexOf(',', start);
        var field = end < 0 ? line.Substring(start) : line.Substring(start, end - start);
        return field.Trim('"');
    }

    /// <summary>
    /// Starts a background task that reads <paramref name="path"/> line by line and adds the
    /// non-blank lines to <paramref name="queue"/> in batches of up to <see cref="BatchSize"/> lines.
    /// </summary>
    /// <param name="path">Path of the file to read.</param>
    /// <param name="queue">
    /// Destination queue. It is always marked complete for adding when the task ends,
    /// whether reading succeeded or failed, so consumers never wait forever.
    /// </param>
    /// <returns>The reading task; it faults if the file cannot be opened or read.</returns>
    private static Task ReadFromCSVAsync(
        string path,
        BlockingCollection<List<string>> queue)
    {
        return Task.Run(() =>
        {
            try
            {
                // SequentialScan tells the system the file is read front to back once,
                // which it can use to optimize caching.
                using (var fs = new FileStream(path,
                    FileMode.Open, FileAccess.Read, FileShare.Read,
                    4096, FileOptions.SequentialScan))
                using (var reader = new StreamReader(fs))
                {
                    var lines = new List<string>(BatchSize);
                    string line;
                    while ((line = reader.ReadLine()) != null)
                    {
                        if (string.IsNullOrWhiteSpace(line))
                        {
                            continue;
                        }

                        lines.Add(line);
                        if (lines.Count >= BatchSize)
                        {
                            queue.Add(lines);
                            lines = new List<string>(BatchSize);
                        }
                    }

                    // Flush the final, partially filled batch.
                    if (lines.Count > 0)
                    {
                        queue.Add(lines);
                    }
                }
            }
            finally
            {
                queue.CompleteAdding();
            }
        });
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment