Skip to content

Instantly share code, notes, and snippets.

@hrannzo
Created October 11, 2016 20:33
Show Gist options
  • Save hrannzo/7f242fff20959dbcb5f198a7c98e572d to your computer and use it in GitHub Desktop.
Save hrannzo/7f242fff20959dbcb5f198a7c98e572d to your computer and use it in GitHub Desktop.
/// <summary>
/// Sample entry point showing how to wire up an event store whose SQL dialect
/// has been wrapped by one of the <c>MsSqlDialectExtensions</c> helpers.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the event store, uses it, and disposes it when the scope ends.
    /// </summary>
    static void Main(string[] args)
    {
        using (var store = WireupEventStore())
        {
            // do some stuff
        }
    }

    /// <summary>
    /// Configures SQL persistence against the "EventStore" connection name using a
    /// schema-prefixed MS SQL dialect, initializes the storage engine and builds the store.
    /// </summary>
    private static IStoreEvents WireupEventStore()
    {
        var persistence = Wireup
            .Init()
            .UsingSqlPersistence("EventStore");

        // Alternative: map to custom table names instead of a schema prefix, e.g.
        // new MsSqlDialect().WithTableMapping("MyCommits", "MySnapshots")
        var dialect = new MsSqlDialect().WithSchemaPrefix("my_schema");

        var wireup = persistence
            .WithDialect(dialect)
            .InitializeStorageEngine();

        return wireup.Build();
    }
}
/// <summary>
/// Extension methods that wrap an <see cref="MsSqlDialect"/> in a
/// <see cref="SqlDialectTransformer"/> so that the SQL text it exposes refers to
/// a custom schema or to custom table names instead of the default
/// <c>Commits</c> / <c>Snapshots</c> tables.
/// </summary>
public static class MsSqlDialectExtensions
{
    /// <summary>
    /// Returns a dialect whose statements reference <c>{schema}.Commits</c> and
    /// <c>{schema}.Snapshots</c>.
    /// </summary>
    /// <param name="self">The dialect to wrap.</param>
    /// <param name="schema">The schema name to prefix the table names with.</param>
    /// <returns>A wrapping dialect that rewrites every statement on access.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="self"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="schema"/> is null or whitespace.</exception>
    public static ISqlDialect WithSchemaPrefix(this MsSqlDialect self, string schema)
    {
        if (self == null)
            throw new ArgumentNullException(nameof(self));
        if (string.IsNullOrWhiteSpace(schema))
            throw new ArgumentException("Value cannot be null or whitespace.", nameof(schema));

        return new SqlDialectTransformer(self, s => SchemaTransform(s, schema));
    }

    /// <summary>
    /// Returns a dialect whose statements reference the supplied table names in place of
    /// <c>Commits</c> and <c>Snapshots</c>.
    /// </summary>
    /// <param name="self">The dialect to wrap.</param>
    /// <param name="newCommitsTableName">Replacement name for the commits table.</param>
    /// <param name="newSnapshotsTableName">Replacement name for the snapshots table.</param>
    /// <returns>A wrapping dialect that rewrites every statement on access.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="self"/> is null.</exception>
    /// <exception cref="ArgumentException">Either table name is null or whitespace.</exception>
    public static ISqlDialect WithTableMapping(this MsSqlDialect self, string newCommitsTableName, string newSnapshotsTableName)
    {
        if (self == null)
            throw new ArgumentNullException(nameof(self));
        if (string.IsNullOrWhiteSpace(newCommitsTableName))
            throw new ArgumentException("Value cannot be null or whitespace.", nameof(newCommitsTableName));
        if (string.IsNullOrWhiteSpace(newSnapshotsTableName))
            throw new ArgumentException("Value cannot be null or whitespace.", nameof(newSnapshotsTableName));

        return new SqlDialectTransformer(self, s => TableTransform(s, newCommitsTableName, newSnapshotsTableName));
    }

    /// <summary>
    /// Rewrites a statement so the table names carry the given schema.
    /// </summary>
    private static string SchemaTransform(string statement, string schema)
    {
        // Step 1: normalize bracketed / dbo-qualified names down to the bare table name.
        // Step 2: prefix the bare name with the schema where it follows a space or is quoted.
        // Step 3: embed the schema (without a dot) in object names of the form
        //         _Commits_ / PK_Commits so the rewritten names differ per schema.
        var t = statement.Replace("[dbo].[Commits]", "Commits")
            .Replace("[dbo].[Snapshots]", "Snapshots")
            .Replace("[Commits]", "Commits")
            .Replace("[Snapshots]", "Snapshots")
            .Replace(" Commits", $" {schema}.Commits")
            .Replace(" Snapshots", $" {schema}.Snapshots")
            .Replace("'Commits'", $"'{schema}.Commits'")
            .Replace("'Snapshots'", $"'{schema}.Snapshots'")
            .Replace("_Commits_", $"_{schema}Commits_")
            .Replace("_Snapshots_", $"_{schema}Snapshots_")
            .Replace("PK_Commits", $"PK_{schema}Commits")
            .Replace("PK_Snapshots", $"PK_{schema}Snapshots");
        return t;
    }

    /// <summary>
    /// Rewrites a statement so the default table names are replaced by the given ones.
    /// </summary>
    private static string TableTransform(string statement, string commitsTableName, string snapshotsTableName)
    {
        var t = statement.Replace("[dbo].[Commits]", commitsTableName)
            .Replace("[dbo].[Snapshots]", snapshotsTableName)
            .Replace("[Commits]", commitsTableName)
            .Replace("[Snapshots]", snapshotsTableName)
            .Replace(" Commits", $" {commitsTableName}")
            // Fixed: this previously substituted the commits table name, which redirected
            // space-preceded references to the snapshots table onto the commits table.
            .Replace(" Snapshots", $" {snapshotsTableName}")
            .Replace("'Commits'", $"'{commitsTableName}'")
            .Replace("'Snapshots'", $"'{snapshotsTableName}'")
            .Replace("_Commits_", $"_{commitsTableName}_")
            .Replace("_Snapshots_", $"_{snapshotsTableName}_")
            .Replace("PK_Commits", $"PK_{commitsTableName}")
            .Replace("PK_Snapshots", $"PK_{snapshotsTableName}");
        return t;
    }
}
/// <summary>
/// An <see cref="ISqlDialect"/> decorator: every string-valued member of the wrapped
/// dialect is passed through a caller-supplied transformation before being returned,
/// while all other members are forwarded to the wrapped dialect unchanged.
/// </summary>
public class SqlDialectTransformer : ISqlDialect
{
    private readonly ISqlDialect _dialect;
    private readonly Func<string, string> _transformation;

    // Transformed text keyed by the original text, so repeated property reads
    // reuse the earlier result instead of re-running the transformation.
    private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>();

    /// <summary>
    /// Creates a transformer around <paramref name="dialect"/>.
    /// </summary>
    /// <param name="dialect">The dialect whose members are wrapped.</param>
    /// <param name="transformation">Function applied to each string exposed by the dialect.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public SqlDialectTransformer(ISqlDialect dialect, Func<string, string> transformation)
    {
        if (dialect == null)
        {
            throw new ArgumentNullException(nameof(dialect));
        }

        if (transformation == null)
        {
            throw new ArgumentNullException(nameof(transformation));
        }

        _dialect = dialect;
        _transformation = transformation;
    }

    /// <summary>Returns the cached transformation of <paramref name="sql"/>, computing it on first use.</summary>
    private string Transform(string sql) => _cache.GetOrAdd(sql, _transformation);

    // ----- Members forwarded to the wrapped dialect unchanged -----

    public object CoalesceParameterValue(object value) => _dialect.CoalesceParameterValue(value);

    public IDbTransaction OpenTransaction(IDbConnection connection) => _dialect.OpenTransaction(connection);

    public bool IsDuplicate(Exception exception) => _dialect.IsDuplicate(exception);

    public void AddPayloadParamater(IConnectionFactory connectionFactory, IDbConnection connection, IDbStatement cmd, byte[] payload)
        => _dialect.AddPayloadParamater(connectionFactory, connection, cmd, payload);

    public DateTime ToDateTime(object value) => _dialect.ToDateTime(value);

    public IDbStatement BuildStatement(TransactionScope scope, IDbConnection connection, IDbTransaction transaction)
        => _dialect.BuildStatement(scope, connection, transaction);

    public bool CanPage => _dialect.CanPage;

    public NextPageDelegate NextPageDelegate => _dialect.NextPageDelegate;

    // ----- String members: wrapped dialect's value run through the transformation -----

    public string InitializeStorage => Transform(_dialect.InitializeStorage);
    public string GetSnapshot => Transform(_dialect.GetSnapshot);
    public string GetCommitsFromStartingRevision => Transform(_dialect.GetCommitsFromStartingRevision);
    public string GetCommitsFromInstant => Transform(_dialect.GetCommitsFromInstant);
    public string GetCommitsFromToInstant => Transform(_dialect.GetCommitsFromToInstant);
    public string PersistCommit => Transform(_dialect.PersistCommit);
    public string GetCommitsFromCheckpoint => Transform(_dialect.GetCommitsFromCheckpoint);
    public string GetCommitsFromBucketAndCheckpoint => Transform(_dialect.GetCommitsFromBucketAndCheckpoint);
    public string GetUndispatchedCommits => Transform(_dialect.GetUndispatchedCommits);
    public string GetStreamsRequiringSnapshots => Transform(_dialect.GetStreamsRequiringSnapshots);
    public string PurgeStorage => Transform(_dialect.PurgeStorage);
    public string PurgeBucket => Transform(_dialect.PurgeBucket);
    public string Drop => Transform(_dialect.Drop);
    public string DeleteStream => Transform(_dialect.DeleteStream);
    public string DuplicateCommit => Transform(_dialect.DuplicateCommit);
    public string AppendSnapshotToCommit => Transform(_dialect.AppendSnapshotToCommit);
    public string MarkCommitAsDispatched => Transform(_dialect.MarkCommitAsDispatched);

    public string BucketId => Transform(_dialect.BucketId);
    public string StreamId => Transform(_dialect.StreamId);
    public string StreamIdOriginal => Transform(_dialect.StreamIdOriginal);
    public string StreamRevision => Transform(_dialect.StreamRevision);
    public string MaxStreamRevision => Transform(_dialect.MaxStreamRevision);
    public string Items => Transform(_dialect.Items);
    public string CommitId => Transform(_dialect.CommitId);
    public string CommitSequence => Transform(_dialect.CommitSequence);
    public string CommitStamp => Transform(_dialect.CommitStamp);
    public string CommitStampStart => Transform(_dialect.CommitStampStart);
    public string CommitStampEnd => Transform(_dialect.CommitStampEnd);
    public string Headers => Transform(_dialect.Headers);
    public string Payload => Transform(_dialect.Payload);
    public string Threshold => Transform(_dialect.Threshold);
    public string Limit => Transform(_dialect.Limit);
    public string Skip => Transform(_dialect.Skip);
    public string CheckpointNumber => Transform(_dialect.CheckpointNumber);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment