Last active
October 10, 2016 20:16
-
-
Save sleemer/70a2a838b1cbc9c7e635cbc348cdbf4c to your computer and use it in GitHub Desktop.
Rx implementation of TCP publisher.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Collections.Immutable; | |
using System.IO; | |
using System.Net; | |
using System.Net.Sockets; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Text; | |
using Newtonsoft.Json; | |
public static class TcpPublisherObservableExtensions | |
{ | |
/// <summary> | |
/// Broadcasts any message from <paramref name="source" /> stream to all TCP client connections. | |
/// <param name="source">Steam to broadcast via TCP</param> | |
/// <param name="ip">Ip address of the server</param> | |
/// <param name="port">Port number</param> | |
/// <param name="bufferSize">The buffer size, in bytes</param> | |
/// <param name="onError">Error callback</param> | |
/// <param name="onCompleted">Completion callback</param> | |
/// <param name="name">Name of the TCP publisher, is used for logging</param> | |
/// </summary> | |
public static IDisposable PublishViaTcp<T>( | |
this IObservable<T> source, | |
string ip = null, int port = 7000, int bufferSize = 64 * 1024, | |
Action<Exception> onError = null, | |
Action onCompleted = null, | |
string name = null) | |
{ | |
name = name ?? Guid.NewGuid().ToString(); | |
onError = onError ?? (_ => { }); | |
onCompleted = onCompleted ?? (() => { }); | |
var ipAddress = ip == null ? IPAddress.Loopback : IPAddress.Parse(ip); | |
var listener = new TcpListener(ipAddress, port); | |
listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1); | |
listener.Start(); | |
Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': started publishing."); | |
var clients = new ConcurrentDictionary<TcpClient, StreamWriter>(); | |
Action<KeyValuePair<TcpClient, StreamWriter>> closeClientConnection = client => | |
{ | |
StreamWriter writer; | |
if (clients.TryRemove(client.Key, out writer)) | |
{ | |
Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': {client.Key.Client.RemoteEndPoint.ToString()} disconnected."); | |
client.Key.Dispose(); | |
writer.Dispose(); | |
} | |
}; | |
Action<KeyValuePair<TcpClient, StreamWriter>, string> sendMessageToClient = async (client, message) => | |
{ | |
try | |
{ | |
await client.Value.WriteLineAsync(message); | |
await client.Value.FlushAsync(); | |
} | |
catch (Exception) | |
{ | |
closeClientConnection(client); | |
} | |
}; | |
var connections = Observable | |
.FromAsync(listener.AcceptTcpClientAsync) | |
.Do(client => Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': new connection from {client.Client.RemoteEndPoint.ToString()}.")) | |
.Repeat(); | |
var messages = Observable.Using( | |
() => connections.Subscribe(client => clients.TryAdd(client, new StreamWriter(client.GetStream(), Encoding.UTF8, bufferSize, true))), | |
_ => source.Select(item => JsonConvert.SerializeObject(item))); | |
return new CompositeDisposable( | |
messages.Subscribe( | |
message => clients.ToImmutableList().ForEach(client => sendMessageToClient(client, message)), | |
onError, | |
onCompleted | |
), | |
Disposable.Create(() => listener.Stop()), | |
Disposable.Create(() => clients.ToImmutableList().ForEach(closeClientConnection)), | |
Disposable.Create(() => Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': stopped publishing.")) | |
); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment