Skip to content

Instantly share code, notes, and snippets.

@stdray
Created May 30, 2014 13:09
Show Gist options
  • Save stdray/fb3b490602e6d59ee851 to your computer and use it in GitHub Desktop.
Save stdray/fb3b490602e6d59ee851 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Xml.Linq;
using System.Xml.XPath;
using Akka.Actor;
using Akka.Event;
using CsvHelper;
namespace ExtDoc2Parser
{
using Settings = Properties.Settings;
using Data = Tuple<string, string, string>;
/// <summary>
/// Actor that turns one raw XML document (received as a byte array) into zero or
/// more <c>Data</c> rows and forwards every row to the writer actor.
/// Every <c>Settings.Default.LogPeriod</c> documents it logs the average parse time.
/// </summary>
public class Parser : TypedActor, IHandle<byte[]>
{
    private readonly ActorRef _writer;
    private readonly LoggingAdapter _logger = Logging.GetLogger(Context);
    // Accumulates parse time across documents; reset after every log line.
    private readonly Stopwatch _timer = new Stopwatch();
    private int _parsedCount = 0;

    /// <summary>Creates a parser that sends its results to <paramref name="writer"/>.</summary>
    /// <param name="writer">Actor that receives each parsed <c>Data</c> row.</param>
    public Parser(ActorRef writer)
    {
        _writer = writer;
    }

    /// <summary>
    /// Parses a single document, forwards the resulting rows to the writer and
    /// periodically logs the average parse time per document.
    /// </summary>
    /// <param name="message">The XML document content.</param>
    public void Handle(byte[] message)
    {
        _timer.Start();
        // Parse returns a lazy query. Materialize it while the stopwatch is running,
        // otherwise the XPath work would happen in the loop below and never be timed.
        var data = Parse(message).ToList();
        _timer.Stop();

        foreach (var d in data)
            _writer.Tell(d);

        if (++_parsedCount % Settings.Default.LogPeriod != 0)
            return;

        var avr = ((double)_timer.ElapsedMilliseconds) / Settings.Default.LogPeriod;
        _logger.Log(LogLevel.InfoLevel, "avr time : {0}ms", avr);
        _timer.Reset();
    }

    /// <summary>
    /// Builds one <c>Data</c> row per <c>anyType</c> descendant of the document whose
    /// <c>Settings.Default.LastColumnSelector</c> XPath yields a non-empty value.
    /// </summary>
    /// <param name="doc">The XML document content.</param>
    /// <returns>
    /// A lazily evaluated sequence of rows: joined <c>notificationNumber</c> values,
    /// joined <c>lot/ordinalNumber</c> values and the joined selector values.
    /// </returns>
    private static IEnumerable<Data> Parse(byte[] doc)
    {
        XElement root;
        // The document is loaded fully into memory here, so the stream can be
        // released before the (deferred) query below is enumerated.
        using (var stream = new MemoryStream(doc))
            root = XElement.Load(stream);

        return from el in root.Descendants("anyType")
               let amounts = el.XPathSelectElements(Settings.Default.LastColumnSelector)
               let amountsValue = Value(amounts)
               where !string.IsNullOrEmpty(amountsValue)
               // Matched by local name so the element namespace does not matter.
               let notificationNumbers = el.Descendants().Where(d => d.Name.LocalName == "notificationNumber")
               let ordinalNumbers =
                   el.XPathSelectElements(".//*[local-name() = 'lot']/*[local-name() = 'ordinalNumber']")
               // Reuse amountsValue instead of running the selector a second time.
               select new Data(Value(notificationNumbers), Value(ordinalNumbers), amountsValue);
    }

    /// <summary>Joins the text values of the given elements with a single space.</summary>
    /// <param name="xElements">Elements whose values are concatenated.</param>
    /// <returns>The joined values; an empty string when there are no elements.</returns>
    private static string Value(IEnumerable<XElement> xElements)
    {
        return string.Join(" ", xElements.Select(x => x.Value));
    }
}
/// <summary>
/// Actor that writes every received <c>Data</c> row to a CSV output and logs the
/// running total every <c>Settings.Default.LogPeriod</c> rows.
/// </summary>
public class Writer : TypedActor, IHandle<Data>
{
    private readonly LoggingAdapter _logger = Logging.GetLogger(Context);
    private readonly CsvWriter _csv;
    private int _writtenCount;

    /// <summary>
    /// Configures a CSV writer over <paramref name="writer"/> (class map and
    /// delimiter from settings) and writes the header row.
    /// </summary>
    /// <param name="writer">Destination for the CSV text.</param>
    public Writer(TextWriter writer)
    {
        var csv = new CsvWriter(writer);
        var configuration = csv.Configuration;
        configuration.RegisterClassMap<AccumulationDataMap>();
        configuration.Delimiter = Settings.Default.Delimiter;
        csv.WriteHeader<Data>();
        _csv = csv;
    }

    /// <summary>Writes one row and, on every LogPeriod-th row, logs the row count.</summary>
    /// <param name="message">The row to write.</param>
    public void Handle(Data message)
    {
        _csv.WriteRecord(message);
        _writtenCount++;

        var atLogBoundary = _writtenCount % Settings.Default.LogPeriod == 0;
        if (!atLogBoundary)
        {
            return;
        }

        _logger.Log(LogLevel.InfoLevel, "write records: {0}", _writtenCount);
    }
}
}
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using Akka.Actor;
using Akka.Routing;
using ExtDoc2Parser.Model;
namespace ExtDoc2Parser
{
using Settings = Properties.Settings;
/// <summary>
/// Console entry point: streams document contents from the database to a pool of
/// parser actors and lets a single writer actor emit the parsed rows to a file.
/// </summary>
internal class Program
{
    /// <summary>
    /// Builds the actor system (one writer, <c>DegreeOfParallelism</c> parsers behind
    /// a round-robin router), feeds it every matching document and then shuts the
    /// pipeline down stage by stage before the output file is closed.
    /// </summary>
    private static void Main()
    {
        Console.WriteLine("start");
        using (var system = ActorSystem.Create("extDoc2"))
        using (var streamWriter = new StreamWriter(Settings.Default.FileName))
        using (var db = new marketDB { CommandTimeout = Settings.Default.CommandTimeout })
        {
            var writer = system.ActorOf(Props.Create(() => new Writer(streamWriter)), "writer");
            var parsers = Enumerable
                .Range(0, Settings.Default.DegreeOfParallelism)
                .Select(i => system.ActorOf(Props.Create(() => new Parser(writer)), "parser_" + i))
                .ToList();
            var router = system.ActorOf(new Props().WithRouter(new RoundRobinGroup(parsers)));

            var data = db.EXTDOC2
                .Where(d => d.EXTERNALSOURCEID == Settings.Default.ExternalSourceId)
                .Where(d => d.EXTERNALDATAKIND == Settings.Default.DataKindId)
                .Select(d => d.CONTENT);

            var rowNumber = 0;
            var timer = Stopwatch.StartNew();
            foreach (var d in data)
            {
                router.Tell(d);
                if (++rowNumber % Settings.Default.LogPeriod == 0)
                    Console.WriteLine("[{2}] fetch items: {0}, avr time: {1}ms",
                        rowNumber,
                        // Floating-point division: integer division would report 0ms
                        // whenever the average fetch takes less than a millisecond.
                        (double)timer.ElapsedMilliseconds / rowNumber,
                        DateTime.Now);
            }

            var stopTimeout = TimeSpan.FromSeconds(200);

            // Drain the parsers before stopping the writer. Stopping the writer first
            // would drop every row a parser produces from documents still queued in
            // its mailbox at that moment.
            // NOTE(review): relies on the stop request reaching each parser after the
            // documents already routed to it - confirm for the Akka version in use.
            var parserStops = parsers
                .Select(p => p.GracefulStop(stopTimeout))
                .ToList();
            foreach (var parserStop in parserStops)
                parserStop.Wait();

            // All parsers are done, so every row is already in the writer's mailbox.
            writer.GracefulStop(stopTimeout).Wait();
        }
        Console.WriteLine("finish");
        Console.ReadLine();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment