Skip to content

Instantly share code, notes, and snippets.

@newdigate
Last active December 18, 2015 14:36
Show Gist options
  • Save newdigate/f0d4636bab6a379e1578 to your computer and use it in GitHub Desktop.
Save newdigate/f0d4636bab6a379e1578 to your computer and use it in GitHub Desktop.
TaskParallel audio amp
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace TaskParallel
{
/// <summary>
/// Demo entry points that chain small processing stages together with
/// Task Parallel Library continuations.
/// </summary>
public class Class1
{
    /// <summary>Number of samples generated per pipeline.</summary>
    private const int SampleRate = 44100;

    /// <summary>Number of independent pipelines started by <c>Main2</c>.</summary>
    private const int PipelineCount = 1000;

    /// <summary>Number of sine cycles written across one generated buffer.</summary>
    private const int ToneFrequency = 440;

    /// <summary>Gain applied to every sample by the amplifier stage.</summary>
    private const double AmplificationFactor = .5D;

    /// <summary>
    /// Starts <c>PipelineCount</c> pipelines. Each pipeline generates a sine buffer,
    /// reads it directly, amplifies it, and reads the amplified copy. After a line is
    /// read from standard input every stage is asked to cancel, and the method waits
    /// for all reader tasks to finish before returning.
    /// </summary>
    /// <param name="args">Unused.</param>
    public static void Main2(string[] args)
    {
        using (CancellationTokenSource cancelSource = new CancellationTokenSource())
        {
            CancellationToken token = cancelSource.Token;
            List<Task> readerTasks = new List<Task>();

            for (int j = 0; j < PipelineCount; j++)
            {
                // Copy the loop variable so each closure sees its own pipeline id.
                int pipelineId = j;

                Task<Tuple<double[], long>> source = Task.Factory.StartNew(() =>
                {
                    double[] samples = GenerateSine(SampleRate, token);
                    Console.WriteLine("{2}: {0} samples written ({1})", SampleRate, pipelineId, Thread.CurrentThread.ManagedThreadId);
                    return Tuple.Create(samples, (long)pipelineId);
                });

                // Registered with the token: if cancellation arrives before the source
                // finishes, this continuation is cancelled instead of being run.
                Task directReader = source.ContinueWith(antecedent =>
                {
                    if (token.IsCancellationRequested)
                    {
                        return;
                    }

                    Tuple<double[], long> produced = antecedent.Result;
                    Drain(produced.Item1, token);
                    Console.WriteLine("{2}: {0} samples read ({1})", produced.Item1.Length, produced.Item2, Thread.CurrentThread.ManagedThreadId);
                }, token);

                Task<Tuple<double[], long>> amplifier = source.ContinueWith(antecedent =>
                {
                    if (token.IsCancellationRequested)
                    {
                        // Empty buffer rather than null so a consumer can never dereference null.
                        return Tuple.Create(new double[0], 0L);
                    }

                    Tuple<double[], long> produced = antecedent.Result;
                    double[] amplified = Amplify(produced.Item1, AmplificationFactor, token);
                    Console.WriteLine("{2}: {0} samples amplified ({1})", produced.Item1.Length, produced.Item2, Thread.CurrentThread.ManagedThreadId);

                    // Hand the amplified samples downstream, not the untouched source buffer.
                    return Tuple.Create(amplified, produced.Item2);
                });

                Task amplifiedReader = amplifier.ContinueWith(antecedent =>
                {
                    if (token.IsCancellationRequested)
                    {
                        return;
                    }

                    Tuple<double[], long> amplified = antecedent.Result;
                    Drain(amplified.Item1, token);
                    Console.WriteLine("{2}: {0} samples read from amp ({1})", amplified.Item1.Length, amplified.Item2, Thread.CurrentThread.ManagedThreadId);
                });

                readerTasks.Add(directReader);
                readerTasks.Add(amplifiedReader);
            }

            Console.ReadLine();
            cancelSource.Cancel();

            try
            {
                // Every pipeline ends in a reader, so waiting on the readers also
                // guarantees no stage is still running when the token source is disposed.
                Task.WaitAll(readerTasks.ToArray());
            }
            catch (AggregateException ex)
            {
                // Direct readers that never started are reported as cancelled; that is
                // the expected outcome of Cancel(). Anything else is rethrown.
                ex.Handle(inner => inner is OperationCanceledException);
            }
        }
    }

    /// <summary>
    /// Program entry point: two input modules produce 1 and 2, an adder module sums
    /// them, and an output module writes the sum to the console. Waits for the output
    /// task, then for a line on standard input.
    /// </summary>
    /// <param name="args">Unused.</param>
    public static void Main(string[] args)
    {
        InputModule<long> firstOperand = new InputModule<long> { Process = () => 1 };
        InputModule<long> secondOperand = new InputModule<long> { Process = () => 2 };
        Module<Tuple<long, long>, long> adder = new Module<Tuple<long, long>, long>
        {
            Process = pair => pair.Item1 + pair.Item2
        };
        OutputModule<long> output = new OutputModule<long> { Process = sum => Console.WriteLine(sum) };

        Task<long> firstTask = Task.Factory.StartNew<long>(firstOperand.Process);
        Task<long> secondTask = Task.Factory.StartNew<long>(secondOperand.Process);

        // The two-type-argument overload keeps the antecedents typed as Task<long>,
        // so no downcast is needed to read their results.
        Task<long> sumTask = Task.Factory.ContinueWhenAll<long, long>(
            new[] { firstTask, secondTask },
            operands => adder.Process(Tuple.Create(operands[0].Result, operands[1].Result)));

        Task printTask = sumTask.ContinueWith(completed =>
        {
            output.Process(completed.Result);
        });

        printTask.Wait();
        Console.ReadLine();
    }

    /// <summary>
    /// Fills a buffer of <paramref name="sampleRate"/> samples with a sine wave,
    /// stopping early (leaving the remaining samples at zero) once cancellation
    /// is requested.
    /// </summary>
    private static double[] GenerateSine(int sampleRate, CancellationToken token)
    {
        double[] buffer = new double[sampleRate];
        for (int i = 0; i < sampleRate; i++)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }

            buffer[i] = Math.Sin(ToneFrequency * Math.PI * i * 2 / sampleRate);
        }

        return buffer;
    }

    /// <summary>
    /// Returns a new buffer holding each input sample multiplied by
    /// <paramref name="factor"/>; samples not reached before cancellation stay zero.
    /// </summary>
    private static double[] Amplify(double[] samples, double factor, CancellationToken token)
    {
        double[] amplified = new double[samples.Length];
        for (int i = 0; i < samples.Length; i++)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }

            amplified[i] = samples[i] * factor;
        }

        return amplified;
    }

    /// <summary>
    /// Placeholder consumer: walks the buffer without using the values, stopping
    /// as soon as cancellation is requested.
    /// </summary>
    private static void Drain(double[] samples, CancellationToken token)
    {
        foreach (double sample in samples)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }
        }
    }
}
/// <summary>
/// A processing stage described by a single delegate that maps an input
/// value to an output value.
/// </summary>
/// <typeparam name="TInput">Type of the value the stage consumes.</typeparam>
/// <typeparam name="TOutput">Type of the value the stage produces.</typeparam>
public class Module<TInput, TOutput>
{
    private Func<TInput, TOutput> _process;

    /// <summary>
    /// The delegate that performs this stage's work. It is <c>null</c> until assigned.
    /// </summary>
    public Func<TInput, TOutput> Process
    {
        get { return _process; }
        set { _process = value; }
    }
}
/// <summary>
/// A stage that takes no real input: it exposes a parameterless
/// <c>Func&lt;TOutput&gt;</c> and adapts it to the base class's
/// <c>Func&lt;bool, TOutput&gt;</c> delegate.
/// </summary>
/// <typeparam name="TOutput">Type of the value the stage produces.</typeparam>
public class InputModule<TOutput> : Module<bool, TOutput>
{
    /// <summary>
    /// Parameterless view of the stage's delegate. Intentionally hides the
    /// base <c>Process</c> property, hence the <c>new</c> modifier.
    /// </summary>
    /// <remarks>
    /// The getter returns a fresh wrapper on every read, not the delegate that was
    /// assigned. The wrapper looks up the base delegate when it is invoked, and
    /// always passes <c>true</c> as the placeholder input.
    /// </remarks>
    public new Func<TOutput> Process
    {
        get
        {
            return () => base.Process(true);
        }
        set
        {
            // The bool argument is a placeholder and is ignored.
            base.Process = ignored => value();
        }
    }
}
/// <summary>
/// A stage that produces no real output: it exposes an
/// <c>Action&lt;TInput&gt;</c> and adapts it to the base class's
/// <c>Func&lt;TInput, bool&gt;</c> delegate.
/// </summary>
/// <typeparam name="TInput">Type of the value the stage consumes.</typeparam>
public class OutputModule<TInput> : Module<TInput, bool>
{
    /// <summary>
    /// Action view of the stage's delegate. Intentionally hides the base
    /// <c>Process</c> property, hence the <c>new</c> modifier.
    /// </summary>
    /// <remarks>
    /// The getter returns a fresh wrapper on every read. The wrapper looks up the
    /// base delegate when it is invoked and discards its <c>bool</c> result.
    /// </remarks>
    public new Action<TInput> Process
    {
        get
        {
            return input => base.Process(input);
        }
        set
        {
            // The base delegate must return something; it always reports true.
            base.Process = input =>
            {
                value(input);
                return true;
            };
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment