Skip to content

Instantly share code, notes, and snippets.

Created December 8, 2020 05:19
Show Gist options
  • Save instance-id/fc8c1a156a2fe27c7020c56042b4f5c5 to your computer and use it in GitHub Desktop.
Save instance-id/fc8c1a156a2fe27c7020c56042b4f5c5 to your computer and use it in GitHub Desktop.
Threaded text processor
// -- Found : ----
// -- User : displayName -------------------------------------
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
    /// <summary>
    /// Reads <see cref="OldFilePath"/> line by line, replaces the placeholder
    /// <c>('',</c> on each line with a sequential id, and writes the result to
    /// <see cref="NewFilePath"/>. Work flows through a reader → processors → writer
    /// pipeline connected by two bounded <see cref="BlockingCollection{T}"/> queues.
    /// </summary>
    /// <remarks>
    /// Several processor tasks consume lines concurrently, so neither the output line
    /// order nor the id given to a particular line is guaranteed to follow the input order.
    /// </remarks>
    public static class TextProcessor
    {
        // The paths to read and write
        private const string OldFilePath = @"C:\Users\Donavon\Desktop\old.sql";
        private const string NewFilePath = @"C:\Users\Donavon\Desktop\new.sql";

        // Text marking a line that needs an id. The same constant is used both to detect
        // and to replace it, so an id is consumed only when a substitution actually happens.
        private const string EmptyIdPlaceholder = "('',";

        // Bounded capacity of each queue: at most this many lines are buffered between two
        // stages (a producer's Add blocks while the queue is full), which caps memory use.
        // Set this to a number that is optimal for you.
        private const int ExpectedMaxLines = 1 << 10;

        // Lines read from disk, waiting to be processed
        private static readonly BlockingCollection<string> DirtyLines = new BlockingCollection<string>(ExpectedMaxLines);

        // Processed lines, waiting to be written to disk
        private static readonly BlockingCollection<string> CleanLines = new BlockingCollection<string>(ExpectedMaxLines);

        // Cancelled as soon as any stage throws, so the remaining stages stop instead of
        // blocking forever on a queue that is full or will never be completed.
        private static readonly CancellationTokenSource Failure = new CancellationTokenSource();

        // A common factory. Since all stage tasks are long running, this is enough.
        private static readonly TaskFactory LongRunningTasks = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

        /// <summary>
        /// Starts the reader, the processors and the writer, then blocks until all of them
        /// have finished.
        /// </summary>
        /// <exception cref="AggregateException">Any stage failed (e.g. the input file is missing).</exception>
        public static void Main()
        {
            // One reader feeds DirtyLines; one processor per core moves lines to CleanLines;
            // one writer drains CleanLines to disk.
            var readerTask = BeginReader();
            var processingTask = BeginParallelProcessing();
            var writerTask = BeginWriter();

            // Wait on every stage, not just the writer, so a failure anywhere surfaces here
            // instead of going unobserved.
            Task.WaitAll(readerTask, processingTask, writerTask);

            Console.WriteLine("All text lines cleaned and written to disk.");
        }

        /// <summary>
        /// Starts one task that reads <see cref="OldFilePath"/> and queues each line into
        /// <see cref="DirtyLines"/>, completing the queue when input ends.
        /// </summary>
        /// <returns>The reader task.</returns>
        private static Task BeginReader()
        {
            return StartStage(() =>
            {
                Console.WriteLine("Reader task initiated.");
                try
                {
                    using (var reader = new StreamReader(OldFilePath))
                    {
                        string line;
                        while ((line = reader.ReadLine()) != null)
                        {
                            // Add (not TryAdd) blocks while the queue is full, so no line is dropped.
                            DirtyLines.Add(line, Failure.Token);
                        }
                    }
                }
                finally
                {
                    // Always signal end of input, even on error, so the processors can drain and exit.
                    DirtyLines.CompleteAdding();
                }
            });
        }

        /// <summary>
        /// Starts one processor task per logical processor. Each one takes lines from
        /// <see cref="DirtyLines"/>, substitutes the next global id for
        /// <see cref="EmptyIdPlaceholder"/>, and queues the result into <see cref="CleanLines"/>.
        /// </summary>
        /// <returns>A task that completes when every processor has finished.</returns>
        private static Task BeginParallelProcessing()
        {
            // Shared across all processors. Interlocked.Increment returns 1 for the first id.
            var globalId = 0;

            var processors = new Task[Environment.ProcessorCount];
            for (var taskIndex = 0; taskIndex < processors.Length; taskIndex++)
            {
                processors[taskIndex] = StartStage(() =>
                {
                    // Blocks until a line is available; the loop ends once the reader has
                    // completed the queue and it is empty (no busy-waiting).
                    foreach (var line in DirtyLines.GetConsumingEnumerable(Failure.Token))
                    {
                        var cleanLine = line;
                        if (line.Contains(EmptyIdPlaceholder))
                        {
                            var nextGlobalId = Interlocked.Increment(ref globalId);
                            // NOTE: Replace substitutes every occurrence, so a line containing the
                            // placeholder more than once gets the same id in each spot.
                            cleanLine = line.Replace(EmptyIdPlaceholder, "('" + nextGlobalId + "',");
                        }

                        CleanLines.Add(cleanLine, Failure.Token);
                    }
                });
            }

            // Complete the output queue only after every processor has actually finished
            // (successfully or not), instead of guessing with a fixed delay.
            var allProcessed = Task.WhenAll(processors);
            allProcessed.ContinueWith(_ => CleanLines.CompleteAdding(), TaskContinuationOptions.ExecuteSynchronously);
            return allProcessed;
        }

        /// <summary>
        /// Starts one task that writes every line from <see cref="CleanLines"/> to
        /// <see cref="NewFilePath"/>.
        /// </summary>
        /// <returns>The writer task. It finishes only after all processing is done.</returns>
        private static Task BeginWriter()
        {
            return StartStage(() =>
            {
                Console.WriteLine("Writer task initiated.");
                using (var writer = new StreamWriter(NewFilePath))
                {
                    foreach (var cleanLine in CleanLines.GetConsumingEnumerable(Failure.Token))
                    {
                        writer.WriteLine(cleanLine);
                    }
                }
            });
        }

        /// <summary>
        /// Runs <paramref name="body"/> as a long-running task. If it throws, cancels
        /// <see cref="Failure"/> so the other stages stop waiting, then rethrows so the
        /// task faults with the original exception.
        /// </summary>
        private static Task StartStage(Action body)
        {
            return LongRunningTasks.StartNew(() =>
            {
                try
                {
                    body();
                }
                catch
                {
                    Failure.Cancel();
                    throw;
                }
            });
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment