Skip to content

Instantly share code, notes, and snippets.

@ChrisRisner
Last active August 29, 2015 14:07
Show Gist options
  • Save ChrisRisner/bf96261b384f55fd0c29 to your computer and use it in GitHub Desktop.
Save ChrisRisner/bf96261b384f55fd0c29 to your computer and use it in GitHub Desktop.
OrleansSignalRAdventure
// Client-side proxy for the server hub exposed as "playerHub".
var hub = $.connection.playerHub;

// Invoked by the server with a single message.
hub.client.playerUpdate = function (message) {
    processMessage(message);
};

// Invoked by the server with a batch; each entry of its Messages array
// is handed to processMessage in turn.
hub.client.playerUpdates = function (messages) {
    messages.Messages.forEach(processMessage);
};

// Handlers are registered above, before the connection is opened.
$.connection.hub.start().done(function () {
    console.log("listening for messages");
});
/// <summary>
/// Grain contract for handing a text notification, addressed to a single
/// recipient, to the push notifier.
/// </summary>
// NOTE(review): [StatelessWorker] presumably allows several activations of this
// grain to exist at once - confirm against the Orleans version in use.
[StatelessWorker]
public interface IPushNotifierGrain : Orleans.IGrain
{
/// <summary>
/// Submits a notification for delivery.
/// </summary>
/// <param name="message">Text of the notification.</param>
/// <param name="recipient">
/// Identifier of the intended receiver (the visible caller passes a player GUID
/// rendered as a string).
/// </param>
/// <returns>A task representing the submission of the message.</returns>
Task SendMessage(string message, string recipient);
}
/// <summary>
/// SignalR hub that relays player notifications to the SignalR group named after
/// each message's recipient. Browser connections are placed into a "BROWSERS"
/// group and, when a "playerid" cookie is present, into a group named after that
/// cookie's value.
/// </summary>
// NOTE(review): PlayerUpdate/PlayerUpdates are public hub methods, so any connected
// client can invoke them and address a message to any group. Consider restricting
// them to trusted callers.
public class PlayerHub : Hub
{
    /// <summary>
    /// Forwards a single message to the group named by its recipient.
    /// </summary>
    /// <param name="message">Message to forward; ignored when null or without a recipient.</param>
    public void PlayerUpdate(ClientMessage message)
    {
        if (message == null || string.IsNullOrEmpty(message.Recipient))
        {
            return;
        }

        Clients.Group(message.Recipient).playerUpdate(message);
    }

    /// <summary>
    /// Forwards a batch of messages. The batch may contain messages for several
    /// recipients, so it is split per recipient and every group only receives the
    /// messages addressed to it (previously the whole batch was sent to the group
    /// of the first message, and an empty batch threw).
    /// </summary>
    /// <param name="messages">Batch to forward; a null or empty batch is a no-op.</param>
    public void PlayerUpdates(ClientMessageBatch messages)
    {
        if (messages == null || messages.Messages == null)
        {
            return;
        }

        // Bucket the messages by recipient, preserving their order within each bucket.
        var byRecipient =
            new System.Collections.Generic.Dictionary<string, System.Collections.Generic.List<ClientMessage>>();
        foreach (ClientMessage message in messages.Messages)
        {
            if (message == null || string.IsNullOrEmpty(message.Recipient))
            {
                // A message without a recipient cannot be routed to a group.
                continue;
            }

            System.Collections.Generic.List<ClientMessage> bucket;
            if (!byRecipient.TryGetValue(message.Recipient, out bucket))
            {
                bucket = new System.Collections.Generic.List<ClientMessage>();
                byRecipient.Add(message.Recipient, bucket);
            }
            bucket.Add(message);
        }

        // One client call per recipient group, carrying only that group's messages.
        foreach (var entry in byRecipient)
        {
            var batch = new ClientMessageBatch { Messages = entry.Value.ToArray() };
            Clients.Group(entry.Key).playerUpdates(batch);
        }
    }

    /// <summary>
    /// Registers a new connection in the appropriate groups. Connections that do
    /// not carry the header ORLEANS=GRAIN are treated as browsers.
    /// </summary>
    /// <returns>The task returned by the base implementation.</returns>
    public override System.Threading.Tasks.Task OnConnected()
    {
        if (Context.Headers.Get("ORLEANS") != "GRAIN")
        {
            // This connection does not have the GRAIN header, so it must be a browser
            // Therefore add this connection to the browser group
            Groups.Add(Context.ConnectionId, "BROWSERS");

            // Also join the per-player group so targeted messages reach this browser.
            Cookie playerIdCookie;
            bool foundCookie = Context.RequestCookies.TryGetValue("playerid", out playerIdCookie);
            if (foundCookie)
            {
                Groups.Add(Context.ConnectionId, playerIdCookie.Value);
            }
        }

        return base.OnConnected();
    }
}
// Obtain the push notifier grain (key 0).
var notifier = PushNotifierGrainFactory.GetGrain(0);

// Tell every player currently in the room that the monster has left.
// Messages are sent one at a time, each awaited before the next.
var playersInRoom = await this.State.roomGrain.GetPlayersInRoom();
foreach (var player in playersInRoom)
{
    Guid playerId = player.GetPrimaryKey();
    string recipient = playerId.ToString();
    await notifier.SendMessage(this.State.monsterInfo.Name + " has left the room", recipient);
}
/// <summary>
/// Grain that queues outgoing player messages and pushes them in batches to every
/// known SignalR hub through the hub method "PlayerUpdates". The queue is flushed
/// on a 100 ms timer and whenever it grows beyond <see cref="MaxQueuedMessages"/>.
/// </summary>
// NOTE(review): the grain is marked [Reentrant], so calls and timer callbacks may
// interleave at await points - the hub bookkeeping below is written with that in mind.
[Reentrant]
public class PushNotifierGrain : Orleans.Grain, IPushNotifierGrain
{
    // Queue length above which SendMessage flushes immediately instead of waiting for the timer.
    private const int MaxQueuedMessages = 25;

    // Hub address -> (connection, proxy for "PlayerHub").
    Dictionary<string, Tuple<HubConnection, IHubProxy>> hubs = new Dictionary<string, Tuple<HubConnection, IHubProxy>>();

    // Messages waiting for the next flush.
    List<ClientMessage> messageQueue = new List<ClientMessage>();

    /// <summary>
    /// Starts the flush timer and connects to the SignalR hub(s): every web role
    /// instance when running in Azure, otherwise the local hub on port 48777.
    /// </summary>
    public override async Task ActivateAsync()
    {
        // set up a timer to regularly flush the message queue
        this.RegisterTimer(FlushQueue, null, TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100));
        if (RoleEnvironment.IsAvailable)
        {
            // in azure
            await RefreshHubs(null);
            // set up a timer to regularly refresh the hubs, to respond to azure infrastructure changes
            this.RegisterTimer(RefreshHubs, null, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(60));
        }
        else
        {
            Trace.TraceError("PushNotGrain Issue, RoleEnvironment not available");
            // not in azure, the SignalR hub is running locally
            await AddHub("http://localhost:48777/");
        }
        await base.ActivateAsync();
    }

    /// <summary>
    /// Reconciles the hub table with the current instances of the
    /// "AdventureTerreWebRole" role: connections to vanished instances are dropped
    /// and released, and connections to new instances are opened.
    /// </summary>
    /// <param name="_">Unused timer state.</param>
    private async Task RefreshHubs(object _)
    {
        var addresses = new List<string>();
        var tasks = new List<Task>();
        // discover the current infrastructure
        foreach (var instance in RoleEnvironment.Roles["AdventureTerreWebRole"].Instances)
        {
            var endpoint = instance.InstanceEndpoints["InternalSignalR"];
            addresses.Add(string.Format("http://{0}", endpoint.IPEndpoint.ToString()));
        }
        var newHubs = addresses.Where(x => !hubs.ContainsKey(x)).ToArray();
        var deadHubs = hubs.Keys.Where(x => !addresses.Contains(x)).ToArray();
        // remove dead hubs, releasing their connections instead of leaking them
        foreach (var address in deadHubs)
        {
            var connection = hubs[address].Item1;
            hubs.Remove(address);
            StopInBackground(connection);
        }
        // add new hubs
        foreach (var address in newHubs)
        {
            tasks.Add(AddHub(address));
        }
        await Task.WhenAll(tasks);
    }

    /// <summary>Timer callback that flushes the message queue.</summary>
    /// <param name="_">Unused timer state.</param>
    private Task FlushQueue(object _)
    {
        this.Flush();
        return TaskDone.Done;
    }

    /// <summary>
    /// Opens a connection to the hub at <paramref name="address"/> and records it.
    /// A failure to start is propagated to the caller after the half-built
    /// connection has been released.
    /// </summary>
    /// <param name="address">Base URL of the SignalR host.</param>
    private async Task AddHub(string address)
    {
        // create a connection to a hub
        var hubConnection = new HubConnection(address);
        hubConnection.Headers.Add("ORLEANS", "GRAIN");
        var hub = hubConnection.CreateHubProxy("PlayerHub");
        try
        {
            await hubConnection.Start();
        }
        catch
        {
            // Do not leak the connection when it could not be started.
            StopInBackground(hubConnection);
            throw;
        }

        if (hubs.ContainsKey(address))
        {
            // An interleaved refresh registered this address while we were connecting;
            // keep the existing entry (Dictionary.Add would throw) and drop the duplicate.
            StopInBackground(hubConnection);
            return;
        }
        hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub));
    }

    /// <summary>
    /// Queues a message for <paramref name="recipient"/>; flushes right away once
    /// more than <see cref="MaxQueuedMessages"/> messages are waiting.
    /// </summary>
    /// <param name="message">Text to deliver.</param>
    /// <param name="recipient">Recipient identifier carried in the queued ClientMessage.</param>
    /// <returns>A completed task; delivery itself happens asynchronously.</returns>
    public Task SendMessage(string message, string recipient)
    {
        // add a message to the send queue
        messageQueue.Add(new ClientMessage { Message = message, Recipient = recipient });
        if (messageQueue.Count > MaxQueuedMessages)
        {
            Flush();
        }
        return TaskDone.Done;
    }

    /// <summary>
    /// Sends all queued messages as one batch to every connected hub. A hub that is
    /// not connected is asked to start again and does not receive this batch.
    /// Sends are fire-and-forget; failures are logged rather than thrown.
    /// </summary>
    private void Flush()
    {
        if (messageQueue.Count == 0) return;
        // send all messages to all SignalR hubs
        var messagesToSend = messageQueue.ToArray();
        messageQueue.Clear();
        foreach (var entry in hubs)
        {
            var connection = entry.Value.Item1;
            var proxy = entry.Value.Item2;
            try
            {
                Task pending = connection.State == ConnectionState.Connected
                    ? proxy.Invoke("PlayerUpdates", new ClientMessageBatch { Messages = messagesToSend })
                    : connection.Start();
                // The task is not awaited, so observe its failure explicitly.
                LogFaults(pending, entry.Key);
            }
            catch (Exception ex)
            {
                // Synchronous failure while issuing the call.
                Trace.TraceError("PushNotifierGrain: SignalR call to {0} failed: {1}", entry.Key, ex);
            }
        }
    }

    // Logs the exception of a fire-and-forget task so it does not go unobserved.
    private static void LogFaults(Task pending, string address)
    {
        pending.ContinueWith(
            t => Trace.TraceError("PushNotifierGrain: SignalR call to {0} failed: {1}", address, t.Exception),
            TaskContinuationOptions.OnlyOnFaulted);
    }

    // Releases a connection on the thread pool.
    // NOTE(review): Dispose presumably stops the connection and may block while doing so,
    // hence it is kept off the grain's own execution context - confirm.
    private static void StopInBackground(HubConnection connection)
    {
        Task.Run(() =>
        {
            try
            {
                connection.Dispose();
            }
            catch (Exception ex)
            {
                Trace.TraceWarning("PushNotifierGrain: error while closing a SignalR connection: {0}", ex);
            }
        });
    }
}
<!-- NOTE(review): looks like a fragment of an Azure service definition for the web role - confirm. -->
<!-- The "Web" site is bound to both endpoints declared below. -->
<Sites>
<Site name="Web">
<Bindings>
<Binding name="Endpoint1" endpointName="Endpoint1" />
<Binding name="InternalSignalR" endpointName="InternalSignalR"/>
</Bindings>
</Site>
</Sites>
<!-- Endpoint1: HTTP input endpoint on port 80. -->
<!-- InternalSignalR: internal HTTP endpoint on port 81; the push notifier grain looks up -->
<!-- instance endpoints by this exact name, so keep the name in sync with that code. -->
<Endpoints>
<InputEndpoint name="Endpoint1" protocol="http" port="80" />
<InternalEndpoint name="InternalSignalR" protocol="http" port="81" />
</Endpoints>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment