Skip to content

Instantly share code, notes, and snippets.

@alanmcgovern
Created December 7, 2015 09:47
Show Gist options
  • Save alanmcgovern/9275bf1159630756cebe to your computer and use it in GitHub Desktop.
/// <summary>
/// Cancels any in-progress playback, then fetches and plays each track in order.
/// Cancellation (via a later PlayAsync call) is treated as a normal stop; any
/// other failure is logged and ends the session.
/// </summary>
/// <param name="tracks">The tracks to play, in order.</param>
async Task PlayAsync (IEnumerable<Track> tracks)
{
	// Stop whatever is currently playing and start a fresh cancellation scope.
	playerCancellation.Cancel ();
	playerCancellation = new CancellationTokenSource ();
	try {
		var token = playerCancellation.Token;
		foreach (var track in tracks) {
			var stream = await FetchTrack (serverAddress, track.UUID, token).ConfigureAwait (false);
			using (var player = new StreamingPlayer ())
				await player.PlayAsync (stream, token).ConfigureAwait (false);
		}
	} catch (OperationCanceledException) {
		// BUGFIX: was 'catch (TaskCanceledException)', which missed the plain
		// OperationCanceledException thrown by Stream.ReadAsync / HttpClient on
		// cancellation and logged it as "unexpected". TaskCanceledException
		// derives from OperationCanceledException, so this covers both.
		// We stopped it!
	} catch (Exception ex) {
		LoggingService.LogError (ex, "Unexpected exception playing a track");
	}
}
/// <summary>
/// Requests the given track from the server via a POST and returns the response
/// body as a stream. PostAsync buffers the content by default
/// (HttpCompletionOption.ResponseContentRead), so the returned stream stays
/// readable after the client is disposed.
/// </summary>
/// <param name="serverAddress">The server URL to POST to.</param>
/// <param name="trackUuid">Identifier appended to the FetchTrack message.</param>
/// <param name="token">Cancels the HTTP request.</param>
async Task<Stream> FetchTrack (string serverAddress, int trackUuid, CancellationToken token)
{
	// NOTE(review): a new HttpClient per request works, but exhausts sockets under
	// load — consider a shared instance if tracks are fetched frequently.
	using (var client = new HttpClient ()) {
		var content = new StringContent (Messages.FetchTrack + trackUuid, Encoding.UTF8);
		var response = await client.PostAsync (serverAddress, content, token).ConfigureAwait (false);
		// BUGFIX: without this, an HTTP error body (e.g. a 500 page) would be
		// handed straight to the audio decoder.
		response.EnsureSuccessStatusCode ();
		return await response.Content.ReadAsStreamAsync ().ConfigureAwait (false);
	}
}
using System;
using AudioToolbox;
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Tunez;
using System.IO;
namespace StreamingAudio
{
/// <summary>
/// Bundles one native audio queue buffer with its bookkeeping state: the native
/// pointer, how many bytes have been written into it so far, and the packet
/// descriptions for the data it holds.
/// </summary>
internal class AudioBuffer
{
	// Native buffer handle, as produced by OutputAudioQueue.AllocateBuffer.
	public IntPtr Buffer { get; set; }

	// Number of bytes already copied into the native buffer.
	public int CurrentOffset { get; set; }

	// Descriptions of each packet written into the buffer so far.
	public List<AudioStreamPacketDescription> PacketDescriptions { get; set; }

	// NOTE(review): not referenced anywhere in this file — possibly vestigial.
	public bool IsInUse { get; set; }
}
/// <summary>
/// Wrapper around OutputAudioQueue and AudioFileStream to allow streaming of various
/// filetypes. Compressed data is fed in via PlayAsync; decoded packets are copied into
/// a pool of native buffers which are cycled through the output queue.
/// </summary>
public class StreamingPlayer : IDisposable
{
	/// <summary>Raised when the output queue stops running (playback finished or was stopped).</summary>
	public event EventHandler Finished;

	/// <summary>Raised once the OutputAudioQueue has been created and configured.</summary>
	public event Action<OutputAudioQueue> OutputReady;

	// Guards all state shared between the playback loop and the AudioToolbox callbacks.
	readonly object locker = new object ();

	// the AudioToolbox decoder
	AudioFileStream fileStream;

	// The queue of buffers which are available to use
	Queue<AudioBuffer> availableBuffers;

	// All the buffers we created, keyed by their native pointer
	Dictionary<IntPtr, AudioBuffer> outputBuffers;

	public OutputAudioQueue outputQueue;

	// True once the output queue has been started for the current stream.
	bool Started {
		get; set;
	}

	public bool Paused {
		get; private set;
	}

	/// <summary>
	/// Playback volume. NOTE(review): throws NullReferenceException if accessed before
	/// any audio has been parsed (outputQueue is only created in AudioPropertyFound).
	/// </summary>
	public float Volume {
		get {
			return outputQueue.Volume;
		}
		set {
			outputQueue.Volume = value;
		}
	}

	/// <summary>
	/// Defines the size for each buffer, when using a slow source use more buffers with lower buffersizes
	/// </summary>
	public int BufferSize {
		get; private set;
	}

	// Completed when the player wants more input data; replaced with a fresh
	// (uncompleted) instance in EnqueueBuffer when all buffers are in use.
	TaskCompletionSource<bool> needsBufferTask;

	// Returns a task which completes when more input data should be read.
	// NOTE(review): each call adds a cancellation registration that is only released
	// when the token's source is disposed — acceptable for per-track tokens.
	Task<bool> NeedsBuffer (CancellationToken token)
	{
		lock (locker) {
			if (!needsBufferTask.Task.IsCompleted)
				token.Register (() => needsBufferTask.TrySetCanceled ());
			return needsBufferTask.Task;
		}
	}

	/// <summary>
	/// Defines the maximum Number of Buffers to use, the count can only change after Reset is called or the
	/// StreamingPlayback is freshly instantiated
	/// </summary>
	public int MaxBufferCount {
		get; set;
	}

	public StreamingPlayer () : this (AudioFileType.MP3)
	{
	}

	public StreamingPlayer (AudioFileType type)
	{
		BufferSize = 16 * 1024;
		MaxBufferCount = 16;
		fileStream = new AudioFileStream (type);
		fileStream.PacketDecoded += AudioPacketDecoded;
		fileStream.PropertyFound += AudioPropertyFound;
		// BUGFIX: run continuations asynchronously so that completing this task from
		// HandleBufferCompleted (which holds 'locker' on the audio callback thread)
		// does not resume the playback loop inline on that thread.
		needsBufferTask = new TaskCompletionSource<bool> (TaskCreationOptions.RunContinuationsAsynchronously);
		needsBufferTask.SetResult (true);
	}

	// Tears down the current output queue and frees every native buffer.
	// Called at the start of each playback and from Dispose.
	void Reset ()
	{
		Started = false;
		if (outputQueue != null) {
			outputQueue.Stop (true);
			outputQueue.Reset ();
			foreach (AudioBuffer buf in outputBuffers.Values) {
				buf.PacketDescriptions.Clear ();
				outputQueue.FreeBuffer (buf.Buffer);
			}
			outputQueue.Dispose ();
			availableBuffers = null;
			outputBuffers = null;
			outputQueue = null;
		}
	}

	/// <summary>
	/// Stops the OutputQueue
	/// </summary>
	public void Pause ()
	{
		Paused = true;
		if (outputQueue.IsRunning)
			outputQueue.Pause ();
	}

	/// <summary>
	/// Starts the OutputQueue
	/// </summary>
	public void Resume ()
	{
		Paused = false;
		if (!outputQueue.IsRunning)
			outputQueue.Start ();
	}

	/// <summary>
	/// Begins playing the audio stored in the supplied buffer
	/// </summary>
	/// <returns>A task which completes when playback finishes.</returns>
	/// <param name="buffer">Buffer.</param>
	/// <param name="token">Token.</param>
	public async Task PlayAsync (byte[] buffer, CancellationToken token)
	{
		// BUGFIX: was 'return PlayAsync (...)', which does not compile in an
		// 'async Task' method (CS1997). Await the inner task instead.
		await PlayAsync (new MemoryStream (buffer), token).ConfigureAwait (false);
	}

	/// <summary>
	/// Begins playing the audio stored in the supplied stream
	/// </summary>
	/// <returns>A task which completes when playback finishes.</returns>
	/// <param name="stream">Stream.</param>
	/// <param name="token">Token.</param>
	public async Task PlayAsync (Stream stream, CancellationToken token)
	{
		Reset ();

		var finishedTask = new TaskCompletionSource<bool> ();
		EventHandler onFinished = (sender, e) => finishedTask.TrySetResult (true);
		Finished += onFinished;
		using (token.Register (() => finishedTask.TrySetCanceled ())) {
			try {
				var buffer = new byte [BufferSize];
				// Keep feeding the decoder while it wants data and the stream has more.
				while (await NeedsBuffer (token).ConfigureAwait (false)) {
					var read = await stream.ReadAsync (buffer, 0, buffer.Length, token).ConfigureAwait (false);
					// NOTE(review): Position/Length require a seekable stream — confirm
					// this holds for every source handed to this method.
					var lastPacket = read == 0 || stream.Position == stream.Length;
					fileStream.ParseBytes (buffer, 0, read, lastPacket);
					if (lastPacket) {
						// Flush the partially filled buffer and let the queue drain.
						EnqueueBuffer ();
						outputQueue.Stop (false);
						break;
					}
					LoggingService.LogInfo ("Parsed the bytes");
				}
				// Wait for the output queue to report that playback finished.
				await finishedTask.Task.ConfigureAwait (false);
			} finally {
				// BUGFIX: unsubscribe so repeated PlayAsync calls on the same player
				// do not accumulate handlers (and leak completion sources).
				Finished -= onFinished;
			}
		}
	}

	public void Dispose ()
	{
		Dispose (true);
		GC.SuppressFinalize (this);
	}

	/// <summary>
	/// Cleaning up all the native Resource
	/// </summary>
	protected virtual void Dispose (bool disposing)
	{
		if (disposing) {
			Reset ();
			// The 'using' statements dispose each field and null it in one step.
			using (fileStream)
				fileStream = null;
			using (outputQueue)
				outputQueue = null;
		}
	}

	/// <summary>
	/// Saving the decoded Packets to our active Buffer, if the Buffer is full queue it into the OutputQueue
	/// and wait until another buffer gets freed up
	/// </summary>
	void AudioPacketDecoded (object sender, PacketReceivedEventArgs args)
	{
		lock (locker) {
			foreach (var p in args.PacketDescriptions) {
				AudioStreamPacketDescription pd = p;
				var currentBuffer = GetOrCreateAudioBuffer ();
				int left = BufferSize - currentBuffer.CurrentOffset;
				// If this packet doesn't fit, ship the current buffer and start a new one.
				if (left < pd.DataByteSize) {
					LoggingService.LogInfo ("Enqueuing a buffer because it's full");
					EnqueueBuffer ();
					currentBuffer = GetOrCreateAudioBuffer ();
				}
				AudioQueue.FillAudioData (currentBuffer.Buffer, currentBuffer.CurrentOffset, args.InputData, (int)pd.StartOffset, pd.DataByteSize);
				// Set new offset for this packet
				pd.StartOffset = currentBuffer.CurrentOffset;
				// Add the packet to our Buffer
				currentBuffer.PacketDescriptions.Add (pd);
				// Add the Size so that we know how much is in the buffer
				currentBuffer.CurrentOffset += pd.DataByteSize;
			}
		}
	}

	/// <summary>
	/// Enqueue the active buffer to the OutputQueue
	/// </summary>
	void EnqueueBuffer ()
	{
		lock (locker) {
			var currentBuffer = availableBuffers.Dequeue ();
			outputQueue.EnqueueBuffer (currentBuffer.Buffer, currentBuffer.CurrentOffset, currentBuffer.PacketDescriptions.ToArray ());
			// Once more than MaxBufferCount buffers are queued for playback, stop
			// asking for input until HandleBufferCompleted frees some up.
			if (outputBuffers.Count - availableBuffers.Count > MaxBufferCount) {
				LoggingService.LogInfo ("We do not want any more buffers!");
				// See the constructor for why RunContinuationsAsynchronously is needed.
				needsBufferTask = new TaskCompletionSource<bool> (TaskCreationOptions.RunContinuationsAsynchronously);
			}
			if (!Started) {
				Started = true;
				Resume ();
			}
		}
	}

	/// <summary>
	/// When a AudioProperty in the fed packets is found this callback is called
	/// </summary>
	void AudioPropertyFound (object sender, PropertyFoundEventArgs args)
	{
		lock (locker) {
			if (args.Property == AudioFileStreamProperty.ReadyToProducePackets) {
				Started = false;
				// Discard any queue left over from a previous stream; Reset () has
				// normally already done this before a new playback begins.
				if (outputQueue != null)
					outputQueue.Dispose ();
				availableBuffers = new Queue<AudioBuffer> ();
				outputBuffers = new Dictionary<IntPtr, AudioBuffer> ();
				outputQueue = new OutputAudioQueue (fileStream.StreamBasicDescription);
				outputQueue.AddListener (AudioQueueProperty.IsRunning, property => {
					if (property == AudioQueueProperty.IsRunning) {
						// The queue transitioned to not-running while we weren't paused:
						// playback has finished.
						if (!Paused && !outputQueue.IsRunning) {
							try {
								// BUGFIX: guard against no subscribers (was an
								// unconditional invoke, which would throw NRE).
								var finished = Finished;
								if (finished != null)
									finished (this, EventArgs.Empty);
							} catch (Exception ex) {
								LoggingService.LogError (ex, "Unhandled exception emitting the Finished event");
							}
						}
					}
				});
				if (OutputReady != null)
					OutputReady (outputQueue);
				outputQueue.BufferCompleted += HandleBufferCompleted;
				outputQueue.MagicCookie = fileStream.MagicCookie;
			}
		}
	}

	// Returns the buffer currently being filled, allocating a new native buffer
	// from the output queue when none is available.
	AudioBuffer GetOrCreateAudioBuffer ()
	{
		if (availableBuffers.Count == 0) {
			LoggingService.LogInfo ("Creating a new buffer");
			IntPtr outBuffer;
			outputQueue.AllocateBuffer (BufferSize, out outBuffer);
			var buffer = new AudioBuffer {
				Buffer = outBuffer,
				PacketDescriptions = new List<AudioStreamPacketDescription> ()
			};
			outputBuffers.Add (outBuffer, buffer);
			availableBuffers.Enqueue (buffer);
		}
		return availableBuffers.Peek ();
	}

	/// <summary>
	/// Is called when a buffer is completely read and can be queued for re-use
	/// </summary>
	void HandleBufferCompleted (object sender, BufferCompletedEventArgs e)
	{
		lock (locker) {
			var buffer = outputBuffers [e.IntPtrBuffer];
			buffer.PacketDescriptions.Clear ();
			buffer.CurrentOffset = 0;
			availableBuffers.Enqueue (buffer);
			// Signal that we should try to fill up some more buffers with audio
			// data. NOTE: When we parse a blob of data we will always create enough
			// buffers to fit the decoded data. That could make us create more buffers
			// than 'MaxBufferCount', so let's be careful to ensure we only start reading
			// more input data once enough buffers have been freed up.
			if (outputBuffers.Count - availableBuffers.Count < MaxBufferCount)
				needsBufferTask.TrySetResult (true);
		}
	}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment