Skip to content

Instantly share code, notes, and snippets.

@rdavisau
Last active August 29, 2015 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rdavisau/e7aa53749e08e13329c6 to your computer and use it in GitHub Desktop.
Save rdavisau/e7aa53749e08e13329c6 to your computer and use it in GitHub Desktop.
LINQPad script demonstrating how to use service discovery and the JsonProtocolMessenger
<Query Kind="Program">
<NuGetReference>Newtonsoft.Json</NuGetReference>
<NuGetReference Prerelease="true">rda.SocketHelpers</NuGetReference>
<Namespace>SocketHelpers.Discovery</Namespace>
<Namespace>SocketHelpers.Messaging</Namespace>
<Namespace>Sockets.Plugin</Namespace>
<Namespace>Splat</Namespace>
<Namespace>System.Reactive.Linq</Namespace>
<Namespace>System.Threading.Tasks</Namespace>
</Query>
// script entry point: wires up logging, starts the demo server, then runs the demo client against it
async Task Main()
{
    // tcp port the demo server binds to (and reports back in discovery responses)
    const int servicePort = 50023;

    // hook into logging in case things Go Wrong, which is not entirely unlikely
    var logger = new LINQPadLogger();
    Locator.CurrentMutable.RegisterConstant(logger, typeof(ILogger));

    // the host has to be listening and publishing before the client goes looking for it
    var host = new MyServer();
    await host.StartHosting(servicePort);

    var guest = new MyClient();
    await guest.DoItAll();
}
#region Server and Client
// demo host: accepts tcp connections on a port, dumps every message received on them,
// and answers discovery requests with that port
public class MyServer
{
    private TcpSocketListener _listener;
    private ServicePublisher<TypedServiceDefinition<string,MyServiceReponsePayload>> _publisher; // this verboseness will be fixed

    // starts listening on 'port', then starts answering discovery requests for it
    public async Task StartHosting(int port)
    {
        // first we actually start listening, then we can start 'publishing' availability
        await StartSocketListener(port);
        await StartPublishingService(port);
    }

    // reverses StartHosting; does nothing for the parts that were never started
    public async Task StopHosting()
    {
        // stop publishing first so we are no longer advertising a port
        // that is about to go away, then unbind the listener
        var publisher = _publisher;
        _publisher = null;
        if (publisher != null)
        {
            await publisher.Unpublish();
        }

        var listener = _listener;
        _listener = null;
        if (listener != null)
        {
            await listener.StopListeningAsync();
        }
    }

    // creates the listener, hooks up per-connection handling and binds to 'port'
    private Task StartSocketListener(int port)
    {
        // set up a tcpsocketlistener for new connections
        // when we get one, attach a jsonprotocolmessenger
        // and dump the message stream
        _listener = new TcpSocketListener();

        // set up the connection event handler
        _listener.ConnectionReceived += (sender, args) =>
        {
            // we got a new connection, held in args.SocketClient
            var client = args.SocketClient;

            // format the remote endpoint once, up front, so the disconnect handler below
            // does not have to read it from the socket again after the connection is gone
            var remoteEndpoint = String.Format("{0}:{1}", client.RemoteAddress, client.RemotePort);

            // we wrap it in a JsonProtocolMessenger
            // Message is our 'base' message class, it could be 'object' if we wanted
            // Adding your running assembly to AdditionalTypeResolutionAssemblies seems to be required for any message resolution
            // will look into that..
            var messenger = new JsonProtocolMessenger<Message>(client)
            {
                AdditionalTypeResolutionAssemblies = { typeof(Message).Assembly }
            };

            // here just showing how you can filter your messages
            Util.HorizontalRun("Message,MessageSubclass",
                    messenger.Messages.Where(m => m.GetType() == typeof(Message)),
                    messenger.Messages.OfType<MessageSubclass>())
                .Dump(String.Format("Messages received from the client at {0}", remoteEndpoint));

            // might as well log when clients are disconnected
            // (the title used to contain a {2} placeholder with no matching argument,
            // which made String.Format throw as soon as a client disconnected)
            messenger.Disconnected += (dsender, dargs) => dargs.Dump(String.Format("{0} Disconnected", remoteEndpoint));

            // nothing happens unless you call StartExecuting
            messenger.StartExecuting();
        };

        return _listener.StartListeningAsync(port);
    }

    // creates the publisher for our service definition and starts it
    private Task StartPublishingService(int port)
    {
        // create an instance of MyServiceDefinition
        // then generate a publisher
        var serviceDef = new MyServiceDefinition(port);
        _publisher = serviceDef.CreateServicePublisher();

        // start 'publishing'
        // (actually it's listening for discovery reqs)
        return _publisher.Publish();
    }
}
// demo client: discovers a host, connects to it, sends a handful of messages and disconnects
public class MyClient
{
    private TcpSocketClient _client;
    private JsonProtocolMessenger<Message> _messenger;

    // runs the whole demo; the returned task completes once the last message
    // has been sent and the connection has been closed
    public async Task DoItAll()
    {
        // first discover
        var connectionDetails = await Discover();
        var host = connectionDetails.RemoteAddress;
        var port = connectionDetails.ServicePort;

        // then connect
        await Connect(host, port);

        // then send a few bits and pieces before disconnecting
        var r = new Random();
        var names = new [] { "alice", "bob", "charlie", "danielle", "ester", "fabian", "geoffery" };

        // one plain Message per second, five in total
        // awaiting the sequence (rather than subscribing with an async onCompleted lambda,
        // which is effectively async void) keeps the caller waiting until we are done
        // and lets send/disconnect failures surface through the returned task
        await Observable
            .Interval(TimeSpan.FromSeconds(1))
            .Take(5)
            .Do(_ => _messenger.Send(new Message(names.RandomItem())));

        // send a MessageSubclass, then disconnect
        var numbers = Enumerable.Range(0, 4).Select(_ => r.Next(0, 100)).ToList();
        _messenger.Send(new MessageSubclass(names.RandomItem(), numbers, new MyServiceDefinition(port)));
        await Disconnect();
    }

    // keeps sending discovery requests until the first response arrives, and returns that response
    public async Task<MyServiceReponsePayload> Discover()
    {
        // create instance of service definition
        // generate a discoverer from it
        var serviceDef = new MyServiceDefinition();
        var discoverer = serviceDef.CreateServiceDiscoverer();
        discoverer.SendOnAllInterfaces = true;

        // probably don't actually do this bit like this
        // i'm taking shortcuts
        // send out a discovery request every 500ms
        var canceller = new CancellationTokenSource();
        var token = canceller.Token;
        var discoveryLoop = Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                await discoverer.Discover();
                await Task.Delay(TimeSpan.FromSeconds(.5));
            }
        }, token);

        // DiscoveredServices pumps out the service responses
        // in this demo case we just want the first one and we'll
        // try to connect to it
        //
        // because i took shortcuts we might miss the first response
        // given we start watching /after/ setting up the discovery loop
        // (but we'll get the next one in 500ms)
        try
        {
            return await discoverer.DiscoveredServices.Take(1);
        }
        finally
        {
            // stop the request loop whether we got an answer or the wait failed
            canceller.Cancel();
        }
    }

    // opens the tcp connection and attaches a messenger to it
    private async Task Connect(string host, int port)
    {
        // first connect the TcpSocketClient
        _client = new TcpSocketClient();
        await _client.ConnectAsync(host, port);

        // then attach the JsonProtocolMessenger
        _messenger = new JsonProtocolMessenger<Message>(_client)
        {
            AdditionalTypeResolutionAssemblies = { typeof(Message).Assembly }
        };

        // gotta call start executing
        _messenger.StartExecuting();
    }

    // asks the messenger for a graceful disconnect
    private Task Disconnect()
    {
        // you don't have to call StopExecuting if you disconnect
        // feels a bit lop-sided, api wise
        return _messenger.Disconnect(DisconnectionType.Graceful);
    }
}
#endregion
#region Service Definition
// the details a host sends back in answer to a discovery request
public class MyServiceReponsePayload : IDiscoveryPayload
{
    // servicePort: the port the advertised service can be reached on
    public MyServiceReponsePayload(int servicePort)
    {
        this.ServicePort = servicePort;
    }

    public string RemoteAddress { get; set; }

    public int ServicePort { get; set; }

    // the interface wants this member but this demo has no use for it,
    // so it is implemented explicitly to keep it off the class's public surface
    int IDiscoveryPayload.RemotePort { get; set; }
}
// the service discovery is useful for situations where you can't know ip addresses ahead of time
// e.g. multiplayer mobile game
// the service definition defines the discovery protocol and discovery ports
// and clients use udp to broadcast to request details from any hosts (TPayloadFormat)
// host can then send back details of the service (TPayloadFormat) like address, port, and any other metadata that is useful
// for a game it might be the name of the game, number of current players, etc.
// here we just take a basic string request and return a tuple with IP:port to demonstrate a 'complex' payload
// discovery protocol for the demo: a fixed string goes out as the request,
// and a MyServiceReponsePayload carrying the service port comes back
public class MyServiceDefinition : JsonSerializedServiceDefinition<string, MyServiceReponsePayload>
{
    // the only request text a host will answer
    const string ExpectedRequest = "Hi anyone there?";

    // port reported in responses; stays 0 when the parameterless constructor is used
    public int ServicePort { get; set; }

    public MyServiceDefinition()
    {
    }

    public MyServiceDefinition(int actualServicePort)
    {
        this.ServicePort = actualServicePort;
    }

    // the text sent out when looking for hosts
    public override string DiscoveryRequest()
    {
        return ExpectedRequest;
    }

    // builds the answer to an incoming discovery request, or null when it should be ignored
    public override MyServiceReponsePayload ResponseFor(string request)
    {
        if (!String.Equals(request, ExpectedRequest))
        {
            // don't recognize this request, we might be getting hacked
            // if we return null, nothing is sent back
            // it's as if no one was there..
            return null;
        }

        // this is legit, let's send back our details
        return new MyServiceReponsePayload(ServicePort);
    }
}
#endregion
#region Message Classes
// base message type exchanged between the demo client and server
public class Message
{
    // stamps the message with the current utc time and stores the given name
    public Message(string name)
    {
        this.SentAt = DateTime.UtcNow;
        this.Name = name;
    }

    // set to the utc time at which the message object was constructed
    public DateTime SentAt { get; set; }

    public string Name { get; set; }
}
// just to demonstrate a 'more complex' type
// a Message that also carries a list of numbers and a service definition,
// just to demonstrate a 'more complex' type going over the wire
public class MessageSubclass : Message
{
    // content is handed to the base constructor and ends up in Name
    public MessageSubclass(string content, List<int> numbers, MyServiceDefinition sdef)
        : base(content)
    {
        this.ServiceDefinition = sdef;
        this.Numbers = numbers;
    }

    public List<int> Numbers { get; set; }

    public MyServiceDefinition ServiceDefinition { get; set; }
}
#endregion
#region Logger
// ILogger implementation that dumps each log line to the LINQPad output, colored by severity
public class LINQPadLogger : ILogger
{
    // messages below this level are dropped
    public LogLevel Level { get; set; }

    // dumps 'message' in the color that goes with 'level', unless 'level' is below the threshold
    public void Write(string message, LogLevel level)
    {
        var meetsThreshold = level >= Level;
        if (!meetsThreshold)
        {
            return;
        }

        var style = String.Format("color: {0}", ColorFor(level));
        Util.WithStyle(message, style).Dump();
    }

    // css color name for a level; empty for any level not listed here
    private static string ColorFor(LogLevel level)
    {
        switch (level)
        {
            case LogLevel.Debug:
                return "grey";
            case LogLevel.Info:
                return "green";
            case LogLevel.Warn:
                return "orange";
            case LogLevel.Error:
            case LogLevel.Fatal:
                return "red";
            default:
                return "";
        }
    }
}
#endregion
#region Ext
// small extension helpers used by the demo
public static class Ext
{
    // one shared generator for all calls
    private static readonly Random _r = new Random();

    // returns a uniformly chosen element of 'collection'
    // throws ArgumentNullException when 'collection' is null
    // throws ArgumentException when 'collection' is empty
    public static T RandomItem<T>(this T[] collection)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }

        if (collection.Length == 0)
        {
            throw new ArgumentException("Cannot pick a random item from an empty array.", "collection");
        }

        // the upper bound of Random.Next is exclusive, so passing Length (not Length - 1)
        // is what makes the last element reachable
        return collection[_r.Next(collection.Length)];
    }
}
#endregion
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment