Created
July 13, 2024 11:45
-
-
Save bostelk/6fc4c38a0dfaa9b5b4fe1fec9c934f5e to your computer and use it in GitHub Desktop.
Parse a large JSON Lines file with high performance and minimal memory overhead.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Buffers; | |
using System.IO; | |
using System.IO.Pipelines; | |
using System.Threading.Tasks; | |
using System.Text; | |
//using Newtonsoft.Json; | |
using System.Diagnostics; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using Utf8Json; | |
namespace ConsoleApp1 | |
{ | |
class Program | |
{ | |
/// <summary>
/// Drains <paramref name="reader"/>, handing every newline-terminated line to <c>ProcessLine</c>.
/// </summary>
/// <remarks>
/// Once the writer has completed, any bytes left after the last newline are processed as a final
/// line, so input that does not end with a newline does not lose its last record.
/// The reader is completed in a <c>finally</c> block so that it is released even when processing
/// a line throws; otherwise the writing side could wait forever for buffer space to be freed.
/// </remarks>
/// <param name="reader">The consuming end of the pipe.</param>
static async Task ReadPipeAsync(PipeReader reader)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // Each successful call slices one complete line off the front of the buffer.
            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                ProcessLine(line);
            }

            // No more data is coming: whatever remains is the last, unterminated line.
            // Whitespace-only leftovers (for example a stray '\r') are not a record.
            if (result.IsCompleted && HasNonWhitespace(buffer))
            {
                ProcessLine(buffer);
                buffer = buffer.Slice(buffer.End);
            }

            // Consumed up to buffer.Start, examined up to buffer.End: the pipe keeps the
            // partial line and only wakes this loop again when more data has arrived.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    finally
    {
        // Mark the PipeReader as complete on every exit path.
        await reader.CompleteAsync();
    }
}

// Returns true when the sequence holds at least one byte that is not JSON whitespace.
static private bool HasNonWhitespace(in ReadOnlySequence<byte> data)
{
    foreach (ReadOnlyMemory<byte> segment in data)
    {
        foreach (byte b in segment.Span)
        {
            if (b != (byte)' ' && b != (byte)'\t' && b != (byte)'\r' && b != (byte)'\n')
            {
                return true;
            }
        }
    }

    return false;
}
/// <summary>
/// Deserializes a single JSON Lines record and, unless <c>silent</c> is set, writes it to the console.
/// </summary>
/// <param name="line">The raw bytes of one line, without its terminating newline.</param>
static private void ProcessLine(ReadOnlySequence<byte> line)
{
    // Copy the (possibly multi-segment) line into one contiguous array for the parser.
    byte[] utf8 = line.ToArray();

    // An empty or whitespace-only line carries no JSON value, so there is nothing to parse.
    // The scan stops at the first non-whitespace byte, which for a real record is the first byte.
    if (Array.TrueForAll(utf8, b => b == (byte)' ' || b == (byte)'\t' || b == (byte)'\r' || b == (byte)'\n'))
    {
        return;
    }

    // Feed the UTF-8 bytes to the parser directly instead of decoding them to a string first:
    // that avoids allocating a UTF-16 string per line only to have it converted back to bytes.
    // (Json.NET's JsonConvert.DeserializeObject was tried here before and noted as slow.)
    var json = JsonSerializer.Deserialize<dynamic>(utf8);
    if (!silent)
    {
        Console.WriteLine(json);
    }
}
/// <summary>
/// Pumps <paramref name="stream"/> through an in-memory pipe: one task fills the pipe from the
/// stream while another reads lines out of it, and the method completes when both have finished.
/// </summary>
/// <param name="stream">The source of newline-delimited data.</param>
static async Task ProcessLinesAsync(Stream stream)
{
    Pipe channel = new Pipe();

    // The producer is started before the consumer.
    Task producer = FillPipeAsync(stream, channel.Writer);
    Task consumer = ReadPipeAsync(channel.Reader);

    Task bothSides = Task.WhenAll(consumer, producer);
    await bothSides;
}
/// <summary>
/// Copies <paramref name="stream"/> into <paramref name="writer"/> until the stream is exhausted,
/// a read fails, or the reading side of the pipe has completed, then completes the writer.
/// </summary>
/// <remarks>
/// A read failure ends the copy early (best effort), but it is reported on standard error so
/// that truncated input is not mistaken for a clean end of file.
/// </remarks>
/// <param name="stream">The stream to read from.</param>
/// <param name="writer">The producing end of the pipe.</param>
static async Task FillPipeAsync(Stream stream, PipeWriter writer)
{
    const int minimumBufferSize = 4096;

    while (true)
    {
        // Ask the PipeWriter for at least 4096 bytes of writable memory.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await stream.ReadAsync(memory);
            if (bytesRead == 0)
            {
                // End of stream.
                break;
            }

            // Tell the PipeWriter how much was read from the stream.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            // Stop copying, but make the failure visible instead of swallowing it.
            Console.Error.WriteLine("Failed to read from stream: " + ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();
        if (result.IsCompleted)
        {
            // The reader has completed; nothing will consume further data.
            break;
        }
    }

    // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}
/// <summary>
/// Tries to split the first line off the front of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">
/// The unread data. On success it is advanced past the line and its '\n'; otherwise it is left untouched.
/// </param>
/// <param name="line">
/// On success, the bytes before the first '\n' (the '\n' itself is excluded); otherwise <c>default</c>.
/// </param>
/// <returns><c>true</c> when a '\n' was found in the buffer; <c>false</c> otherwise.</returns>
static private bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    if (buffer.PositionOf((byte)'\n') is SequencePosition newline)
    {
        // Everything in front of the newline is the line.
        line = buffer.Slice(buffer.Start, newline);

        // Resume one byte past the newline.
        SequencePosition afterNewline = buffer.GetPosition(1, newline);
        buffer = buffer.Slice(afterNewline);
        return true;
    }

    // No end-of-line marker in the buffer yet.
    line = default;
    return false;
}
static bool silent = false; | |
/// <summary>
/// Sample <see cref="PipeScheduler"/> that runs every scheduled callback on a single dedicated thread.
/// </summary>
/// <remarks>
/// The worker is a background thread, so an instance that is never disposed does not keep the
/// process alive. Call <see cref="Dispose"/> to let already-queued callbacks finish and stop the thread.
/// </remarks>
public class SingleThreadPipeScheduler : PipeScheduler, IDisposable
{
    // Callbacks waiting to run, paired with the state object to pass to each.
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
        new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;
    private bool _disposed;

    /// <summary>Creates the scheduler and starts its worker thread.</summary>
    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork)
        {
            IsBackground = true,
        };
        _thread.Start();
    }

    // Worker loop: blocks until a callback is queued, runs it, and exits once the queue
    // has been marked complete and drained.
    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    /// <summary>Queues <paramref name="action"/> to run on the dedicated thread.</summary>
    /// <param name="action">The callback to run.</param>
    /// <param name="state">The argument passed to <paramref name="action"/>.</param>
    public override void Schedule(Action<object> action, object state)
    {
        _queue.Add((action, state));
    }

    /// <summary>
    /// Stops accepting new callbacks, waits for the queued ones to finish, and releases the queue.
    /// Calling it more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _queue.CompleteAdding();

        // A callback that disposes its own scheduler must not wait for itself; in that case the
        // worker loop ends on its own once the queue is drained.
        if (Thread.CurrentThread != _thread)
        {
            _thread.Join();
            _queue.Dispose();
        }
    }
}
/// <summary>
/// Entry point. Usage: <c>&lt;filepath&gt; [--silent]</c>. Reads the file line by line, parses each
/// line as JSON, and prints the elapsed time.
/// </summary>
/// <remarks>
/// Sets a non-zero process exit code when no path is given or the file does not exist, so that
/// calling scripts can detect the failure.
/// </remarks>
/// <param name="args">
/// <c>args[0]</c> is the path of the JSON Lines file; an optional <c>args[1]</c> equal to
/// <c>--silent</c> turns off echoing of the parsed records.
/// </param>
static void Main(string[] args)
{
    if (args.Length == 0)
    {
        Console.WriteLine("Incorrect usage: <filepath>");
        Environment.ExitCode = 1;
        return;
    }

    if (args.Length > 1 && args[1] == "--silent")
    {
        silent = true;
    }

    string path = args[0];
    if (!File.Exists(path))
    {
        Console.WriteLine("Failed to find file: " + path);
        Environment.ExitCode = 1;
        return;
    }

    Console.WriteLine("Read: " + path);
    using (FileStream fs = File.OpenRead(path))
    {
        Stopwatch sw = Stopwatch.StartNew();

        // GetAwaiter().GetResult() rethrows the original exception; Task.Wait() would wrap it
        // in an AggregateException and bury the real failure one level down.
        ProcessLinesAsync(fs).GetAwaiter().GetResult();

        sw.Stop();
        Console.WriteLine("Finished in {0} seconds", sw.Elapsed.TotalSeconds);
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment