Skip to content

Instantly share code, notes, and snippets.

@danbarua
Created February 18, 2014 15:32
Show Gist options
  • Save danbarua/9073209 to your computer and use it in GitHub Desktop.
Save danbarua/9073209 to your computer and use it in GitHub Desktop.
Reactive Freeswitch ESL Parsing
/// <summary>The observable extensions.</summary>
public static class ObservableExtensions
{
/// <summary>Aggregates a Stream using the supplied Aggregator until the given predicate is true</summary>
/// <param name="source">The source.</param>
/// <param name="seed">The seed.</param>
/// <param name="accumulator">The accumulator.</param>
/// <param name="predicate">A predicate which indicates whether the aggregation is completed.</param>
/// <typeparam name="TSource">The Type of the Source stream.</typeparam>
/// <typeparam name="TAccumulate">The Type of the Accumulator.</typeparam>
/// <returns>The <see cref="IObservable{T}"/>.</returns>
public static IObservable<TAccumulate> AggregateUntil<TSource, TAccumulate>(
this IObservable<TSource> source,
Func<TAccumulate> seed,
Func<TAccumulate, TSource, TAccumulate> accumulator,
Func<TAccumulate, bool> predicate)
{
return Observable.Create<TAccumulate>(observer =>
{
var accumulate = seed();
return source.Subscribe(
value =>
{
accumulate = accumulator(accumulate, value);
if (predicate(accumulate))
{
observer.OnNext(accumulate);
accumulate = seed();
}
},
observer.OnError,
observer.OnCompleted);
});
}
}
/// <summary>
/// A simple state-machine for parsing incoming EventSocket messages.
/// </summary>
public class Parser
{
private readonly StringBuilder buffer = new StringBuilder(); // StringBuilder in .Net 4 uses a Linked List internally to avoid expensive reallocations. Faster but uses marginally more memory.
private char previous;
private int? contentLength;
private IDictionary<string, string> headers;
/// <summary>
/// Gets a value indicating whether parsing an incoming message has completed.
/// </summary>
public bool Completed { get; private set; }
/// <summary>
/// Gets a value indicating whether the incoming message has a body.
/// </summary>
public bool HasBody
{
get
{
return contentLength.HasValue;
}
}
/// <summary>
/// Appends the given <see cref="char"/> to the message.
/// </summary>
/// <param name="next">The next <see cref="char"/> of the message.</param>
/// <returns>The same instance of the <see cref="Parser"/>.</returns>
public Parser Append(char next)
{
if (Completed)
{
return new Parser().Append(next);
}
buffer.Append(next);
if (!HasBody)
{
// we're parsing the headers
if (previous == '\n' && next == '\n')
{
// \n\n denotes the end of the Headers
headers = buffer.ToString().ParseKeyValuePairs("\n", ": ");
if (headers.ContainsKey(HeaderNames.ContentLength))
{
contentLength = int.Parse(headers[HeaderNames.ContentLength]);
// start parsing the body content
buffer.Clear();
// allocate the buffer up front given that we now know the expected size
buffer.EnsureCapacity(contentLength.Value);
}
else
{
// end of message
Completed = true;
}
}
else
{
previous = next;
}
}
else
{
// if we've read the Content-Length amount of bytes then we're done
Completed = buffer.Length == contentLength.GetValueOrDefault();
}
return this;
}
public BasicMessage ParseMessage()
{
if (!Completed)
throw new InvalidOperationException("The message was not completely parsed.");
return HasBody ? new BasicMessage(headers, buffer.ToString()) : new BasicMessage(headers);
}
}
public static class ParserExtensions
{
public static IObservable<BasicMessage> ExtractBasicMessages(
this IObservable<byte[]> byteStream)
{
return byteStream.SelectMany(x => Encoding.ASCII.GetString(x)).ExtractBasicMessages();
}
public static IObservable<BasicMessage> ExtractBasicMessages(
this IObservable<char> charStream)
{
return
charStream.AggregateUntil(() => new Parser(), (builder, ch) => builder.Append(ch), builder => builder.Completed)
.Select(builder => builder.ParseMessage());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment