Created
October 11, 2016 20:33
-
-
Save hrannzo/7f242fff20959dbcb5f198a7c98e572d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Sample console host showing how to build an event store whose SQL dialect
/// has been customized through the <c>MsSqlDialect</c> extension methods.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the event store and disposes it when the work is finished.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        using (IStoreEvents store = WireupEventStore())
        {
            // do some stuff
        }
    }

    /// <summary>
    /// Wires up SQL persistence under the "EventStore" name with a dialect that
    /// applies the "my_schema" schema prefix, initializes the storage engine and
    /// builds the store.
    /// </summary>
    /// <returns>The built <see cref="IStoreEvents"/> instance.</returns>
    private static IStoreEvents WireupEventStore()
    {
        var persistence = Wireup.Init().UsingSqlPersistence("EventStore");

        // Alternative: map the tables to custom names instead of prefixing a schema.
        //var dialect = new MsSqlDialect().WithTableMapping("MyCommits", "MySnapshots");
        var dialect = new MsSqlDialect().WithSchemaPrefix("my_schema");

        var initialized = persistence.WithDialect(dialect).InitializeStorageEngine();
        return initialized.Build();
    }
}
/// <summary>
/// Extension methods that wrap an <see cref="MsSqlDialect"/> in a
/// <see cref="SqlDialectTransformer"/> so that the SQL text it exposes refers to a
/// custom schema or to custom table names instead of the default
/// <c>Commits</c> and <c>Snapshots</c> tables.
/// </summary>
public static class MsSqlDialectExtensions
{
    /// <summary>
    /// Returns a dialect whose statements have the <c>Commits</c> and <c>Snapshots</c>
    /// table references rewritten to use the given schema.
    /// </summary>
    /// <param name="self">The dialect to wrap.</param>
    /// <param name="schema">The schema name to apply.</param>
    /// <returns>A wrapping <see cref="ISqlDialect"/> that rewrites statement text.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="self"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="schema"/> is null or whitespace.</exception>
    public static ISqlDialect WithSchemaPrefix(this MsSqlDialect self, string schema)
    {
        if (self == null)
            throw new ArgumentNullException(nameof(self));
        if (string.IsNullOrWhiteSpace(schema))
            throw new ArgumentException("Value cannot be null or whitespace.", nameof(schema));

        return new SqlDialectTransformer(self, s => SchemaTransform(s, schema));
    }

    /// <summary>
    /// Returns a dialect whose statements have the <c>Commits</c> and <c>Snapshots</c>
    /// table references rewritten to the given table names.
    /// </summary>
    /// <param name="self">The dialect to wrap.</param>
    /// <param name="newCommitsTableName">Replacement name for the <c>Commits</c> table.</param>
    /// <param name="newSnapshotsTableName">Replacement name for the <c>Snapshots</c> table.</param>
    /// <returns>A wrapping <see cref="ISqlDialect"/> that rewrites statement text.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="self"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// Either table name is null or whitespace.
    /// </exception>
    public static ISqlDialect WithTableMapping(this MsSqlDialect self, string newCommitsTableName, string newSnapshotsTableName)
    {
        if (self == null)
            throw new ArgumentNullException(nameof(self));
        if (string.IsNullOrWhiteSpace(newCommitsTableName))
            throw new ArgumentException("Value cannot be null or whitespace.", nameof(newCommitsTableName));
        if (string.IsNullOrWhiteSpace(newSnapshotsTableName))
            throw new ArgumentException("Value cannot be null or whitespace.", nameof(newSnapshotsTableName));

        return new SqlDialectTransformer(self, s => TableTransform(s, newCommitsTableName, newSnapshotsTableName));
    }

    /// <summary>
    /// Rewrites one SQL statement so table references carry the schema.
    /// </summary>
    /// <param name="statement">The statement text produced by the wrapped dialect.</param>
    /// <param name="schema">The schema name to apply.</param>
    /// <returns>The rewritten statement text.</returns>
    private static string SchemaTransform(string statement, string schema)
    {
        // The replacements are order-dependent: bracketed forms are first reduced to
        // the bare table name, and the space-prefixed rule that follows is the one
        // that actually adds "<schema>.".
        var t = statement.Replace("[dbo].[Commits]", "Commits")
            .Replace("[dbo].[Snapshots]", "Snapshots")
            .Replace("[Commits]", "Commits")
            .Replace("[Snapshots]", "Snapshots")
            .Replace(" Commits", $" {schema}.Commits")
            .Replace(" Snapshots", $" {schema}.Snapshots")
            // NOTE(review): quoted literals become '<schema>.Commits'; confirm that is
            // what the base dialect's scripts compare against.
            .Replace("'Commits'", $"'{schema}.Commits'")
            .Replace("'Snapshots'", $"'{schema}.Snapshots'")
            // Identifiers that embed the table name get the schema concatenated
            // without a dot.
            .Replace("_Commits_", $"_{schema}Commits_")
            .Replace("_Snapshots_", $"_{schema}Snapshots_")
            .Replace("PK_Commits", $"PK_{schema}Commits")
            .Replace("PK_Snapshots", $"PK_{schema}Snapshots");
        return t;
    }

    /// <summary>
    /// Rewrites one SQL statement so references to the <c>Commits</c> and
    /// <c>Snapshots</c> tables use the supplied table names.
    /// </summary>
    /// <param name="statement">The statement text produced by the wrapped dialect.</param>
    /// <param name="commitsTableName">Replacement name for the <c>Commits</c> table.</param>
    /// <param name="snapshotsTableName">Replacement name for the <c>Snapshots</c> table.</param>
    /// <returns>The rewritten statement text.</returns>
    private static string TableTransform(string statement, string commitsTableName, string snapshotsTableName)
    {
        var t = statement.Replace("[dbo].[Commits]", commitsTableName)
            .Replace("[dbo].[Snapshots]", snapshotsTableName)
            .Replace("[Commits]", commitsTableName)
            .Replace("[Snapshots]", snapshotsTableName)
            .Replace(" Commits", $" {commitsTableName}")
            // Fixed: this rule previously substituted the commits table name, which
            // pointed space-prefixed Snapshots references at the Commits table.
            .Replace(" Snapshots", $" {snapshotsTableName}")
            .Replace("'Commits'", $"'{commitsTableName}'")
            .Replace("'Snapshots'", $"'{snapshotsTableName}'")
            .Replace("_Commits_", $"_{commitsTableName}_")
            .Replace("_Snapshots_", $"_{snapshotsTableName}_")
            .Replace("PK_Commits", $"PK_{commitsTableName}")
            .Replace("PK_Snapshots", $"PK_{snapshotsTableName}");
        return t;
    }
}
/// <summary>
/// <see cref="ISqlDialect"/> wrapper that forwards every member to an inner dialect
/// and passes each string-valued member through a caller-supplied rewrite function.
/// Rewritten strings are kept in a <see cref="ConcurrentDictionary{TKey,TValue}"/>
/// keyed by the original text, so text already in the cache is not rewritten again.
/// </summary>
public class SqlDialectTransformer : ISqlDialect
{
    private readonly ConcurrentDictionary<string, string> _rewritten = new ConcurrentDictionary<string, string>();
    private readonly ISqlDialect _inner;
    private readonly Func<string, string> _rewrite;

    /// <summary>
    /// Creates a wrapper around <paramref name="dialect"/>.
    /// </summary>
    /// <param name="dialect">The dialect all members are forwarded to.</param>
    /// <param name="transformation">Function applied to each string the dialect exposes.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="dialect"/> or <paramref name="transformation"/> is null.
    /// </exception>
    public SqlDialectTransformer(ISqlDialect dialect, Func<string, string> transformation)
    {
        if (dialect == null)
        {
            throw new ArgumentNullException(nameof(dialect));
        }

        if (transformation == null)
        {
            throw new ArgumentNullException(nameof(transformation));
        }

        _inner = dialect;
        _rewrite = transformation;
    }

    /// <summary>
    /// Returns the rewritten form of <paramref name="text"/>, adding it to the cache
    /// if it is not there yet.
    /// </summary>
    private string Rewrite(string text) => _rewritten.GetOrAdd(text, _rewrite);

    // ---- Members forwarded to the inner dialect unchanged ----

    public object CoalesceParameterValue(object value) => _inner.CoalesceParameterValue(value);

    public IDbTransaction OpenTransaction(IDbConnection connection) => _inner.OpenTransaction(connection);

    public bool IsDuplicate(Exception exception) => _inner.IsDuplicate(exception);

    public void AddPayloadParamater(IConnectionFactory connectionFactory, IDbConnection connection, IDbStatement cmd, byte[] payload)
        => _inner.AddPayloadParamater(connectionFactory, connection, cmd, payload);

    public DateTime ToDateTime(object value) => _inner.ToDateTime(value);

    public IDbStatement BuildStatement(TransactionScope scope, IDbConnection connection, IDbTransaction transaction)
        => _inner.BuildStatement(scope, connection, transaction);

    public bool CanPage { get { return _inner.CanPage; } }

    public NextPageDelegate NextPageDelegate { get { return _inner.NextPageDelegate; } }

    // ---- String members: the inner dialect's value passed through the rewrite ----

    public string InitializeStorage { get { return Rewrite(_inner.InitializeStorage); } }
    public string GetSnapshot { get { return Rewrite(_inner.GetSnapshot); } }
    public string GetCommitsFromStartingRevision { get { return Rewrite(_inner.GetCommitsFromStartingRevision); } }
    public string GetCommitsFromInstant { get { return Rewrite(_inner.GetCommitsFromInstant); } }
    public string GetCommitsFromToInstant { get { return Rewrite(_inner.GetCommitsFromToInstant); } }
    public string PersistCommit { get { return Rewrite(_inner.PersistCommit); } }
    public string GetCommitsFromCheckpoint { get { return Rewrite(_inner.GetCommitsFromCheckpoint); } }
    public string GetCommitsFromBucketAndCheckpoint { get { return Rewrite(_inner.GetCommitsFromBucketAndCheckpoint); } }
    public string GetUndispatchedCommits { get { return Rewrite(_inner.GetUndispatchedCommits); } }
    public string GetStreamsRequiringSnapshots { get { return Rewrite(_inner.GetStreamsRequiringSnapshots); } }
    public string PurgeStorage { get { return Rewrite(_inner.PurgeStorage); } }
    public string PurgeBucket { get { return Rewrite(_inner.PurgeBucket); } }
    public string Drop { get { return Rewrite(_inner.Drop); } }
    public string DeleteStream { get { return Rewrite(_inner.DeleteStream); } }
    public string DuplicateCommit { get { return Rewrite(_inner.DuplicateCommit); } }
    public string AppendSnapshotToCommit { get { return Rewrite(_inner.AppendSnapshotToCommit); } }
    public string MarkCommitAsDispatched { get { return Rewrite(_inner.MarkCommitAsDispatched); } }
    public string BucketId { get { return Rewrite(_inner.BucketId); } }
    public string StreamId { get { return Rewrite(_inner.StreamId); } }
    public string StreamIdOriginal { get { return Rewrite(_inner.StreamIdOriginal); } }
    public string StreamRevision { get { return Rewrite(_inner.StreamRevision); } }
    public string MaxStreamRevision { get { return Rewrite(_inner.MaxStreamRevision); } }
    public string Items { get { return Rewrite(_inner.Items); } }
    public string CommitId { get { return Rewrite(_inner.CommitId); } }
    public string CommitSequence { get { return Rewrite(_inner.CommitSequence); } }
    public string CommitStamp { get { return Rewrite(_inner.CommitStamp); } }
    public string CommitStampStart { get { return Rewrite(_inner.CommitStampStart); } }
    public string CommitStampEnd { get { return Rewrite(_inner.CommitStampEnd); } }
    public string Headers { get { return Rewrite(_inner.Headers); } }
    public string Payload { get { return Rewrite(_inner.Payload); } }
    public string Threshold { get { return Rewrite(_inner.Threshold); } }
    public string Limit { get { return Rewrite(_inner.Limit); } }
    public string Skip { get { return Rewrite(_inner.Skip); } }
    public string CheckpointNumber { get { return Rewrite(_inner.CheckpointNumber); } }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment