Skip to content

Instantly share code, notes, and snippets.

@bostelk
Created July 13, 2024 11:45
Show Gist options
  • Save bostelk/6fc4c38a0dfaa9b5b4fe1fec9c934f5e to your computer and use it in GitHub Desktop.
Save bostelk/6fc4c38a0dfaa9b5b4fe1fec9c934f5e to your computer and use it in GitHub Desktop.
Parse a large JSON Lines file with high performance and minimal memory overhead.
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using System.Text;
//using Newtonsoft.Json;
using System.Diagnostics;
using System.Collections.Concurrent;
using System.Threading;
using Utf8Json;
namespace ConsoleApp1
{
class Program
{
// Consumes the pipe: splits the incoming bytes on '\n' and passes every complete
// line to ProcessLine. When the writer has finished, the bytes that follow the
// last '\n' (a final record without a trailing newline) are processed as well.
// The reader is always completed, even when processing throws, so that the
// writing side is released instead of waiting forever on back-pressure.
static async Task ReadPipeAsync(PipeReader reader)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // TryReadLine advances 'buffer' past each line it returns.
            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                ProcessLine(line);
            }

            if (result.IsCompleted)
            {
                // No more data is coming, so what is left cannot grow into a
                // longer line: treat it as the last record. A whitespace-only
                // tail is not a record and is skipped.
                if (HasNonWhitespace(buffer))
                {
                    ProcessLine(buffer);
                }

                reader.AdvanceTo(buffer.End);
                break;
            }

            // Everything before buffer.Start was consumed; everything up to
            // buffer.End was examined without finding a newline.
            reader.AdvanceTo(buffer.Start, buffer.End);
        }
    }
    finally
    {
        // Mark the PipeReader as complete on every exit path.
        await reader.CompleteAsync();
    }
}

// Returns true when 'data' contains at least one byte that is not JSON whitespace
// (space, tab, carriage return, line feed).
static private bool HasNonWhitespace(ReadOnlySequence<byte> data)
{
    foreach (ReadOnlyMemory<byte> segment in data)
    {
        foreach (byte b in segment.Span)
        {
            if (b != (byte)' ' && b != (byte)'\t' && b != (byte)'\r' && b != (byte)'\n')
            {
                return true;
            }
        }
    }

    return false;
}
// Parses one JSON Lines record and, unless 'silent' is set, prints the parsed value.
// 'line' holds the UTF-8 bytes of a single line without its terminating '\n'.
// Lines that are empty or contain only JSON whitespace are ignored.
static private void ProcessLine(ReadOnlySequence<byte> line)
{
    // Utf8Json parses UTF-8 directly, so copy the bytes once instead of decoding
    // them to a string that the serializer would have to encode again.
    byte[] utf8 = line.ToArray();

    bool blank = true;
    foreach (byte b in utf8)
    {
        if (b != (byte)' ' && b != (byte)'\t' && b != (byte)'\r' && b != (byte)'\n')
        {
            blank = false;
            break;
        }
    }

    if (blank)
    {
        // Nothing to parse (e.g. an empty line or a lone '\r' from a CRLF file).
        return;
    }

    //var json = JsonConvert.DeserializeObject(converted); slow
    var json = JsonSerializer.Deserialize<dynamic>(utf8);
    if (!silent)
    {
        Console.WriteLine(json);
    }
}
// Connects 'stream' to the line processor through a default-configured Pipe:
// FillPipeAsync copies the stream into the pipe while ReadPipeAsync drains it.
// Completes when both sides have finished.
static async Task ProcessLinesAsync(Stream stream)
{
    Pipe pipe = new Pipe();

    // Start the producer first, then the consumer; both run concurrently.
    Task producer = FillPipeAsync(stream, pipe.Writer);
    Task consumer = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(consumer, producer);
}
// Copies 'stream' into 'writer' until the stream reports end of data (a read of
// zero bytes) or the reading side of the pipe has completed.
// The writer is always completed. If anything fails while copying, the exception
// is handed to the writer's completion so the pipe reader observes the failure
// instead of mistaking a truncated stream for a normal end of data.
static async Task FillPipeAsync(Stream stream, PipeWriter writer)
{
    const int minimumBufferSize = 4096;
    Exception failure = null;

    try
    {
        while (true)
        {
            // Ask the PipeWriter for at least 4096 bytes to read into.
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);

            int bytesRead = await stream.ReadAsync(memory);
            if (bytesRead == 0)
            {
                break;
            }

            // Tell the PipeWriter how much was read from the stream.
            writer.Advance(bytesRead);

            // Make the data available to the PipeReader.
            FlushResult result = await writer.FlushAsync();
            if (result.IsCompleted)
            {
                // The reader is gone; there is no point in producing more.
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Remember the failure so it can be reported through the pipe below.
        failure = ex;
    }

    // By completing PipeWriter, tell the PipeReader that there's no more data coming
    // (and why, when 'failure' is set).
    await writer.CompleteAsync(failure);
}
// Tries to cut one '\n'-terminated line off the front of 'buffer'.
// On success 'line' receives the bytes before the '\n', 'buffer' is moved to the
// byte after the '\n', and true is returned. When 'buffer' contains no '\n',
// 'line' is set to its default value, 'buffer' is left untouched and false is returned.
static private bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    SequencePosition? newline = buffer.PositionOf((byte)'\n');

    if (newline.HasValue)
    {
        SequencePosition eol = newline.Value;

        // The line is everything in front of the newline ...
        line = buffer.Slice(0, eol);

        // ... and the remaining buffer starts one byte past it.
        SequencePosition afterEol = buffer.GetPosition(1, eol);
        buffer = buffer.Slice(afterEol);
        return true;
    }

    line = default;
    return false;
}
// When true, ProcessLine does not print the parsed values.
// Main sets it when the second command-line argument is "--silent".
static bool silent = false;
// This is a sample scheduler that runs async callbacks on a single dedicated thread.
// Callbacks passed to Schedule are queued and invoked one at a time, in the order
// the queue hands them out, by a worker thread started in the constructor.
public class SingleThreadPipeScheduler : PipeScheduler
{
    // Pending callbacks, each paired with the state object to pass to it.
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
        new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork)
        {
            // The worker loop never ends on its own (the queue is never marked
            // complete), so it must be a background thread; as a foreground
            // thread it would keep the process alive after Main returns.
            IsBackground = true
        };
        _thread.Start();
    }

    // Worker loop: blocks until a callback is available, then runs it.
    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    // Queues 'action' to be invoked with 'state' on the worker thread.
    public override void Schedule(Action<object> action, object state)
    {
        _queue.Add((action, state));
    }
}
// Entry point. Usage: <filepath> [--silent]
// args[0] is the path of the JSON Lines file to read; when args[1] is "--silent"
// the parsed values are not printed. Prints the elapsed time when done.
// Sets a non-zero process exit code when the arguments are missing or the file
// does not exist, so scripts can detect the failure.
static void Main(string[] args)
{
    if (args.Length == 0)
    {
        Console.WriteLine("Incorrect usage: <filepath>");
        Environment.ExitCode = 1;
        return;
    }

    if (args.Length > 1 && args[1] == "--silent")
    {
        silent = true;
    }

    string path = args[0];
    if (!File.Exists(path))
    {
        Console.WriteLine("Failed to find file: " + path);
        Environment.ExitCode = 1;
        return;
    }

    Console.WriteLine("Read: " + path);
    using (FileStream fs = File.OpenRead(path))
    {
        Stopwatch sw = Stopwatch.StartNew();

        // Main is synchronous, so block here. GetAwaiter().GetResult() rethrows
        // the original exception instead of wrapping it in an AggregateException
        // the way Task.Wait() does.
        ProcessLinesAsync(fs).GetAwaiter().GetResult();

        sw.Stop();
        Console.WriteLine("Finished in {0} seconds", sw.Elapsed.TotalSeconds);
    }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment