Skip to content

Instantly share code, notes, and snippets.

@sleemer
Last active October 10, 2016 20:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sleemer/70a2a838b1cbc9c7e635cbc348cdbf4c to your computer and use it in GitHub Desktop.
Save sleemer/70a2a838b1cbc9c7e635cbc348cdbf4c to your computer and use it in GitHub Desktop.
Rx implementation of TCP publisher.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using Newtonsoft.Json;
public static class TcpPublisherObservableExtensions
{
/// <summary>
/// Broadcasts any message from <paramref name="source" /> stream to all TCP client connections.
/// <param name="source">Steam to broadcast via TCP</param>
/// <param name="ip">Ip address of the server</param>
/// <param name="port">Port number</param>
/// <param name="bufferSize">The buffer size, in bytes</param>
/// <param name="onError">Error callback</param>
/// <param name="onCompleted">Completion callback</param>
/// <param name="name">Name of the TCP publisher, is used for logging</param>
/// </summary>
public static IDisposable PublishViaTcp<T>(
this IObservable<T> source,
string ip = null, int port = 7000, int bufferSize = 64 * 1024,
Action<Exception> onError = null,
Action onCompleted = null,
string name = null)
{
name = name ?? Guid.NewGuid().ToString();
onError = onError ?? (_ => { });
onCompleted = onCompleted ?? (() => { });
var ipAddress = ip == null ? IPAddress.Loopback : IPAddress.Parse(ip);
var listener = new TcpListener(ipAddress, port);
listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
listener.Start();
Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': started publishing.");
var clients = new ConcurrentDictionary<TcpClient, StreamWriter>();
Action<KeyValuePair<TcpClient, StreamWriter>> closeClientConnection = client =>
{
StreamWriter writer;
if (clients.TryRemove(client.Key, out writer))
{
Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': {client.Key.Client.RemoteEndPoint.ToString()} disconnected.");
client.Key.Dispose();
writer.Dispose();
}
};
Action<KeyValuePair<TcpClient, StreamWriter>, string> sendMessageToClient = async (client, message) =>
{
try
{
await client.Value.WriteLineAsync(message);
await client.Value.FlushAsync();
}
catch (Exception)
{
closeClientConnection(client);
}
};
var connections = Observable
.FromAsync(listener.AcceptTcpClientAsync)
.Do(client => Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': new connection from {client.Client.RemoteEndPoint.ToString()}."))
.Repeat();
var messages = Observable.Using(
() => connections.Subscribe(client => clients.TryAdd(client, new StreamWriter(client.GetStream(), Encoding.UTF8, bufferSize, true))),
_ => source.Select(item => JsonConvert.SerializeObject(item)));
return new CompositeDisposable(
messages.Subscribe(
message => clients.ToImmutableList().ForEach(client => sendMessageToClient(client, message)),
onError,
onCompleted
),
Disposable.Create(() => listener.Stop()),
Disposable.Create(() => clients.ToImmutableList().ForEach(closeClientConnection)),
Disposable.Create(() => Console.WriteLine($"{DateTimeOffset.UtcNow.ToLocalTime()} '{name}': stopped publishing."))
);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment