Skip to content

Instantly share code, notes, and snippets.

@sleemer
Created October 3, 2016 18:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sleemer/94b53370ab8ff552d31689704601367a to your computer and use it in GitHub Desktop.
Save sleemer/94b53370ab8ff552d31689704601367a to your computer and use it in GitHub Desktop.
Basic implementation of TcpPublisher with Rx
/// <summary>
/// Lifecycle states reported by a publisher through its status stream.
/// </summary>
public enum PublisherStatus
{
    /// <summary>Initial state, before the publisher has been started.</summary>
    NotRunning = 0,

    /// <summary>
    /// The listener is started and waiting for clients.
    /// (The member name keeps its historical spelling so existing callers still compile.)
    /// </summary>
    Listenning = 1,

    /// <summary>A message is being sent to a connected client.</summary>
    Publishing = 2,

    /// <summary>The publisher has been stopped.</summary>
    Stopped = 3,

    /// <summary>The publisher hit an error.</summary>
    Failed = 4
}
using System;
using System.Collections.Immutable;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
/// <summary>
/// Publishes every item of an observable source to all TCP clients connected to a
/// local <see cref="TcpListener"/>. Each item is serialized to JSON and written as a
/// single text line.
/// </summary>
/// <typeparam name="T">Type of the messages produced by the source.</typeparam>
public class TcpPublisher<T> : IDisposable
{
    // UTF-8 *without* a byte-order mark. A new StreamWriter is created for every message,
    // and on a non-seekable stream such as NetworkStream each writer emits the encoding
    // preamble before its first write, so Encoding.UTF8 would prefix every line with a BOM.
    private static readonly Encoding PayloadEncoding = new UTF8Encoding(false);

    private readonly TcpListener _listener;

    // Immutable snapshot of connected clients; replaced atomically via ImmutableInterlocked
    // because it is updated from the accept callback and from concurrent send failures.
    private ImmutableList<TcpClient> _clients = ImmutableList<TcpClient>.Empty;

    private readonly IObservable<T> _source;

    // Non-null only while the publisher is started; owns every resource created by Start().
    private CompositeDisposable _dispose = null;

    private readonly BehaviorSubject<PublisherStatus> _statuses = new BehaviorSubject<PublisherStatus>(PublisherStatus.NotRunning);

    /// <summary>
    /// Creates a publisher for <paramref name="source"/>. The listener is not started
    /// until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="source">Stream of messages to publish.</param>
    /// <param name="address">IP address to listen on; <c>null</c> means the loopback address.</param>
    /// <param name="port">TCP port to listen on.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
    /// <exception cref="FormatException"><paramref name="address"/> is not a valid IP address.</exception>
    public TcpPublisher(IObservable<T> source, string address = null, int port = 7000)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        _source = source;
        var ip = address == null ? IPAddress.Loopback : IPAddress.Parse(address);
        _listener = new TcpListener(ip, port);
    }

    /// <summary>
    /// Starts listening for clients and subscribes to the source. Does nothing when the
    /// publisher is already started.
    /// </summary>
    /// <exception cref="SocketException">
    /// The listener could not be started; <see cref="PublisherStatus.Failed"/> is published
    /// before the exception is rethrown.
    /// </exception>
    public void Start()
    {
        if (_dispose != null)
        {
            return;
        }

        try
        {
            _listener.Start();
        }
        catch (SocketException)
        {
            _statuses.OnNext(PublisherStatus.Failed);
            throw;
        }

        _statuses.OnNext(PublisherStatus.Listenning);

        // Accept clients one after another for as long as the subscription is alive.
        var connections = Observable.FromAsync(_listener.AcceptTcpClientAsync)
            .Repeat()
            .Subscribe(AddClient, OnAcceptFailed);
        var subscription = _source.Subscribe(FanOut, OnSourceFailed);

        // Disposed in this order: stop producing messages and accepting clients first,
        // then stop the listener, then close the clients that are still connected.
        _dispose = new CompositeDisposable(
            subscription,
            connections,
            Disposable.Create(() => _listener.Stop()),
            Disposable.Create(() => _clients.ForEach(CloseClientConnection))
        );
    }

    /// <summary>
    /// Stops publishing, stops the listener, closes all client connections and publishes
    /// <see cref="PublisherStatus.Stopped"/>. Does nothing when the publisher is not started.
    /// </summary>
    public void Stop()
    {
        if (_dispose != null)
        {
            _dispose.Dispose();
            _dispose = null;
            _statuses.OnNext(PublisherStatus.Stopped);
        }
    }

    /// <summary>Stops the publisher; equivalent to <see cref="Stop"/>.</summary>
    public void Dispose()
    {
        Stop();
    }

    /// <summary>
    /// Returns the stream of status changes, starting with the current status and
    /// skipping consecutive duplicates.
    /// </summary>
    public IObservable<PublisherStatus> GetStatusStream()
    {
        return _statuses.DistinctUntilChanged();
    }

    /// <summary>Serializes <paramref name="message"/> once and sends it to every connected client.</summary>
    private void FanOut(T message)
    {
        string payload = JsonConvert.SerializeObject(message);
        foreach (var client in _clients)
        {
            // Deliberately not awaited: clients are written to independently, and
            // SendMessageAsync handles its own failures.
            SendMessageAsync(client, payload);
        }
    }

    /// <summary>
    /// Writes <paramref name="payload"/> as one line to <paramref name="client"/>. On any
    /// failure the client is removed and its connection closed.
    /// </summary>
    private async Task SendMessageAsync(TcpClient client, string payload)
    {
        try
        {
            _statuses.OnNext(PublisherStatus.Publishing);
            // leaveOpen: true - the network stream must outlive this per-message writer.
            using (var writer = new StreamWriter(client.GetStream(), PayloadEncoding, 1024, true))
            {
                await writer.WriteLineAsync(payload);
            }
        }
        catch
        {
            Console.WriteLine("Error occured while sending message to client.");
            // Remove AND dispose the broken client so its socket is not leaked.
            CloseClientConnection(client);
            // Only fall back to Listenning while started; a send that fails because of
            // Stop() must not override the Stopped status.
            if (_clients.Count == 0 && _dispose != null)
            {
                _statuses.OnNext(PublisherStatus.Listenning);
            }
        }
    }

    /// <summary>Atomically adds a newly accepted client to the client list.</summary>
    private void AddClient(TcpClient client)
    {
        ImmutableInterlocked.Update(ref _clients, (clients, added) => clients.Add(added), client);
    }

    /// <summary>Reports a failure of the accept loop through the status stream.</summary>
    private void OnAcceptFailed(Exception error)
    {
        Console.WriteLine($"Error occured while accepting client connection: {error.Message}");
        _statuses.OnNext(PublisherStatus.Failed);
    }

    /// <summary>Reports a failure of the message source through the status stream.</summary>
    private void OnSourceFailed(Exception error)
    {
        Console.WriteLine($"Error occured in message source: {error.Message}");
        _statuses.OnNext(PublisherStatus.Failed);
    }

    /// <summary>
    /// Atomically removes <paramref name="client"/> from the client list and disposes it.
    /// Errors are logged and swallowed because this runs during teardown.
    /// </summary>
    private void CloseClientConnection(TcpClient client)
    {
        try
        {
            ImmutableInterlocked.Update(ref _clients, (clients, removed) => clients.Remove(removed), client);
            client.Dispose();
        }
        catch
        {
            Console.WriteLine("Error occured while closing connection with client.");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment