Created
August 19, 2012 04:30
-
-
Save PeteGoo/3392032 to your computer and use it in GitHub Desktop.
An implementation of a SignalR Persistent Connection in Pushqa that understands OData filters (no IObservable etc needed just filtering normal SignalR event streams)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Application entry point for the sample site. Only the start-up wiring that is
/// relevant to the SignalR sample is shown here.
/// </summary>
public class Global : System.Web.HttpApplication {
    /// <summary>
    /// Registers the SignalR connection route, then the MVC areas, global filters and routes.
    /// </summary>
    /// <param name="sender">The object raising the event (not used).</param>
    /// <param name="e">Event data (not used).</param>
    protected void Application_Start(object sender, EventArgs e) {
        // Register the SignalR route for the sample connection under "chat".
        // It is added to the route table ahead of the routes added by RegisterRoutes below.
        // NOTE(review): presumably this ordering keeps an MVC catch-all route from
        // swallowing chat/* requests - confirm before reordering these calls.
        RouteTable.Routes.MapConnection<SampleUsageConnection>("chat", "chat/{*operation}");

        AreaRegistration.RegisterAllAreas();
        RegisterGlobalFilters(GlobalFilters.Filters);
        RegisterRoutes(RouteTable.Routes);
    }

    // Other normal stuff omitted (RegisterGlobalFilters, RegisterRoutes, ...)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Linq.Expressions; | |
using System.Reflection; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Linq2Rest.Parser; | |
using SignalR; | |
using SignalR.Infrastructure; | |
namespace Pushqa.Server.SignalR {
    /// <summary>
    /// A persistent connection that takes the OData <c>$filter</c> query option from the
    /// connecting request and filters any outbound messages accordingly.
    /// </summary>
    public abstract class QueryablePersistentConnection : PersistentConnection {
        /// <summary>Name of the query string option that carries the filter expression.</summary>
        private const string FilterQueryKey = "$filter";

        // Filter captured by OnConnectedAsync; used by CreateConnection only as a fallback.
        private string filterString;

        /// <summary>
        /// Called when a new connection is made. Captures the <c>$filter</c> query option and,
        /// when the current connection is a <see cref="FilteredConnection"/>, applies it.
        /// </summary>
        /// <param name="request">The <see cref="T:SignalR.IRequest"/> for the current connection.</param>
        /// <param name="connectionId">The id of the connecting client.</param>
        /// <returns>
        /// A <see cref="T:System.Threading.Tasks.Task"/> that completes when the connect operation is complete.
        /// </returns>
        protected override System.Threading.Tasks.Task OnConnectedAsync(IRequest request, string connectionId) {
            filterString = request.QueryString.Get(FilterQueryKey);

            // A single "as" cast replaces the previous "is" check followed by a second cast.
            FilteredConnection filtered = Connection as FilteredConnection;
            if (filtered != null) {
                filtered.FilterString = filterString;
            }

            return base.OnConnectedAsync(request, connectionId);
        }

        /// <summary>
        /// Creates the <see cref="FilteredConnection"/> used to deliver messages to the client.
        /// </summary>
        /// <param name="connectionId">The id of the client the connection is created for.</param>
        /// <param name="groups">The groups passed through to the connection.</param>
        /// <param name="request">The request the connection is being created for.</param>
        /// <returns>A connection that filters outbound messages with the request's <c>$filter</c>.</returns>
        protected override global::SignalR.Connection CreateConnection(string connectionId, System.Collections.Generic.IEnumerable<string> groups, IRequest request) {
            // Take the filter from the request handed to this method instead of relying solely on
            // the field, which is only assigned when OnConnectedAsync has run on this instance.
            // NOTE(review): this assumes the client repeats its query string on every request -
            // confirm against the SignalR client version in use.
            string requestedFilter = request != null && request.QueryString != null
                ? request.QueryString.Get(FilterQueryKey)
                : null;

            return new FilteredConnection(requestedFilter ?? filterString,
                                          _messageBus,
                                          _jsonSerializer,
                                          DefaultSignal,
                                          connectionId,
                                          GetDefaultSignals(connectionId),
                                          groups,
                                          _trace);
        }

        /// <summary>
        /// Wraps an outbound message so that a filter expression can refer to it through the
        /// <see cref="Message"/> property (e.g. <c>substringof('pie', Message) eq false</c>).
        /// </summary>
        /// <typeparam name="T">The runtime type of the wrapped message.</typeparam>
        internal class MessageContainer<T> {
            public MessageContainer(T message) {
                Message = message;
            }

            /// <summary>The wrapped message.</summary>
            public T Message { get; private set; }
        }

        /// <summary>
        /// A connection that drops outbound messages which do not satisfy the filter string.
        /// </summary>
        internal class FilteredConnection : Connection {
            // Open generic definition of CreateFilter<T>, closed per message type in GetFilter.
            private static readonly MethodInfo createFilterGenericMethodDefn;

            // Compiled predicates keyed by message runtime type; cleared when the filter changes.
            private readonly ConcurrentDictionary<Type, Func<object, bool>> typedFilters = new ConcurrentDictionary<Type, Func<object, bool>>();

            private string filterString;

            static FilteredConnection() {
                // Close the generic method over an arbitrary type purely to obtain its MethodInfo,
                // then step back to the open generic definition.
                createFilterGenericMethodDefn =
                    new Func<string, Func<object, bool>>(CreateFilter<MessageContainer<int>>).Method.GetGenericMethodDefinition();
            }

            public FilteredConnection(string filterString, IMessageBus messageBus, IJsonSerializer jsonSerializer, string baseSignal, string connectionId, IEnumerable<string> signals, IEnumerable<string> groups, ITraceManager traceManager)
                : base(messageBus, jsonSerializer, baseSignal, connectionId, signals, groups, traceManager) {
                this.filterString = filterString;
            }

            /// <summary>
            /// The filter to apply to messages. Assigning a different value discards the
            /// predicates that were compiled for the previous filter.
            /// </summary>
            public string FilterString {
                get { return filterString; }
                set {
                    if (filterString == value) {
                        return;
                    }
                    filterString = value;
                    typedFilters.Clear();
                }
            }

            /// <summary>
            /// Overrides the default message processing to filter the results based on the query.
            /// </summary>
            /// <param name="source">The raw messages to process.</param>
            /// <returns>The processed messages that passed the filter.</returns>
            protected override List<object> ProcessResults(IList<Message> source) {
                List<object> processedResults = base.ProcessResults(source);

                // A null entry has no runtime type to build a filter for; let it through
                // instead of failing the whole batch on GetType().
                return processedResults.Where(m => m == null || GetFilter(m.GetType())(m)).ToList();
            }

            /// <summary>
            /// Returns the cached predicate for <paramref name="messageType"/>, building it on first use.
            /// </summary>
            /// <param name="messageType">The runtime type of the message being filtered.</param>
            /// <returns>A predicate that is true when the message should be delivered.</returns>
            private Func<object, bool> GetFilter(Type messageType) {
                // Direct cast: an unexpected result fails here rather than caching a null delegate.
                return typedFilters.GetOrAdd(
                    messageType,
                    type => (Func<object, bool>)createFilterGenericMethodDefn
                        .MakeGenericMethod(type)
                        .Invoke(null, new object[] { FilterString }));
            }

            /// <summary>
            /// Builds a predicate over messages of type <typeparamref name="T"/> from an OData filter string.
            /// </summary>
            /// <typeparam name="T">The message type the filter is compiled for.</typeparam>
            /// <param name="filterString">The filter expression; may be null or blank.</param>
            /// <returns>
            /// A predicate that is true for objects that are not a <typeparamref name="T"/>, and
            /// otherwise the result of the compiled filter (always true when no usable filter exists).
            /// </returns>
            private static Func<object, bool> CreateFilter<T>(string filterString) {
                // Default: no filter was given, so accept everything.
                Func<MessageContainer<T>, bool> filter = arg => true;

                if (!string.IsNullOrWhiteSpace(filterString)) {
                    try {
                        // Construct the filter expression
                        FilterExpressionFactory filterExpressionFactory = new FilterExpressionFactory();
                        Expression<Func<MessageContainer<T>, bool>> expression = filterExpressionFactory.Create<MessageContainer<T>>(filterString);
                        filter = expression.Compile();
                    } catch (Exception) {
                        // Deliberate best effort: a valid expression could not be created for
                        // this type, so messages of this type are delivered unfiltered.
                        filter = arg => true;
                    }
                }

                return message => !(message is T) || filter(new MessageContainer<T>((T)message));
            }
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@* Sample page: connects to the "chat" persistent connection with an OData $filter and lists received messages. *@
@{
    ViewBag.Title = "Pushqa Samples: Queryable SignalR Persistent Connection";
}
<h2>Pushqa Samples: Queryable SignalR Persistent Connection</h2>
<div>@Html.ActionLink("Samples Home", "Index")</div>
<p>This sample is basically a simple SignalR page that broadcasts input messages but only subscribes to those messages that do not contain the substring "pie".</p>
<p>Uri: http://myserver/myapp/chat/?$filter=substringof('pie', Message) eq false</p>
<div>
    <script type="text/javascript">
        $(function () {
            // Setup our connection using a query
            var connection = $.connection('../chat/', { $filter: "substringof('pie', Message) eq false" });

            // Append each message received.
            // The message is text typed by another user, so it is inserted with .text()
            // rather than concatenated into markup, which would let it inject HTML/script.
            connection.received(function (data) {
                $('<li/>').text(data).appendTo('#messages');
            });

            connection.starting(function () {
                // The stray leading apostrophe that used to be rendered before this item is removed.
                $('#messages').append('<li>Starting Connection</li>');
            });

            connection.start();

            // Broadcast the current contents of the text box to all clients
            $("#broadcast").click(function () {
                connection.send($('#msg').val());
            });
        });
    </script>
    <input type="text" id="msg" />
    <input type="button" id="broadcast" value="broadcast" />
    <ul id="messages">
    </ul>
</div>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Sample connection: every message received from a client is broadcast through the
/// connection created by <see cref="QueryablePersistentConnection"/>.
/// </summary>
public class SampleUsageConnection : QueryablePersistentConnection {
    /// <summary>
    /// Broadcasts the received data.
    /// </summary>
    /// <param name="request">The request that carried the data (not used).</param>
    /// <param name="connectionId">The id of the sending client (not used).</param>
    /// <param name="data">The payload sent by the client.</param>
    /// <returns>The task returned by the broadcast call.</returns>
    protected override System.Threading.Tasks.Task OnReceivedAsync(IRequest request, string connectionId, string data) {
        System.Threading.Tasks.Task pendingBroadcast = Connection.Broadcast(data);
        return pendingBroadcast;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment