Skip to content

Instantly share code, notes, and snippets.

@DamianEdwards
Last active December 15, 2015 03:19
Show Gist options
  • Save DamianEdwards/5193779 to your computer and use it in GitHub Desktop.
Save DamianEdwards/5193779 to your computer and use it in GitHub Desktop.
Demonstrates the behavior of SqlDataReader when inserts are happening during the Read() loop. Even with ReadUncommitted isolation for the consumer, the only records returned are those that existed when ExecuteReader() was called. UPDATE: Seems with big enough numbers the behavior can change, presumably due to how SQL decides to do locking (row, …
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
namespace DataReaderBehavior
{
/// <summary>
/// Console demo that runs a publisher thread inserting rows into [Sample].[dbo].[S]
/// while a consumer thread reads the same table through a <see cref="SqlDataReader"/>,
/// so the rows the reader returns can be compared against the rows being inserted.
/// </summary>
class Program
{
    private static readonly string _connectionString = @"Data Source=(local);Initial Catalog=master;Integrated Security=SSPI;";

    /// <summary>
    /// Prepares the sample database, starts the publisher and consumer threads, and
    /// stops them cooperatively once the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Install();

        using (var shutdown = new CancellationTokenSource())
        {
            var token = shutdown.Token;
            var publisher = new Thread(() => Publisher(IsolationLevel.Serializable, token));
            var consumer = new Thread(() => Consumer(IsolationLevel.Serializable, token));
            publisher.Start();
            consumer.Start();

            Console.ReadLine();

            // Signal the workers instead of calling Thread.Abort(), which is obsolete
            // and not supported on .NET Core / .NET 5+. Joining before the token source
            // is disposed guarantees neither worker touches a disposed token.
            shutdown.Cancel();
            publisher.Join();
            consumer.Join();
        }
    }

    /// <summary>
    /// Inserts up to 9000 rows into [Sample].[dbo].[S], each in its own committed
    /// transaction, pausing briefly between inserts.
    /// </summary>
    /// <param name="isolationLevel">Isolation level used for every insert transaction.</param>
    /// <param name="cancellationToken">Checked before each insert; when signalled the loop stops early.</param>
    private static void Publisher(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken))
    {
        using (var connection = new SqlConnection(_connectionString))
        {
            connection.Open();
            for (var i = 0; i < 9000; i++)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    Console.WriteLine("+ Publisher: stopped before inserting " + i.ToString());
                    return;
                }

                // The command is disposed before the transaction that owns it.
                using (var transaction = connection.BeginTransaction(isolationLevel))
                using (var command = new SqlCommand("INSERT INTO [Sample].[dbo].[S] (Message) VALUES (@Message)", connection, transaction))
                {
                    // Parameter name matches the @Message placeholder in the SQL text.
                    command.Parameters.AddWithValue("@Message", i.ToString());
                    command.ExecuteNonQuery();
                    transaction.Commit();
                }

                Console.WriteLine("+ Publisher: inserted " + i.ToString());
                Thread.Sleep(1);
            }
            Console.WriteLine("+ Publisher: finished inserting");
        }
    }

    /// <summary>
    /// Waits about eight seconds, then reads every row of [Sample].[dbo].[S] inside a
    /// single transaction, printing each message with a short pause per row so the
    /// read overlaps with the publisher's inserts.
    /// </summary>
    /// <param name="isolationLevel">Isolation level used for the read transaction.</param>
    /// <param name="cancellationToken">Ends the initial wait and the read loop early when signalled.</param>
    private static void Consumer(IsolationLevel isolationLevel, CancellationToken cancellationToken = default(CancellationToken))
    {
        using (var connection = new SqlConnection(_connectionString))
        {
            connection.Open();

            // Give the publisher an 8-second head start so the table already holds rows
            // when the query runs. WaitOne returns true only if cancellation was
            // signalled during the wait.
            if (cancellationToken.WaitHandle.WaitOne(8000))
            {
                return;
            }

            // The transaction is never committed; disposing it rolls it back, which is
            // harmless for a read-only query.
            using (var transaction = connection.BeginTransaction(isolationLevel))
            using (var command = new SqlCommand("SELECT * FROM [Sample].[dbo].[S]", connection, transaction))
            using (var reader = command.ExecuteReader())
            {
                // Look the column up by name rather than relying on SELECT * ordering.
                var messageOrdinal = reader.GetOrdinal("Message");

                while (!cancellationToken.IsCancellationRequested && reader.Read())
                {
                    Console.WriteLine("- Consumer: read row " + reader.GetString(messageOrdinal));
                    Thread.Sleep(10);
                }

                if (cancellationToken.IsCancellationRequested)
                {
                    // Cancel the command so closing the reader does not have to
                    // process the rest of the result set.
                    command.Cancel();
                    return;
                }
            }
            Console.WriteLine("- Consumer: finished reading");
        }
    }

    /// <summary>
    /// Creates the [Sample] database and the [dbo].[S] table when they do not exist;
    /// when the table already exists, deletes all of its rows instead.
    /// </summary>
    private static void Install()
    {
        using (var connection = new SqlConnection(_connectionString))
        {
            connection.Open();

            using (var createDatabase = new SqlCommand(
@"IF NOT EXISTS(SELECT [database_id] FROM [sys].[databases] WHERE [name] = 'Sample')
CREATE DATABASE [Sample];
", connection))
            {
                createDatabase.ExecuteNonQuery();
            }

            using (var createOrClearTable = new SqlCommand(
@"Use [Sample];
IF NOT EXISTS(SELECT [object_id] FROM [sys].[tables] WHERE [name] = 'S')
BEGIN
CREATE TABLE [Sample].[dbo].[S] (
[MessageId] [int] IDENTITY(1,1) NOT NULL,
[Message] [nvarchar](256) NOT NULL,
CONSTRAINT [PK_S] PRIMARY KEY CLUSTERED ([MessageId] ASC));
END
ELSE
DELETE FROM [Sample].[dbo].[S];", connection))
            {
                createOrClearTable.ExecuteNonQuery();
            }
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment