Last active
April 3, 2020 21:38
-
-
Save Kittoes0124/07ef7c5a7e9382c352434d66b5cf866a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// define a container for information relevant to each row
/// <summary>
/// Holds one row of a result set as an ordered list of (name, type, value) triples,
/// together with the index of the result set the row was read from.
/// </summary>
public sealed class SqlResultSetRow : IEnumerable<(string fieldName, Type fieldType, object fieldValue)>
{
    private readonly (string fieldName, Type fieldType, object fieldValue)[] m_fields;

    /// <summary>Index of the result set this row belongs to, as passed to the constructor.</summary>
    public int ResultSetIndex { get; }

    /// <summary>Number of fields held by this row.</summary>
    public int FieldCount => m_fields.Length;

    /// <summary>Creates a row over the supplied fields.</summary>
    /// <param name="fields">One (name, type, value) triple per field. The array is stored as-is; it is not copied.</param>
    /// <param name="resultSetIndex">Index of the result set the row came from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="fields"/> is <c>null</c>.</exception>
    public SqlResultSetRow((string, Type, object)[] fields, int resultSetIndex) {
        // Fail here rather than with a NullReferenceException on the first enumeration or lookup.
        m_fields = fields ?? throw new ArgumentNullException(nameof(fields));
        ResultSetIndex = resultSetIndex;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    /// <summary>Enumerates the fields in the order they were supplied to the constructor.</summary>
    public IEnumerator<(string fieldName, Type fieldType, object fieldValue)> GetEnumerator() {
        foreach (var field in m_fields) {
            yield return field;
        }
    }

    /// <summary>Returns the name of the field at <paramref name="fieldOffset"/>.</summary>
    /// <remarks>The return type stays <see cref="object"/> so existing callers keep compiling; the value is always the stored string.</remarks>
    /// <exception cref="IndexOutOfRangeException"><paramref name="fieldOffset"/> is outside the field array.</exception>
    public object GetFieldName(int fieldOffset) => m_fields[fieldOffset].fieldName;

    /// <summary>Returns the type recorded for the field at <paramref name="fieldOffset"/>.</summary>
    /// <exception cref="IndexOutOfRangeException"><paramref name="fieldOffset"/> is outside the field array.</exception>
    public Type GetFieldType(int fieldOffset) => m_fields[fieldOffset].fieldType;

    /// <summary>Returns the value of the field at <paramref name="fieldOffset"/>.</summary>
    /// <exception cref="IndexOutOfRangeException"><paramref name="fieldOffset"/> is outside the field array.</exception>
    public object GetFieldValue(int fieldOffset) => m_fields[fieldOffset].fieldValue;
}
// define a class to hold our generic method(s)
/// <summary>Extension methods for streaming the rows produced by a <c>SqlCommand</c>.</summary>
public static class SqlClientExtensions
{
    /// <summary>
    /// Executes <paramref name="command"/> and, for every row of every result set it returns,
    /// builds a <see cref="SqlResultSetRow"/> and yields the awaited result of <paramref name="rowCallback"/>.
    /// </summary>
    /// <typeparam name="T">Type produced by the callback for each row.</typeparam>
    /// <param name="command">Command to execute. Nothing is executed until the returned sequence is enumerated.</param>
    /// <param name="rowCallback">Invoked once per row with the row and the effective cancellation token.</param>
    /// <param name="commandBehavior">Passed through to <c>ExecuteReaderAsync</c>.</param>
    /// <param name="cancellationToken">Token passed to the reader operations and to the callback.</param>
    /// <returns>A lazily evaluated stream of callback results, in the order rows are read.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> or <paramref name="rowCallback"/> is <c>null</c>.</exception>
    public static IAsyncEnumerable<T> ProcessRows<T>(this SqlCommand command, Func<SqlResultSetRow, CancellationToken, ValueTask<T>> rowCallback, CommandBehavior commandBehavior = CommandBehavior.SequentialAccess, CancellationToken cancellationToken = default) {
        // Validate here, outside the async iterator, so bad arguments fail at the call site
        // instead of at the first MoveNextAsync (or, for the callback, only after the query has run).
        if (command is null) {
            throw new ArgumentNullException(nameof(command));
        }
        if (rowCallback is null) {
            throw new ArgumentNullException(nameof(rowCallback));
        }

        return ProcessRowsCore(command, rowCallback, commandBehavior, cancellationToken);
    }

    // Async-iterator body of ProcessRows. [EnumeratorCancellation] lives here so a token supplied
    // through WithCancellation(...) is still combined with the one passed as an argument.
    private static async IAsyncEnumerable<T> ProcessRowsCore<T>(SqlCommand command, Func<SqlResultSetRow, CancellationToken, ValueTask<T>> rowCallback, CommandBehavior commandBehavior, [EnumeratorCancellation] CancellationToken cancellationToken) {
        using (var dataReader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken)) {
            var resultSetIndex = 0;

            do {
                // Column names and types are read once per result set and shared by all of its rows.
                var fieldCount = dataReader.FieldCount;
                var fieldNames = new string[fieldCount];
                var fieldTypes = new Type[fieldCount];

                for (var i = 0; (i < fieldCount); ++i) {
                    fieldNames[i] = dataReader.GetName(i);
                    fieldTypes[i] = dataReader.GetFieldType(i);
                }

                while (await dataReader.ReadAsync(cancellationToken)) {
                    // A fresh array per row: each SqlResultSetRow keeps a reference to the array it is given.
                    var fields = new (string, Type, object)[fieldCount];

                    // Values are read in ascending ordinal order.
                    for (var i = 0; (i < fieldCount); ++i) {
                        fields[i] = (fieldNames[i], fieldTypes[i], dataReader.GetValue(i));
                    }

                    yield return await rowCallback(new SqlResultSetRow(fields, resultSetIndex), cancellationToken);
                }
            } while (await dataReader.NextResultAsync(cancellationToken));
        }
    }
}
// Sample console program: runs a query and prints one column of every row it returns.
class Program
{
    // a minimal implementation of a rowCallBack function: copies every field of the row into an
    // ExpandoObject keyed by field name (a later field with the same name overwrites an earlier one)
    public static ValueTask<ExpandoObject> OnProcessRow(SqlResultSetRow resultSetRow, CancellationToken cancellationToken) {
        var rowValue = new ExpandoObject();
        // ExpandoObject exposes its members through IDictionary<string, object>; use that view to fill it.
        IDictionary<string, object> rowMembers = rowValue;

        foreach (var field in resultSetRow) {
            rowMembers[field.fieldName] = field.fieldValue;
        }

        // Nothing here is asynchronous, so hand back an already-completed ValueTask
        // instead of paying for an async state machine.
        return new ValueTask<ExpandoObject>(rowValue);
    }

    // put everything together
    static async Task Main(string[] args) {
        try {
            var programTimeout = TimeSpan.FromMinutes(3);

            // Dispose the source so the timer backing the timeout is released when we are done.
            using (var cancellationTokenSource = new CancellationTokenSource(programTimeout))
            using (var connection = new SqlConnection(@"Data Source=(localdb)\MSSQLLocalDB;Initial Catalog=master;Integrated Security=True;"))
            using (var command = connection.CreateCommand()) {
                var cancellationToken = cancellationTokenSource.Token;

                command.CommandText = "select 1 as [a];";
                command.CommandType = CommandType.Text;

                await connection.OpenAsync(cancellationToken: cancellationToken);

                // Each row arrives as an ExpandoObject; `dynamic` lets us read the column by name.
                await foreach (dynamic row in command.ProcessRows(rowCallback: OnProcessRow, cancellationToken: cancellationToken)) {
                    Console.WriteLine($"a: {row.a}");
                }
            }
        }
        catch (Exception) {
            // do something with exception here
            throw;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment