Skip to content

Instantly share code, notes, and snippets.

@adbrowne
Last active August 29, 2015 14:23
Show Gist options
  • Save adbrowne/152b8ecec9300133d62b to your computer and use it in GitHub Desktop.
Save adbrowne/152b8ecec9300133d62b to your computer and use it in GitHub Desktop.
Bad idea to rely on finding changes using AutoIncrement columns or DateTimes
CREATE DATABASE [OrderedInsert]
GO
ALTER DATABASE [OrderedInsert] SET ALLOW_SNAPSHOT_ISOLATION ON
GO
USE [OrderedInsert]
GO
CREATE TABLE [dbo].[Test](
[Created] [datetime2](7) NOT NULL DEFAULT (getdate()),
[AutoId] [int] IDENTITY(1,1) NOT NULL,
[RowId] [int] NOT NULL
) ON [PRIMARY]
GO
Found missing value(s)
Last run
TestTableRow AutoId 667018, Created 635699088421330000, RowId 3
This run
TestTableRow AutoId 667017, Created 635699088421330000, RowId 1
TestTableRow AutoId 667018, Created 635699088421330000, RowId 3
Missed
TestTableRow AutoId 667017, Created 635699088421330000, RowId 1
Increasing the constanct: MaxTransactionSleepMilliseconds finds problems faster. Even with this
constant set to zero (ie transaction is simply insert then commit with no Thread.Sleep you will
find missing values within a minute)
public class TestOrdering
{
private readonly ITestOutputHelper _output;
private readonly Random _rnd = new Random();
private const string ConnStr =
"Data Source=(local);Initial Catalog=OrderedInsert;Integrated Security=True;";
private const int ConcurrentInserts = 5;
private const int MaxTransactionSleepMilliseconds = 1000;
private int _lastRowId;
public TestOrdering(ITestOutputHelper output)
{
_output = output;
}
[Fact]
public void FindChangesUsingDateTime()
{
Func<TestTableRow, long> sortBy = x => x.Created.Ticks;
RunUntilMissingChangeFound(sortBy);
}
[Fact]
public void FindChangesUsingAutoIncriment()
{
Func<TestTableRow, long> sortBy = x => x.AutoId;
RunUntilMissingChangeFound(sortBy);
}
private void RunUntilMissingChangeFound(Func<TestTableRow, long> sortBy)
{
using (var conn = new SqlConnection(ConnStr))
{
conn.Open();
ClearTestTable(conn);
var cancellationTokenSource = new CancellationTokenSource();
Task.Run(() => CheckValues(conn, cancellationTokenSource, sortBy));
var insertTasks =
Enumerable.Range(1, ConcurrentInserts)
.Select(entry0 => Task.Run(() => InsertRows(cancellationTokenSource.Token))).ToArray();
Task.WaitAll(insertTasks);
}
}
private async Task CheckValues(SqlConnection conn,
CancellationTokenSource cancellationTokenSource, Func<TestTableRow, long> sortBy)
{
var lastValues = new List<TestTableRow>();
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
var currentValues = GetTableRows(conn);
var valuesMissing = CheckForMissingValues(lastValues, currentValues, sortBy);
if (valuesMissing)
{
cancellationTokenSource.Cancel();
}
lastValues = currentValues;
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}
private static void ClearTestTable(SqlConnection conn)
{
using (var deleteCmd = conn.CreateCommand())
{
deleteCmd.CommandText = "DELETE FROM dbo.Test";
deleteCmd.ExecuteNonQuery();
}
}
private bool CheckForMissingValues(List<TestTableRow> lastValues, List<TestTableRow> currentValues, Func<TestTableRow, long> sortBy)
{
var lastRowIds = new HashSet<int>(lastValues.Select(x => x.RowId));
var valuesMissing = false;
if (lastValues.Count > 0)
{
var maxLast = lastValues.Select(sortBy).Max();
var newValues = currentValues.Where(x => !lastRowIds.Contains(x.RowId));
var missingValues = newValues.Where(x => sortBy(x) < maxLast).ToList();
if (missingValues.Any())
{
_output.WriteLine("Found missing value(s)");
_output.WriteLine("Last run");
lastValues.OrderBy(sortBy).ToList().ForEach(x => _output.WriteLine(x.ToString()));
_output.WriteLine("This run");
currentValues.OrderBy(sortBy).ToList().ForEach(x => _output.WriteLine(x.ToString()));
// these values would have been missed had we queried for values inserted since the last
// maximum Created/AutoId value
_output.WriteLine("Missed");
missingValues.OrderBy(sortBy).ToList().ForEach(x => _output.WriteLine(x.ToString()));
valuesMissing = true;
}
}
return valuesMissing;
}
private static List<TestTableRow> GetTableRows(SqlConnection conn)
{
var values = new List<TestTableRow>();
using (var tx = conn.BeginTransaction(IsolationLevel.Snapshot))
{
using (var queryCmd = conn.CreateCommand())
{
queryCmd.Transaction = tx;
queryCmd.CommandText = "SELECT Created, RowId, AutoId FROM dbo.Test";
using (var reader = queryCmd.ExecuteReader())
{
while (reader.Read())
{
var date = reader.GetDateTime(0);
var rowId = reader.GetInt32(1);
var autoId = reader.GetInt32(2);
values.Add(new TestTableRow
{
Created = date,
RowId = rowId,
AutoId = autoId
});
}
}
}
}
return values;
}
private async Task InsertRows(CancellationToken cancellation)
{
using (var conn = new SqlConnection(ConnStr))
{
conn.Open();
while (!cancellation.IsCancellationRequested)
{
var newRowId = Interlocked.Increment(ref _lastRowId);
using (var tx = conn.BeginTransaction(IsolationLevel.Snapshot))
{
using (var cmd = conn.CreateCommand())
{
cmd.Transaction = tx;
cmd.CommandText = "INSERT INTO dbo.Test(RowId) VALUES (@RowId)";
cmd.Parameters.AddWithValue("@RowId", newRowId);
cmd.ExecuteNonQuery();
if (MaxTransactionSleepMilliseconds > 0)
{
await
Task.Delay(
TimeSpan.FromMilliseconds(_rnd.NextDouble() * MaxTransactionSleepMilliseconds));
}
tx.Commit();
}
}
}
}
}
}
class TestTableRow
{
public DateTime Created { get; set; }
public int RowId { get; set; }
public int AutoId { get; set; }
public override string ToString()
{
return
string.Format(
"TestTableRow AutoId {0}, Created {1}, RowId {2}",
AutoId,
Created.Ticks,
RowId);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment