Skip to content

Instantly share code, notes, and snippets.

@yreynhout
Created August 30, 2014 12:22
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yreynhout/1c2f0cd7aeb61e698ab0 to your computer and use it in GitHub Desktop.
Save yreynhout/1c2f0cd7aeb61e698ab0 to your computer and use it in GitHub Desktop.
SqlProjectors
public static class PlaceHolderForProjections {
public static readonly PortfolioProjection = TSql.Projection().
When<PortfolioAdded>(@event =>
TSql.NonQuery(
"INSERT INTO [Portfolio] (Id, Name) VALUES (@P1, @P2)",
new { P1 = TSql.Int(@event.Id), P2 = TSql.NVarChar(@event.Name, 40) }
).
When<PortfolioRemoved>(@event =>
TSql.NonQuery(
"DELETE FROM [Portfolio] WHERE Id = @P1",
new { P1 = TSql.Int(@event.Id) }
).
When<PortfolioRenamed>(@event =>
TSql.NonQuery(
"UPDATE [Portfolio] SET Name = @P2 WHERE Id = @P1",
new { P1 = TSql.Int(@event.Id), P2 = TSql.NVarChar(@event.Name, 40) }
).
Build();
// returns a SqlProjection which has 1 property: SqlProjectionHandler[] Handlers
// that's what we feed to the SqlProjectors (so I don't care whether you want to host
// one or a hundred projections in a projector).
}
using System;
using System.Data.Common;
namespace Paramol
{
/// <summary>
/// Represent a T-SQL non query statement.
/// </summary>
public class SqlNonQueryStatement
{
private readonly DbParameter[] _parameters;
private readonly string _text;
/// <summary>
/// Initializes a new instance of the <see cref="SqlNonQueryStatement" /> class.
/// </summary>
/// <param name="text">The text.</param>
/// <param name="parameters">The parameters.</param>
/// <exception cref="System.ArgumentNullException">
/// Thrown when <paramref name="text" /> or <paramref name="parameters" />
/// is <c>null</c>.
/// </exception>
public SqlNonQueryStatement(string text, DbParameter[] parameters)
{
if (text == null)
throw new ArgumentNullException("text");
if (parameters == null)
throw new ArgumentNullException("parameters");
//TODO: Remove this because it doesn't make sense when we're dealing with DbParameters
if (parameters.Length > Limits.MaxParameterCount)
throw new ArgumentException(
string.Format("The parameter count is limited to {0}.", Limits.MaxParameterCount),
"parameters");
_text = text;
_parameters = parameters;
}
/// <summary>
/// Gets the text.
/// </summary>
/// <value>
/// The text.
/// </value>
public string Text
{
get { return _text; }
}
/// <summary>
/// Gets the parameters.
/// </summary>
/// <value>
/// The parameters.
/// </value>
public DbParameter[] Parameters
{
get { return _parameters; }
}
}
}
using System;
using System.Collections.Generic;
using Paramol;
namespace Projac
{
/// <summary>
/// Represents a handler of a particular type of event.
/// </summary>
public class SqlProjectionHandler
{
private readonly Type _event;
private readonly Func<object, IEnumerable<SqlNonQueryStatement>> _handler;
/// <summary>
/// Initializes a new instance of the <see cref="SqlProjectionHandler" /> class.
/// </summary>
/// <param name="event">The event.</param>
/// <param name="handler">The handler.</param>
/// <exception cref="System.ArgumentNullException">
/// Throw when <paramref name="event" /> or <paramref name="handler" /> is
/// <c>null</c>.
/// </exception>
public SqlProjectionHandler(Type @event, Func<object, IEnumerable<SqlNonQueryStatement>> handler)
{
if (@event == null) throw new ArgumentNullException("event");
if (handler == null) throw new ArgumentNullException("handler");
_event = @event;
_handler = handler;
}
/// <summary>
/// The type of event to handle.
/// </summary>
public Type Event
{
get { return _event; }
}
/// <summary>
/// The function that handles the event.
/// </summary>
public Func<object, IEnumerable<SqlNonQueryStatement>> Handler
{
get { return _handler; }
}
}
}
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Projac
{
//Future changes:
//Batch by # of event, # of statements and/or time - at which point we're better off using Rx
//Transactionless variations
public class SimpleSqlProjector
{
private readonly IReadOnlyCollection<SqlProjectionHandler> _handlers;
private readonly ConnectionStringSettings _connectionStringSettings;
private readonly DbProviderFactory _dbProviderFactory;
public SimpleSqlProjector(IReadOnlyCollection<SqlProjectionHandler> handlers, ConnectionStringSettings connectionStringSettings)
{
if (handlers == null) throw new ArgumentNullException("handlers");
if (connectionStringSettings == null) throw new ArgumentNullException("connectionStringSettings");
_handlers = handlers;
_connectionStringSettings = connectionStringSettings;
_dbProviderFactory = DbProviderFactories.GetFactory(connectionStringSettings.ProviderName);
}
public void Project(object @event)
{
if (@event == null) throw new ArgumentNullException("event");
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == @event.GetType()).
SelectMany(handler => handler.Handler(@event)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (!movedToStatement) return;
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
connection.Open();
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.Transaction = transaction;
command.CommandType = CommandType.Text;
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
command.ExecuteNonQuery();
movedToStatement = statementEnumerator.MoveNext();
}
}
transaction.Commit();
}
connection.Close();
}
}
}
public Task ProjectAsync(object @event)
{
return ProjectAsync(@event, CancellationToken.None);
}
public async Task ProjectAsync(object @event, CancellationToken cancellationToken)
{
if (@event == null) throw new ArgumentNullException("event");
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == @event.GetType()).
SelectMany(handler => handler.Handler(@event)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (movedToStatement)
{
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
await connection.OpenAsync(cancellationToken);
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.Transaction = transaction;
command.CommandType = CommandType.Text;
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
await command.ExecuteNonQueryAsync(cancellationToken);
movedToStatement = statementEnumerator.MoveNext();
}
}
transaction.Commit();
}
connection.Close();
}
}
}
}
}
public class LookMaNoTransactionStreamingSqlProjector
{
private readonly IReadOnlyCollection<SqlProjectionHandler> _handlers;
private readonly ConnectionStringSettings _connectionStringSettings;
private readonly DbProviderFactory _dbProviderFactory;
public LookMaNoTransactionStreamingSqlProjector(IReadOnlyCollection<SqlProjectionHandler> handlers, ConnectionStringSettings connectionStringSettings)
{
if (handlers == null) throw new ArgumentNullException("handlers");
if (connectionStringSettings == null) throw new ArgumentNullException("connectionStringSettings");
_handlers = handlers;
_connectionStringSettings = connectionStringSettings;
_dbProviderFactory = DbProviderFactories.GetFactory(connectionStringSettings.ProviderName);
}
public void Project(IEnumerable<object> events)
{
if (events == null) throw new ArgumentNullException("events");
using (var eventEnumerator = events.GetEnumerator())
{
var movedToEvent = eventEnumerator.MoveNext();
if (!movedToEvent) return;
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
connection.Open();
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.CommandType = CommandType.Text;
while (movedToEvent)
{
var @event = eventEnumerator.Current;
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == @event.GetType()).
SelectMany(handler => handler.Handler(@event)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (movedToStatement)
{
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
command.ExecuteNonQuery();
movedToStatement = statementEnumerator.MoveNext();
}
}
}
movedToEvent = eventEnumerator.MoveNext();
}
}
connection.Close();
}
}
}
public Task ProjectAsync(IEnumerable<object> events)
{
return ProjectAsync(events, CancellationToken.None);
}
public async Task ProjectAsync(IEnumerable<object> events, CancellationToken cancellationToken)
{
if (events == null) throw new ArgumentNullException("events");
using (var eventEnumerator = events.GetEnumerator())
{
var movedToEvent = eventEnumerator.MoveNext();
if (!movedToEvent) return;
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
await connection.OpenAsync(cancellationToken);
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.Transaction = transaction;
command.CommandType = CommandType.Text;
while (movedToEvent)
{
var @event = eventEnumerator.Current;
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == @event.GetType()).
SelectMany(handler => handler.Handler(@event)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (movedToStatement)
{
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
await command.ExecuteNonQueryAsync(cancellationToken);
movedToStatement = statementEnumerator.MoveNext();
}
}
}
movedToEvent = eventEnumerator.MoveNext();
}
}
transaction.Commit();
}
connection.Close();
}
}
}
}
public class BigFatTransactionStreamingSqlProjector
{
private readonly IReadOnlyCollection<SqlProjectionHandler> _handlers;
private readonly ConnectionStringSettings _connectionStringSettings;
private readonly DbProviderFactory _dbProviderFactory;
public BigFatTransactionStreamingSqlProjector(IReadOnlyCollection<SqlProjectionHandler> handlers, ConnectionStringSettings connectionStringSettings)
{
if (handlers == null) throw new ArgumentNullException("handlers");
if (connectionStringSettings == null) throw new ArgumentNullException("connectionStringSettings");
_handlers = handlers;
_connectionStringSettings = connectionStringSettings;
_dbProviderFactory = DbProviderFactories.GetFactory(connectionStringSettings.ProviderName);
}
public void Project(IEnumerable<object> events)
{
if (events == null) throw new ArgumentNullException("events");
using (var eventEnumerator = events.GetEnumerator())
{
var movedToEvent = eventEnumerator.MoveNext();
if (!movedToEvent) return;
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
connection.Open();
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.Transaction = transaction;
command.CommandType = CommandType.Text;
while (movedToEvent)
{
var @event = eventEnumerator.Current;
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == @event.GetType()).
SelectMany(handler => handler.Handler(@event)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (movedToStatement)
{
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
command.ExecuteNonQuery();
movedToStatement = statementEnumerator.MoveNext();
}
}
}
movedToEvent = eventEnumerator.MoveNext();
}
}
transaction.Commit();
}
connection.Close();
}
}
}
public Task ProjectAsync(IEnumerable<object> events)
{
return ProjectAsync(events, CancellationToken.None);
}
public async Task ProjectAsync(IEnumerable<object> events, CancellationToken cancellationToken)
{
if (events == null) throw new ArgumentNullException("events");
using (var eventEnumerator = events.GetEnumerator())
{
var movedToEvent = eventEnumerator.MoveNext();
if (!movedToEvent) return;
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
await connection.OpenAsync(cancellationToken);
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.Transaction = transaction;
command.CommandType = CommandType.Text;
while (movedToEvent)
{
var @event = eventEnumerator.Current;
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == @event.GetType()).
SelectMany(handler => handler.Handler(@event)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (movedToStatement)
{
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
await command.ExecuteNonQueryAsync(cancellationToken);
movedToStatement = statementEnumerator.MoveNext();
}
}
}
movedToEvent = eventEnumerator.MoveNext();
}
}
transaction.Commit();
}
connection.Close();
}
}
}
}
public class TransactionPerEventStreamingSqlProjector
{
private readonly IReadOnlyCollection<SqlProjectionHandler> _handlers;
private readonly ConnectionStringSettings _connectionStringSettings;
private readonly DbProviderFactory _dbProviderFactory;
public TransactionPerEventStreamingSqlProjector(IReadOnlyCollection<SqlProjectionHandler> handlers, ConnectionStringSettings connectionStringSettings)
{
if (handlers == null) throw new ArgumentNullException("handlers");
if (connectionStringSettings == null) throw new ArgumentNullException("connectionStringSettings");
_handlers = handlers;
_connectionStringSettings = connectionStringSettings;
_dbProviderFactory = DbProviderFactories.GetFactory(connectionStringSettings.ProviderName);
}
public void Project(IEnumerable<object> events)
{
if (events == null) throw new ArgumentNullException("events");
using (var eventEnumerator = events.GetEnumerator())
{
var movedToEvent = eventEnumerator.MoveNext();
if (!movedToEvent) return;
while (movedToEvent)
{
var @event = eventEnumerator.Current;
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
connection.Open();
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.Transaction = transaction;
command.CommandType = CommandType.Text;
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == @event.GetType()).
SelectMany(handler => handler.Handler(@event)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (movedToStatement)
{
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
command.ExecuteNonQuery();
movedToStatement = statementEnumerator.MoveNext();
}
}
}
}
transaction.Commit();
}
connection.Close();
}
movedToEvent = eventEnumerator.MoveNext();
}
}
}
}
public class TransactionPerEventObservingSqlProjector : IObserver<object>
{
private readonly IReadOnlyCollection<SqlProjectionHandler> _handlers;
private readonly ConnectionStringSettings _connectionStringSettings;
private readonly DbProviderFactory _dbProviderFactory;
public TransactionPerEventObservingSqlProjector(IReadOnlyCollection<SqlProjectionHandler> handlers, ConnectionStringSettings connectionStringSettings)
{
if (handlers == null) throw new ArgumentNullException("handlers");
if (connectionStringSettings == null) throw new ArgumentNullException("connectionStringSettings");
_handlers = handlers;
_connectionStringSettings = connectionStringSettings;
_dbProviderFactory = DbProviderFactories.GetFactory(connectionStringSettings.ProviderName);
}
public void OnNext(object value)
{
if (value == null) throw new ArgumentNullException("event");
using (var statementEnumerator = _handlers.
Where(handler => handler.Event == value.GetType()).
SelectMany(handler => handler.Handler(value)).
GetEnumerator())
{
var movedToStatement = statementEnumerator.MoveNext();
if (!movedToStatement) return;
using (var connection = _dbProviderFactory.CreateConnection())
{
connection.ConnectionString = _connectionStringSettings.ConnectionString;
connection.Open();
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
using (var command = _dbProviderFactory.CreateCommand())
{
command.Connection = connection;
command.Transaction = transaction;
command.CommandType = CommandType.Text;
while (movedToStatement)
{
command.CommandText = statementEnumerator.Current.Text;
command.Parameters.Clear();
command.Parameters.AddRange(statementEnumerator.Current.Parameters);
command.ExecuteNonQuery();
movedToStatement = statementEnumerator.MoveNext();
}
}
transaction.Commit();
}
connection.Close();
}
}
}
public void OnError(Exception error)
{
}
public void OnCompleted()
{
}
}
}
@yreynhout
Copy link
Author

Caller of these would be custom code that pulls events out of your favorite eventstore (whether it's NES or GES).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment