Last active
December 18, 2015 14:36
-
-
Save newdigate/f0d4636bab6a379e1578 to your computer and use it in GitHub Desktop.
TaskParallel audio amp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Runtime.InteropServices; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace TaskParallel | |
{ | |
/// <summary>
/// Console demos of chaining Task Parallel Library tasks: <see cref="Main2"/> runs many
/// generate / amplify / read pipelines, and <see cref="Main"/> wires two number
/// generators into an adder and prints the sum.
/// </summary>
public class Class1
{
    /// <summary>Number of samples in each generated buffer (also used as the sample rate).</summary>
    private const int SampleRate = 44100;

    /// <summary>Number of independent pipelines started by <see cref="Main2"/>.</summary>
    private const int PipelineCount = 1000;

    /// <summary>Frequency factor used in the sine expression of the generator stage.</summary>
    private const double ToneFrequency = 440;

    /// <summary>Gain applied to every sample by the amplifier stage.</summary>
    private const double AmplificationFactor = .5D;

    /// <summary>
    /// Starts <see cref="PipelineCount"/> pipelines. Each one generates a sine buffer, reads it
    /// directly, amplifies it, and reads the amplified copy. After a line is read from the
    /// console, all pipelines are cancelled and the method waits for the amplified readers.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main2(string[] args)
    {
        using (CancellationTokenSource cancelSource = new CancellationTokenSource())
        {
            CancellationToken token = cancelSource.Token;
            List<Task> ampReaders = new List<Task>(PipelineCount);

            for (int j = 0; j < PipelineCount; j++)
            {
                // Copy the loop counter so each closure captures its own pipeline id.
                int pipelineId = j;

                // The generator is started without the token so it always yields a
                // (possibly partially filled) buffer instead of ending up cancelled.
                Task<Tuple<double[], long>> source = Task.Factory.StartNew<Tuple<double[], long>>(
                    () => GenerateTone(SampleRate, pipelineId, token));

                // Direct reader of the raw buffer. It is scheduled with the token, so it may be
                // cancelled before it starts; it is intentionally not waited on below.
                source.ContinueWith(
                    done => ReadSamples(done.Result, "{2}: {0} samples read ({1})", token),
                    token);

                Task<Tuple<double[], long>> amplifier = source.ContinueWith<Tuple<double[], long>>(
                    done => Amplify(done.Result, AmplificationFactor, token));

                Task ampReader = amplifier.ContinueWith(
                    done => ReadSamples(done.Result, "{2}: {0} samples read from amp ({1})", token));

                ampReaders.Add(ampReader);
            }

            Console.ReadLine();
            cancelSource.Cancel();

            // The amplified readers are created without a token, so they always run to
            // completion (returning early once cancellation has been requested).
            Task.WaitAll(ampReaders.ToArray());
        }
    }

    /// <summary>
    /// Builds two constant number generators, adds their results once both tasks finish,
    /// prints the sum, and then waits for a line on the console.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        InputModule<long> numberGenerator = new InputModule<long> { Process = () => 1 };
        InputModule<long> numberGenerator2 = new InputModule<long> { Process = () => 2 };
        Module<Tuple<long, long>, long> adder = new Module<Tuple<long, long>, long>
        {
            Process = pair => pair.Item1 + pair.Item2
        };
        OutputModule<long> output = new OutputModule<long> { Process = value => Console.WriteLine(value) };

        Task<long> first = Task.Factory.StartNew<long>(numberGenerator.Process);
        Task<long> second = Task.Factory.StartNew<long>(numberGenerator2.Process);

        // The typed overload hands back Task<long>[] so no casts are needed to read the results.
        Task<long> sum = Task.Factory.ContinueWhenAll<long, long>(
            new[] { first, second },
            inputs => adder.Process(new Tuple<long, long>(inputs[0].Result, inputs[1].Result)));

        Task print = sum.ContinueWith(completed => output.Process(completed.Result));

        print.Wait();
        Console.ReadLine();
    }

    /// <summary>
    /// Fills a buffer of <paramref name="sampleRate"/> samples with a sine wave, stopping early
    /// (and leaving the remaining samples at zero) once cancellation is requested.
    /// </summary>
    /// <param name="sampleRate">Number of samples to generate; also the divisor in the sine expression.</param>
    /// <param name="id">Pipeline id carried along with the buffer and printed in the log line.</param>
    /// <param name="token">Token polled once per sample.</param>
    /// <returns>The buffer paired with <paramref name="id"/>.</returns>
    private static Tuple<double[], long> GenerateTone(int sampleRate, int id, CancellationToken token)
    {
        double[] audioBuffer = new double[sampleRate];
        for (int i = 0; i < sampleRate; i++)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }

            audioBuffer[i] = Math.Sin(ToneFrequency * Math.PI * i * 2 / sampleRate);
        }

        Console.WriteLine("{2}: {0} samples written ({1})", sampleRate, id, Thread.CurrentThread.ManagedThreadId);
        return new Tuple<double[], long>(audioBuffer, id);
    }

    /// <summary>
    /// Produces a new buffer whose samples are the input samples multiplied by
    /// <paramref name="factor"/>. The input buffer is not modified.
    /// </summary>
    /// <param name="block">Source buffer and its pipeline id.</param>
    /// <param name="factor">Gain applied to each sample.</param>
    /// <param name="token">Token checked before starting and once per sample.</param>
    /// <returns>
    /// The amplified buffer paired with the same id. If cancellation was already requested on
    /// entry, an empty buffer is returned; if it is requested mid-way, the unprocessed tail of
    /// the returned buffer stays at zero.
    /// </returns>
    private static Tuple<double[], long> Amplify(Tuple<double[], long> block, double factor, CancellationToken token)
    {
        double[] input = block.Item1;
        long id = block.Item2;

        if (token.IsCancellationRequested)
        {
            // Hand back an empty buffer rather than null so consumers never see a null array.
            return new Tuple<double[], long>(new double[0], id);
        }

        double[] amplified = new double[input.Length];
        for (int i = 0; i < input.Length; i++)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }

            amplified[i] = input[i] * factor;
        }

        Console.WriteLine("{2}: {0} samples amplified ({1})", input.Length, id, Thread.CurrentThread.ManagedThreadId);

        // Return the amplified copy, not the untouched source buffer.
        return new Tuple<double[], long>(amplified, id);
    }

    /// <summary>
    /// Walks over every sample of <paramref name="block"/> (a stand-in for a real consumer) and
    /// logs how many samples the buffer holds. Does nothing if cancellation was already requested.
    /// </summary>
    /// <param name="block">Buffer to read and its pipeline id.</param>
    /// <param name="messageFormat">
    /// Composite format string; {0} is the buffer length, {1} the pipeline id, {2} the managed thread id.
    /// </param>
    /// <param name="token">Token checked before starting and once per sample.</param>
    private static void ReadSamples(Tuple<double[], long> block, string messageFormat, CancellationToken token)
    {
        if (token.IsCancellationRequested)
        {
            return;
        }

        double[] samples = block.Item1;
        for (int i = 0; i < samples.Length; i++)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }
        }

        Console.WriteLine(messageFormat, samples.Length, block.Item2, Thread.CurrentThread.ManagedThreadId);
    }
}
/// <summary>
/// A processing step described by a single delegate that turns a
/// <typeparamref name="TInput"/> into a <typeparamref name="TOutput"/>.
/// </summary>
/// <typeparam name="TInput">Type of the value handed to the step.</typeparam>
/// <typeparam name="TOutput">Type of the value the step produces.</typeparam>
public class Module<TInput, TOutput>
{
    // Backing store for Process; stays null until a delegate is assigned.
    private Func<TInput, TOutput> _process;

    /// <summary>
    /// Gets or sets the delegate that performs this module's work.
    /// </summary>
    public Func<TInput, TOutput> Process
    {
        get { return _process; }
        set { _process = value; }
    }
}
/// <summary>
/// A module that produces a value without needing any input. It is stored in the base
/// class as a <c>Func&lt;bool, TOutput&gt;</c> whose boolean argument is ignored.
/// </summary>
/// <typeparam name="TOutput">Type of the value the module produces.</typeparam>
public class InputModule<TOutput> : Module<bool, TOutput>
{
    /// <summary>
    /// Gets or sets the parameterless producer. This property deliberately hides the
    /// base class's <c>Process</c> property (hence the <c>new</c> modifier).
    /// </summary>
    /// <remarks>
    /// The getter returns a fresh wrapper on every call; the wrapper looks up the base
    /// delegate when it is invoked and calls it with <c>true</c>.
    /// </remarks>
    /// <exception cref="ArgumentNullException">The assigned value is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// Thrown by the returned delegate when it is invoked and no producer has been assigned.
    /// </exception>
    public new Func<TOutput> Process
    {
        get
        {
            return () =>
            {
                Func<bool, TOutput> inner = base.Process;
                if (inner == null)
                {
                    throw new InvalidOperationException("No Process delegate has been assigned.");
                }

                return inner(true);
            };
        }

        set
        {
            // Fail at assignment time instead of with a NullReferenceException on first use.
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }

            base.Process = ignored => value();
        }
    }
}
/// <summary>
/// A module that consumes a value and produces nothing. It is stored in the base class
/// as a <c>Func&lt;TInput, bool&gt;</c> that always returns <c>true</c>.
/// </summary>
/// <typeparam name="TInput">Type of the value the module consumes.</typeparam>
public class OutputModule<TInput> : Module<TInput, bool>
{
    /// <summary>
    /// Gets or sets the consumer action. This property deliberately hides the base
    /// class's <c>Process</c> property (hence the <c>new</c> modifier).
    /// </summary>
    /// <remarks>
    /// The getter returns a fresh wrapper on every call; the wrapper looks up the base
    /// delegate when it is invoked and discards its boolean result.
    /// </remarks>
    /// <exception cref="ArgumentNullException">The assigned value is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// Thrown by the returned delegate when it is invoked and no consumer has been assigned.
    /// </exception>
    public new Action<TInput> Process
    {
        get
        {
            return input =>
            {
                Func<TInput, bool> inner = base.Process;
                if (inner == null)
                {
                    throw new InvalidOperationException("No Process delegate has been assigned.");
                }

                inner(input);
            };
        }

        set
        {
            // Fail at assignment time instead of with a NullReferenceException on first use.
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }

            base.Process = input =>
            {
                value(input);
                return true;
            };
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment