Skip to content

Instantly share code, notes, and snippets.

@PeteGoo
Created August 19, 2012 04:30
Show Gist options
  • Save PeteGoo/3392032 to your computer and use it in GitHub Desktop.
Save PeteGoo/3392032 to your computer and use it in GitHub Desktop.
An implementation of a SignalR Persistent Connection in Pushqa that understands OData filters (no IObservable etc needed just filtering normal SignalR event streams)
public class Global : System.Web.HttpApplication {
protected void Application_Start(object sender, EventArgs e) {
// Register the SignalR route for our event context at /events
RouteTable.Routes.MapConnection<SampleUsageConnection>("chat", "chat/{*operation}");
AreaRegistration.RegisterAllAreas();
RegisterGlobalFilters(GlobalFilters.Filters);
RegisterRoutes(RouteTable.Routes);
}
/// Other normal stuff omitted
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Linq2Rest.Parser;
using SignalR;
using SignalR.Infrastructure;
namespace Pushqa.Server.SignalR {
/// <summary>
/// A Persistent Connection that stores the initial connection query and filters any outbound messages accordingly
/// </summary>
public abstract class QueryablePersistentConnection : PersistentConnection {
private string filterString;
/// <summary>
/// Called when a new connection is made.
/// </summary>
/// <param name="request">The <see cref="T:SignalR.IRequest"/> for the current connection.</param><param name="connectionId">The id of the connecting client.</param>
/// <returns>
/// A <see cref="T:System.Threading.Tasks.Task"/> that completes when the connect operation is complete.
/// </returns>
protected override System.Threading.Tasks.Task OnConnectedAsync(IRequest request, string connectionId) {
filterString = request.QueryString.Get("$filter");
if(Connection != null && Connection is FilteredConnection) {
(Connection as FilteredConnection).FilterString = filterString;
}
return base.OnConnectedAsync(request, connectionId);
}
/// <summary>
/// Creates a connection
/// </summary>
/// <param name="connectionId"></param>
/// <param name="groups"></param>
/// <param name="request"></param>
/// <returns></returns>
protected override global::SignalR.Connection CreateConnection(string connectionId, System.Collections.Generic.IEnumerable<string> groups, IRequest request) {
return new FilteredConnection(filterString, _messageBus,
_jsonSerializer,
DefaultSignal,
connectionId,
GetDefaultSignals(connectionId),
groups,
_trace);
}
internal class MessageContainer<T> {
public MessageContainer(T message) {
Message = message;
}
public T Message { get; private set; }
}
internal class FilteredConnection : Connection {
private string filterString;
private ConcurrentDictionary<Type,Func<object, bool>> typedFilters = new ConcurrentDictionary<Type, Func<object, bool>>();
static FilteredConnection() {
createFilterGenericMethodDefn =
new Func<string, Func<object, bool>>(CreateFilter<MessageContainer<int>>).Method.GetGenericMethodDefinition();
}
public FilteredConnection(string filterString, IMessageBus messageBus, IJsonSerializer jsonSerializer, string baseSignal, string connectionId, IEnumerable<string> signals, IEnumerable<string> groups, ITraceManager traceManager) : base(messageBus, jsonSerializer, baseSignal, connectionId, signals, groups, traceManager) {
this.filterString = filterString;
}
/// <summary>
/// The filter to apply to messages
/// </summary>
public string FilterString {
get { return filterString; }
set {
if(filterString == value) {
return;
}
filterString = value;
typedFilters.Clear();
}
}
protected override List<object> ProcessResults(IList<Message> source) {
// override the default message processing to filter based on our query
List<object> processedResults = base.ProcessResults(source);
return processedResults.Where(m => GetFilter(m.GetType())(m) ).ToList();
}
private Func<object, bool> GetFilter(Type messageType) {
return typedFilters.GetOrAdd(messageType, type => createFilterGenericMethodDefn.MakeGenericMethod(new Type[] { messageType }).Invoke(null, new object[] { FilterString }) as Func<object, bool>);
}
private static readonly MethodInfo createFilterGenericMethodDefn;
private static Func<object, bool> CreateFilter<T>(string filterString) {
Func<MessageContainer<T>, bool> filter;
if (!string.IsNullOrWhiteSpace(filterString)) {
try {
// Construct the filter expression
FilterExpressionFactory filterExpressionFactory = new FilterExpressionFactory();
Expression<Func<MessageContainer<T>, bool>> expression = filterExpressionFactory.Create<MessageContainer<T>>(filterString);
filter = expression.Compile();
} catch (Exception) {
// Could not create a valid expression for this type
filter = arg => true;
}
}
else {
// No filter was given
filter = arg => true;
}
return new Func<object, bool>(message => !(message is T) || filter(new MessageContainer<T>((T)message)));
}
}
}
}
@{
ViewBag.Title = "Pushqa Samples: Queryable SignalR Persistent Connection";
}
<h2>Pushqa Samples: Queryable SignalR Persistent Connection</h2>
<div>@Html.ActionLink("Samples Home", "Index")</div>
<p>This sample is basically a simple SignalR page that broadcasts input messages bu only subscribes to those messages that do not contain the substring "pie".</p>
<p>Uri: http://myserver/myapp/chat/?$filter=substringof('pie', Message) eq false</p>
<div>
<script type="text/javascript">
$(function () {
// Setup our connection using a query
var connection = $.connection('../chat/', { $filter: "substringof('pie', Message) eq false" });
// Append each message received
connection.received(function (data) {
$('#messages').append('<li>' + data + '</li>');
});
connection.starting(function () {
$('#messages').append("'<li>Starting Connection</li>");
});
connection.start();
$("#broadcast").click(function() {
connection.send($('#msg').val());
});
});
</script>
<input type="text" id="msg" />
<input type="button" id="broadcast" value="broadcast" />
<ul id="messages">
</ul>
</div>
public class SampleUsageConnection : QueryablePersistentConnection {
protected override System.Threading.Tasks.Task OnReceivedAsync(IRequest request, string connectionId, string data) {
return Connection.Broadcast(data);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment