Skip to content

Instantly share code, notes, and snippets.

@LeeCampbell
Created December 12, 2012 16:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LeeCampbell/4269282 to your computer and use it in GitHub Desktop.
Save LeeCampbell/4269282 to your computer and use it in GitHub Desktop.
Thoughts on a simple Messaging API
using System;
namespace MyLib.Messaging
{
/// <summary>
/// Consumer that hands messages of type <typeparamref name="T"/> to the caller on demand,
/// either without removing them (<see cref="Peek"/>) or transactionally (<see cref="Receive"/>).
/// </summary>
/// <typeparam name="T">The message type returned to the caller; declared covariant (<c>out</c>).</typeparam>
public interface IConsumer<out T>
{
///<summary>Allows non-destructive read access to the next message on the queue</summary>
///<returns>The next message. NOTE(review): behaviour when no message is available is not specified by this contract - confirm with the implementation.</returns>
T Peek();
///<summary>Transactional consumer. Requires a transaction scope to be accessed.</summary>
///<param name="timeout">Time span supplied by the caller; presumably the maximum time to wait for a message - confirm with the implementation.</param>
///<returns>The received message.</returns>
///<example><code><![CDATA[
///using(TransactionScope tran = new TransactionScope())
///{
/// var bookingRequest = _bookingRequestConsumer.Receive(TimeSpan.FromMinutes(1));
/// var booking = _bookingFactory.Create(bookingRequest);
/// var receipt = booking.Save();
/// _bookingReceiptPublisher.Publish(receipt);
/// tran.Complete();
///}
///]]></code></example>
T Receive(TimeSpan timeout);
}
/// <summary>
/// Push-style counterpart to a consumer: exposes messages of type <typeparamref name="T"/>
/// as an observable sequence.
/// </summary>
/// <typeparam name="T">The message type delivered to observers; declared covariant (<c>out</c>).</typeparam>
public interface IListener<out T>
{
/// <summary>Gets the sequence of received messages.</summary>
/// <returns>
/// An observable sequence of messages. NOTE(review): whether each call/subscription shares one
/// underlying source or creates a new one is not specified here - confirm with the implementation.
/// </returns>
IObservable<T> ReceivedMessages();
}
/// <summary>
/// Maps a source value onto an existing target instance supplied by the caller.
/// </summary>
/// <typeparam name="T">The type of the value being mapped from.</typeparam>
/// <typeparam name="TMessage">The type of the instance being mapped onto.</typeparam>
public interface IMapper<in T, in TMessage>
{
/// <summary>
/// Applies <paramref name="source"/> to <paramref name="target"/>. The method returns nothing,
/// so any result is expected to be visible as changes made to <paramref name="target"/>.
/// </summary>
/// <param name="source">The value to map from.</param>
/// <param name="target">The caller-created instance to map onto.</param>
void Map(T source, TMessage target);
}
/// <summary>
/// Publishes messages of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The message type accepted by the publisher; declared contravariant (<c>in</c>).</typeparam>
public interface IPublisher<in T>
{
/// <summary>Publishes a single message.</summary>
/// <param name="message">The message to publish.</param>
/// <remarks>
/// NOTE(review): delivery guarantees and whether the call blocks until the message is sent
/// are not specified by this contract - confirm with the implementation.
/// </remarks>
void Publish(T message);
}
/// <summary>
/// Request/response messaging contract: sends a request and exposes the response(s)
/// as an observable sequence.
/// </summary>
/// <typeparam name="TRequest">The request type; declared contravariant (<c>in</c>).</typeparam>
/// <typeparam name="TResponse">The response type; declared covariant (<c>out</c>).</typeparam>
public interface IRequestor<in TRequest, out TResponse>
{
/// <summary>Creates the observable sequence of responses for <paramref name="request"/>.</summary>
/// <param name="request">The request to issue.</param>
/// <returns>
/// An observable sequence of responses. NOTE(review): the number of responses and whether the
/// request is issued per subscription are implementation details - confirm with the implementation.
/// </returns>
IObservable<TResponse> Request(TRequest request);
}
//E.g.
//ProtocolBufferSerializer<T> : ISerializer<T, byte[]>
//XmlSerializer<T> : ISerializer<T, XElement>
//JSONSerializer<T> : ISerializer<T, string>
/// <summary>
/// Converts values of type <typeparamref name="T"/> to and from a serialized
/// representation of type <typeparamref name="TSerialized"/>.
/// </summary>
/// <typeparam name="T">The in-memory type.</typeparam>
/// <typeparam name="TSerialized">The serialized representation, e.g. <c>byte[]</c>, <c>XElement</c> or <c>string</c>.</typeparam>
public interface ISerializer<T, TSerialized>
{
/// <summary>Serializes <paramref name="input"/>.</summary>
/// <param name="input">The value to serialize.</param>
/// <returns>The serialized representation of <paramref name="input"/>.</returns>
TSerialized Serialize(T input);
/// <summary>Deserializes <paramref name="input"/>.</summary>
/// <param name="input">The serialized representation to read.</param>
/// <returns>The value reconstructed from <paramref name="input"/>.</returns>
T Deserialize(TSerialized input);
}
/// <summary>
/// Translates a value of one type into a new value of another type.
/// Unlike <c>IMapper</c>, the result is returned rather than applied to a caller-supplied target.
/// </summary>
/// <typeparam name="TInput">The type translated from; declared contravariant (<c>in</c>).</typeparam>
/// <typeparam name="TOutput">The type translated to; declared covariant (<c>out</c>).</typeparam>
public interface ITranslator<in TInput, out TOutput>
{
/// <summary>Translates <paramref name="source"/>.</summary>
/// <param name="source">The value to translate.</param>
/// <returns>The translated value.</returns>
TOutput Translate(TInput source);
}
}
namespace MyLib.Messaging.Http
{
/// <summary>
/// Mutable description of an HTTP request: a root URL plus the query-string parameters,
/// POST parameters and headers to be sent with it.
/// </summary>
public sealed class HttpRequestParameters
{
    private readonly Uri _rootUrl;
    private readonly NameValueCollection _queryStringParameters = new NameValueCollection();
    private readonly NameValueCollection _postParameters = new NameValueCollection();
    private readonly NameValueCollection _headers = new NameValueCollection();

    /// <summary>Creates an empty parameter set for the given root URL.</summary>
    /// <param name="rootUrl">The URL the request is sent to, before query-string parameters are applied.</param>
    /// <exception cref="ArgumentNullException"><paramref name="rootUrl"/> is <c>null</c>.</exception>
    public HttpRequestParameters(Uri rootUrl)
    {
        // Fail here rather than later inside ConstructUri()/ToString(), where the cause is far less obvious.
        if (rootUrl == null)
        {
            throw new ArgumentNullException("rootUrl");
        }
        _rootUrl = rootUrl;
    }

    /// <summary>Gets the URL supplied at construction.</summary>
    public Uri RootUrl
    {
        get { return _rootUrl; }
    }

    /// <summary>Gets the mutable collection of parameters appended to the URL's query string.</summary>
    public NameValueCollection QueryStringParameters
    {
        get { return _queryStringParameters; }
    }

    /// <summary>Gets the mutable collection of parameters to be sent in the request body.</summary>
    public NameValueCollection PostParameters
    {
        get { return _postParameters; }
    }

    /// <summary>Gets the mutable collection of request headers.</summary>
    public NameValueCollection Headers
    {
        get { return _headers; }
    }

    /// <summary>
    /// Builds the request URI from <see cref="RootUrl"/> and <see cref="QueryStringParameters"/>.
    /// </summary>
    /// <returns>
    /// <see cref="RootUrl"/> when there are no query-string parameters; otherwise the root URL with
    /// its existing query (if any) merged with the URL-encoded parameters. A parameter whose key
    /// already exists in the root URL's query replaces that value.
    /// </returns>
    public Uri ConstructUri()
    {
        var uriBuilder = new UriBuilder(RootUrl);
        if (QueryStringParameters.Count > 0)
        {
            // Seed from the query already present on the root URL so it is not discarded when
            // parameters are added (ParseQueryString ignores the leading '?').
            var queryString = System.Web.HttpUtility.ParseQueryString(uriBuilder.Query);
            foreach (var key in QueryStringParameters.AllKeys)
            {
                queryString[key] = QueryStringParameters[key];
            }
            uriBuilder.Query = queryString.ToString(); // Returns "key1=value1&key2=value2", all URL-encoded
        }
        return uriBuilder.Uri;
    }

    /// <summary>Returns a diagnostic string containing the constructed URI, the headers and the POST parameters.</summary>
    public override string ToString()
    {
        var headers = ToString(Headers);
        var post = ToString(PostParameters);
        return string.Format("{0} HEADERS:{1} POST:{2}", ConstructUri(), headers, post);
    }

    /// <summary>Formats a collection as a comma-separated list of <c>[key:value]</c> pairs.</summary>
    private static string ToString(NameValueCollection nvc)
    {
        return string.Join(", ", nvc.AllKeys.Select(k => string.Format("[{0}:{1}]", k, nvc[k])));
    }
}
//TODO: deal with potential Struct/Value return types
//TODO: provide cancelation feature
//TODO: Test POST, GET, Adding Headers, sending binary etc..
/// <summary>
/// Base class for an <see cref="IRequestor{TRequest,TResponse}"/> backed by HTTP. For each
/// subscription it maps the request onto a fresh <see cref="HttpRequestParameters"/>, issues a
/// blocking web request, translates the response and publishes the result.
/// </summary>
/// <typeparam name="TRequest">The request type handed to the request mapper.</typeparam>
/// <typeparam name="TResponse">The type produced by the response translator.</typeparam>
public abstract class HttpRequestor<TRequest, TResponse> : IRequestor<TRequest, TResponse>
{
    private readonly IMapper<TRequest, HttpRequestParameters> _requestMapper;
    private readonly ITranslator<HttpWebResponse, TResponse> _responseTranslator;

    /// <summary>Initialises the requestor with its mapping and translation strategies.</summary>
    /// <param name="requestMapper">Populates the <see cref="HttpRequestParameters"/> from a request.</param>
    /// <param name="responseTranslator">Converts the <see cref="HttpWebResponse"/> into a <typeparamref name="TResponse"/>.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    protected HttpRequestor(IMapper<TRequest, HttpRequestParameters> requestMapper, ITranslator<HttpWebResponse, TResponse> responseTranslator)
    {
        if (requestMapper == null)
        {
            throw new ArgumentNullException("requestMapper");
        }
        if (responseTranslator == null)
        {
            throw new ArgumentNullException("responseTranslator");
        }
        _requestMapper = requestMapper;
        _responseTranslator = responseTranslator;
    }

    /// <summary>
    /// Returns a sequence that, when subscribed to, performs the HTTP request and yields the
    /// translated response.
    /// </summary>
    /// <param name="request">The request to map onto the HTTP request parameters.</param>
    /// <returns>
    /// A sequence that yields the translated response (nothing if the translator returns
    /// <c>null</c>) and then completes, or that signals an error with any exception raised
    /// while building, sending or translating the request.
    /// </returns>
    public IObservable<TResponse> Request(TRequest request)
    {
        return Observable.Create<TResponse>(
            o =>
            {
                try
                {
                    var httpRequestParams = new HttpRequestParameters(RequestUri);
                    _requestMapper.Map(request, httpRequestParams);
                    var webRequest = CreateRequest(httpRequestParams);
                    // The response holds the underlying connection; dispose it once it has been
                    // translated and published so the connection is released.
                    using (var webResponse = (HttpWebResponse)webRequest.GetResponse())
                    {
                        var response = _responseTranslator.Translate(webResponse);
                        // For value-type TResponse this comparison is always true (see TODO on struct return types).
                        if (response != null)
                        {
                            o.OnNext(response);
                        }
                    }
                    o.OnCompleted();
                }
                catch (Exception e)
                {
                    o.OnError(e);
                }
                // The work has already finished by the time the subscription is returned,
                // so there is nothing to cancel.
                return Disposable.Empty;
            });
    }

    /// <summary>
    /// Builds the <see cref="HttpWebRequest"/> for the supplied parameters: sets the method and
    /// content type, copies the headers and, when POST parameters are present, writes them
    /// URL-encoded to the request body.
    /// </summary>
    /// <param name="requestParameters">The URL, headers and parameters to send.</param>
    /// <returns>The configured request, ready for <c>GetResponse()</c>.</returns>
    protected virtual HttpWebRequest CreateRequest(HttpRequestParameters requestParameters)
    {
        var queryUri = requestParameters.ConstructUri();
        var httpWebRequest = (HttpWebRequest)WebRequest.Create(queryUri);
        httpWebRequest.Method = RequestMethod;
        httpWebRequest.ContentType = RequestContentType;
        //webRequest.Proxy = new System.Net.WebProxy(ProxyString, true); //true means no proxy
        //Proxy can be added in overridden implementation?
        foreach (var key in requestParameters.Headers.AllKeys)
        {
            httpWebRequest.Headers.Add(key, requestParameters.Headers[key]);
        }
        if (requestParameters.PostParameters.Count > 0)
        {
            // NOTE(review): writing a body needs a RequestMethod that permits one; with the
            // default "GET" GetRequestStream is expected to throw - confirm overrides set "POST"/"PUT".
            var postArguments = System.Web.HttpUtility.ParseQueryString(string.Empty);
            foreach (var key in requestParameters.PostParameters.AllKeys)
            {
                postArguments[key] = requestParameters.PostParameters[key];
            }
            using (var requestStream = httpWebRequest.GetRequestStream())
            using (var writer = new StreamWriter(requestStream))
            {
                writer.Write(postArguments.ToString());
            }
        }
        return httpWebRequest;
    }

    /// <summary>Gets the root URL that every request from this requestor is sent to.</summary>
    protected abstract Uri RequestUri { get; }

    /// <summary>Gets the HTTP method used for requests. Defaults to <c>"GET"</c>.</summary>
    protected virtual string RequestMethod
    {
        get { return "GET"; }
    }

    /// <summary>Gets the value for the request's content type. Defaults to <c>null</c>.</summary>
    protected virtual string RequestContentType
    {
        get { return null; }
    }
}
}
/*
using System;
using System.IO;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using NUnit.Framework;
namespace MyLib.Messaging.Http.IntegrationTests
{
public sealed class GoogleHttpRequestor : HttpRequestor<string, string>
{
public GoogleHttpRequestor(IMapper<string, HttpRequestParameters> requestMapper, ITranslator<WebResponse, string> responseTranslator)
: base(requestMapper, responseTranslator)
{
}
protected override Uri RequestUri
{
get { return new Uri("http://www.google.com"); }
}
}
public class GoogleRequestMapper : IMapper<string, HttpRequestParameters>
{
public void Map(string searchText, HttpRequestParameters target)
{
target.QueryStringParameters.Add("q", searchText);
}
}
public sealed class IwsHttpRequestor : HttpRequestor<Unit, byte[]>
{
public IwsHttpRequestor(IMapper<Unit, HttpRequestParameters> requestMapper, ITranslator<HttpWebResponse, byte[]> responseTranslator)
: base(requestMapper, responseTranslator)
{
}
protected override Uri RequestUri
{
get { return new Uri("http://lndvs415:8080/fixing/version"); } //Dev
//get { return new Uri("http://lndvs415.ln.rbccm.com:8090/fixing/version"); } //SIT (IWS QA)
//get { return new Uri("http://lnpss191.ln.rbccm.com:8080/fixing/version"); } //QA (IWS UAT)
}
protected override string RequestContentType
{
get { return "application/text"; }
}
}
public sealed class NullRequestMapper : IMapper<Unit, HttpRequestParameters>
{
public static readonly NullRequestMapper Instance = new NullRequestMapper();
public void Map(Unit _, HttpRequestParameters target)
{
}
}
public class WebResponseTranslator : ITranslator<WebResponse, string>
{
public string Translate(WebResponse source)
{
var responseStream = source.GetResponseStream();
if (responseStream == null) return null;
using (var reader = new StreamReader(responseStream))
{
var payload = reader.ReadToEnd();
return payload;
}
}
}
public class BinaryWebResponseTranslator : ITranslator<HttpWebResponse, byte[]>
{
public byte[] Translate(HttpWebResponse wr)
{
if (wr.StatusCode != HttpStatusCode.OK || wr.ContentLength <= 0)
return new byte[0];
using (var s = wr.GetResponseStream())
{
if (s == null)
throw new InvalidOperationException("WebResponse contained a null response stream");
//s.ReadTimeout = TimeoutMs;
long bufferPos = 0;
var buffer = new byte[wr.ContentLength];
int b;
while (bufferPos < buffer.Length && (b = s.ReadByte()) != -1)
buffer[bufferPos++] = (byte)b;
if (bufferPos != wr.ContentLength)
throw new InvalidOperationException("WebResponse failed to receive correct payload");
return buffer;
}
}
}
[TestFixture]
public class HttpGetRequestorFixture
{
//Need to test :
// query string e.g. http://www.google.com?q=James%20Dean
// REST GET e.g. http://contacts.google.com/contact/12345/?json
// REST PUT e.g. http://contacts.google.com/contact/12345/?json
// REST POST? e.g. http://contacts.google.com/contact/12345/?json
// Post e.g. Submitting a form
// Post e.g. pushing binary data
[Test]
public void Should_hit_google_machine()
{
var requestor = new GoogleHttpRequestor(new GoogleRequestMapper(), new WebResponseTranslator());
var result = requestor.Request("James Dean").First();
Assert.AreEqual("blah", result);
}
[Test]
public void Should_hit_IWS_version_service()
{
var requestor = new IwsHttpRequestor(NullRequestMapper.Instance, new BinaryWebResponseTranslator());
var result = requestor.Request(Unit.Default).First();
foreach (var b in result)
{
Console.Write("[{0:x2}]", b);
}
Console.WriteLine();
Assert.AreEqual(8, result.Length);
}
}
}
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment