Skip to content

Instantly share code, notes, and snippets.

@karlosRivera
Forked from janderit/DatabaseObserver.cs
Created October 29, 2020 18:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save karlosRivera/c9aa53dcee51eef4e5ef38c9abc01260 to your computer and use it in GitHub Desktop.
Save karlosRivera/c9aa53dcee51eef4e5ef38c9abc01260 to your computer and use it in GitHub Desktop.
C# class to observe SQL Server databases via Service Broker / SqlDependency
/// <summary>
/// Watches the result of a SQL query through <see cref="SqlDependency"/> and reports
/// inserts, updates and deletes to a callback.
/// </summary>
/// <remarks>
/// The observer re-registers a fresh <see cref="SqlDependency"/> after every notification,
/// because a query notification subscription fires only once. All state changes are
/// serialized through a private lock; the user callbacks are invoked outside of that lock.
/// </remarks>
public sealed class DatabaseObserver : IDisposable
{
    private readonly string _connectionstring;
    private readonly string _sqlcommand;
    private readonly Action<Observation> _onChange;
    private readonly Action<Exception> _onError;
    private readonly Action<string> _onWarning;

    private readonly object _lock = new object();

    private SqlConnection _connection;
    private SqlCommand _cmd;
    private SqlDependency _dependency;

    // True between a successful Start() and the matching Stop().
    private bool _started = false;

    // True while a command with an attached SqlDependency is registered.
    private bool _primed = false;

    /// <summary>Kind of data change reported to the <c>onChange</c> callback.</summary>
    public enum Observation
    {
        INSERT,
        UPDATE,
        DELETE
    }

    /// <summary>Creates an observer; nothing is opened until <see cref="Start"/> is called.</summary>
    /// <param name="connectionstring">Connection string used for the connection and for SqlDependency.Start/Stop.</param>
    /// <param name="sqlcommand">Query text whose result is observed.</param>
    /// <param name="onChange">Invoked for insert, update and delete notifications.</param>
    /// <param name="onError">Invoked with any exception raised while handling a notification.</param>
    /// <param name="onWarning">Optional; invoked with the name of any other notification info value.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    public DatabaseObserver(string connectionstring, string sqlcommand, Action<Observation> onChange, Action<Exception> onError, Action<string> onWarning = null)
    {
        if (connectionstring == null) throw new ArgumentNullException(nameof(connectionstring));
        if (sqlcommand == null) throw new ArgumentNullException(nameof(sqlcommand));
        if (onChange == null) throw new ArgumentNullException(nameof(onChange));
        if (onError == null) throw new ArgumentNullException(nameof(onError));

        _connectionstring = connectionstring;
        _sqlcommand = sqlcommand;
        _onChange = onChange;
        _onError = onError;
        _onWarning = onWarning ?? (warn => { });
    }

    /// <summary>
    /// Starts the SqlDependency listener, opens the connection and registers the first
    /// notification. Calling it while already started is a no-op.
    /// </summary>
    /// <remarks>
    /// If any step fails, everything acquired so far is released again and the exception
    /// is rethrown, so <see cref="Start"/> can be retried.
    /// </remarks>
    public void Start()
    {
        lock (_lock)
        {
            // A second Start() would otherwise overwrite (and leak) the open connection.
            if (_started)
            {
                return;
            }

            SqlDependency.Start(_connectionstring);
            _started = true;

            try
            {
                _connection = new SqlConnection(_connectionstring);
                _connection.Open();
                Prime();
            }
            catch
            {
                // Roll back: dispose the connection and stop the listener.
                Stop();
                throw;
            }
        }
    }

    /// <summary>
    /// Creates the command, attaches a new dependency and executes the command so the
    /// notification gets registered. Does nothing when stopped or already primed.
    /// </summary>
    private void Prime()
    {
        lock (_lock)
        {
            if (!_started || _primed)
            {
                return;
            }

            try
            {
                _cmd = new SqlCommand(_sqlcommand)
                {
                    Connection = _connection,
                    Notification = null
                };
                _dependency = new SqlDependency(_cmd);
                _dependency.OnChange += Handle;

                // Kept as a blocking wait on the async call so that callers observe the
                // same exception types as before.
                _cmd.ExecuteNonQueryAsync().Wait();

                // Without this flag Unprime() would never release the command/handler.
                _primed = true;
            }
            catch
            {
                // Do not leave a half-registered command or handler behind.
                Unprime();
                throw;
            }
        }
    }

    /// <summary>
    /// Notification callback: forwards the event to the user callbacks, then replaces the
    /// spent dependency with a fresh one. Exceptions are routed to the error callback.
    /// </summary>
    private void Handle(object sender, SqlNotificationEventArgs e)
    {
        try
        {
            switch (e.Info)
            {
                case SqlNotificationInfo.Alter:
                case SqlNotificationInfo.AlreadyChanged:
                case SqlNotificationInfo.Drop:
                case SqlNotificationInfo.Error:
                case SqlNotificationInfo.Expired:
                case SqlNotificationInfo.Invalid:
                case SqlNotificationInfo.Isolation:
                case SqlNotificationInfo.Merge:
                case SqlNotificationInfo.Options:
                case SqlNotificationInfo.PreviousFire:
                case SqlNotificationInfo.Query:
                case SqlNotificationInfo.Resource:
                case SqlNotificationInfo.Restart:
                case SqlNotificationInfo.TemplateLimit:
                case SqlNotificationInfo.Truncate:
                case SqlNotificationInfo.Unknown:
                    _onWarning(e.Info.ToString());
                    break;
                case SqlNotificationInfo.Insert:
                    _onChange(Observation.INSERT);
                    break;
                case SqlNotificationInfo.Update:
                    _onChange(Observation.UPDATE);
                    break;
                case SqlNotificationInfo.Delete:
                    _onChange(Observation.DELETE);
                    break;
            }
        }
        catch (Exception ex)
        {
            _onError(ex);
        }

        // Re-register in a separate step: a throwing user callback must not prevent the
        // next subscription, otherwise observation would silently end.
        try
        {
            lock (_lock)
            {
                Unprime();
                Prime();
            }
        }
        catch (Exception ex)
        {
            _onError(ex);
        }
    }

    /// <summary>Detaches the handler, disposes the current command and clears the primed flag.</summary>
    private void Unprime()
    {
        lock (_lock)
        {
            if (_dependency != null)
            {
                _dependency.OnChange -= Handle;
                _dependency = null;
            }

            if (_cmd != null)
            {
                _cmd.Dispose();
                _cmd = null;
            }

            _primed = false;
        }
    }

    /// <summary>
    /// Releases the current registration, closes the connection and stops the
    /// SqlDependency listener. Calling it while not started is a no-op.
    /// </summary>
    public void Stop()
    {
        lock (_lock)
        {
            if (!_started)
            {
                return;
            }

            _started = false;

            try
            {
                Unprime();

                // _connection can be null when Start() failed before creating it.
                if (_connection != null)
                {
                    _connection.Close();
                    _connection.Dispose();
                    _connection = null;
                }
            }
            finally
            {
                // Always balance the SqlDependency.Start call made in Start().
                SqlDependency.Stop(_connectionstring);
            }
        }
    }

    /// <summary>Stops the observer; equivalent to <see cref="Stop"/>.</summary>
    public void Dispose()
    {
        Stop();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment