Skip to content

Instantly share code, notes, and snippets.

@Kittoes0124
Last active April 3, 2020 21:38
Show Gist options
  • Save Kittoes0124/07ef7c5a7e9382c352434d66b5cf866a to your computer and use it in GitHub Desktop.
Save Kittoes0124/07ef7c5a7e9382c352434d66b5cf866a to your computer and use it in GitHub Desktop.
// define a container for information relevant to each row
/// <summary>
/// Holds the fields of a single row as (name, type, value) tuples, together with
/// the index of the result set the row belongs to.
/// </summary>
/// <remarks>
/// The field array handed to the constructor is stored by reference, not copied,
/// so later changes made to that array by the caller are visible through this row.
/// </remarks>
public sealed class SqlResultSetRow : IEnumerable<(string fieldName, Type fieldType, object fieldValue)>
{
    private readonly (string fieldName, Type fieldType, object fieldValue)[] m_fields;

    /// <summary>Gets the number of fields held by this row.</summary>
    public int FieldCount => m_fields.Length;

    /// <summary>Gets the result-set index that was supplied when the row was created.</summary>
    public int ResultSetIndex { get; }

    /// <summary>Creates a row over the given fields.</summary>
    /// <param name="fields">The (name, type, value) tuple for every field of the row, in field order.</param>
    /// <param name="resultSetIndex">The index of the result set this row belongs to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="fields"/> is <c>null</c>.</exception>
    public SqlResultSetRow((string, Type, object)[] fields, int resultSetIndex) {
        // Fail at construction time instead of with a NullReferenceException on first access.
        m_fields = fields ?? throw new ArgumentNullException(nameof(fields));
        ResultSetIndex = resultSetIndex;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    /// <summary>Enumerates the fields of the row in field order.</summary>
    public IEnumerator<(string fieldName, Type fieldType, object fieldValue)> GetEnumerator() {
        foreach (var field in m_fields) {
            yield return field;
        }
    }

    /// <summary>Gets the name of the field at <paramref name="fieldOffset"/>.</summary>
    /// <param name="fieldOffset">Zero-based position of the field within the row.</param>
    /// <returns>The field name; typed as <see cref="object"/> to keep the existing signature.</returns>
    /// <exception cref="IndexOutOfRangeException"><paramref name="fieldOffset"/> is outside the row.</exception>
    public object GetFieldName(int fieldOffset) => m_fields[fieldOffset].fieldName;

    /// <summary>Gets the type recorded for the field at <paramref name="fieldOffset"/>.</summary>
    /// <param name="fieldOffset">Zero-based position of the field within the row.</param>
    /// <exception cref="IndexOutOfRangeException"><paramref name="fieldOffset"/> is outside the row.</exception>
    public Type GetFieldType(int fieldOffset) => m_fields[fieldOffset].fieldType;

    /// <summary>Gets the value of the field at <paramref name="fieldOffset"/>.</summary>
    /// <param name="fieldOffset">Zero-based position of the field within the row.</param>
    /// <exception cref="IndexOutOfRangeException"><paramref name="fieldOffset"/> is outside the row.</exception>
    public object GetFieldValue(int fieldOffset) => m_fields[fieldOffset].fieldValue;
}
// define a class to hold our generic method(s)
/// <summary>
/// Extension methods for streaming the rows produced by a <see cref="SqlCommand"/>.
/// </summary>
public static class SqlClientExtensions
{
    /// <summary>
    /// Executes <paramref name="command"/> and, for every row of every result set it
    /// returns, invokes <paramref name="rowCallback"/> and yields the callback's result.
    /// </summary>
    /// <typeparam name="T">The type produced by <paramref name="rowCallback"/> for each row.</typeparam>
    /// <param name="command">The command to execute; its connection must already be usable.</param>
    /// <param name="rowCallback">Converts one <see cref="SqlResultSetRow"/> into a <typeparamref name="T"/>.</param>
    /// <param name="commandBehavior">Behavior passed to <c>ExecuteReaderAsync</c>.</param>
    /// <param name="cancellationToken">Token passed to every reader call and to <paramref name="rowCallback"/>.</param>
    /// <returns>A lazily evaluated stream of callback results, in the order rows are read.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="command"/> or <paramref name="rowCallback"/> is <c>null</c>. Because this is an
    /// async iterator, the exception is raised when enumeration starts, not when the method is called.
    /// </exception>
    public static async IAsyncEnumerable<T> ProcessRows<T>(this SqlCommand command, Func<SqlResultSetRow, CancellationToken, ValueTask<T>> rowCallback, CommandBehavior commandBehavior = CommandBehavior.SequentialAccess, [EnumeratorCancellation] CancellationToken cancellationToken = default) {
        if (command is null) {
            throw new ArgumentNullException(nameof(command));
        }
        // Validate before executing so a missing callback never costs a round trip to the server.
        if (rowCallback is null) {
            throw new ArgumentNullException(nameof(rowCallback));
        }

        using (var dataReader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken)) {
            var resultSetIndex = 0;
            do {
                // Column metadata is constant within a result set, so capture it once per set.
                var fieldCount = dataReader.FieldCount;
                var fieldNames = new string[fieldCount];
                var fieldTypes = new Type[fieldCount];
                for (var i = 0; i < fieldCount; ++i) {
                    fieldNames[i] = dataReader.GetName(i);
                    fieldTypes[i] = dataReader.GetFieldType(i);
                }

                while (await dataReader.ReadAsync(cancellationToken)) {
                    // Values are read in ascending ordinal order, which keeps the loop
                    // usable with the default CommandBehavior.SequentialAccess.
                    var fields = new (string, Type, object)[fieldCount];
                    for (var i = 0; i < fieldCount; ++i) {
                        fields[i] = (fieldNames[i], fieldTypes[i], dataReader.GetValue(i));
                    }
                    yield return await rowCallback(new SqlResultSetRow(fields, resultSetIndex), cancellationToken);
                }

                // Rows of the following result set (if there is one) must report the next index;
                // previously the index was never advanced and every row reported 0.
                ++resultSetIndex;
            } while (await dataReader.NextResultAsync(cancellationToken));
        }
    }
}
/// <summary>
/// Sample console program: runs a trivial query and prints each row through
/// <c>ProcessRows</c> using a dynamic row object.
/// </summary>
class Program
{
    // a minimal implementation of a rowCallBack function
    /// <summary>
    /// Copies every field of <paramref name="resultSetRow"/> into a new <see cref="ExpandoObject"/>,
    /// keyed by field name. If two fields share a name, the later one overwrites the earlier.
    /// </summary>
    /// <param name="resultSetRow">The row to convert.</param>
    /// <param name="cancellationToken">Unused; present to match the callback signature.</param>
    /// <returns>An already-completed task holding the populated object.</returns>
    public static ValueTask<ExpandoObject> OnProcessRow(SqlResultSetRow resultSetRow, CancellationToken cancellationToken) {
        var rowValue = new ExpandoObject();
        // ExpandoObject implements its dictionary interface explicitly, hence the typed view.
        IDictionary<string, object> members = rowValue;
        foreach (var field in resultSetRow) {
            members[field.fieldName] = field.fieldValue;
        }
        // The work is synchronous, so wrap the result directly instead of using an async state machine.
        return new ValueTask<ExpandoObject>(rowValue);
    }

    // put everything together
    /// <summary>
    /// Opens a LocalDB connection, executes a sample query and writes column <c>a</c> of each row
    /// to the console. The whole run is cancelled after three minutes.
    /// </summary>
    static async Task Main(string[] args) {
        try {
            var programTimeout = TimeSpan.FromMinutes(3);
            // The source owns a timer for the timeout, so it is disposed along with the other resources.
            using (var cancellationTokenSource = new CancellationTokenSource(programTimeout))
            using (var connection = new SqlConnection(@"Data Source=(localdb)\MSSQLLocalDB;Initial Catalog=master;Integrated Security=True;"))
            using (var command = connection.CreateCommand()) {
                command.CommandText = "select 1 as [a];";
                command.CommandType = CommandType.Text;
                await connection.OpenAsync(cancellationToken: cancellationTokenSource.Token);
                await foreach (dynamic row in command.ProcessRows(rowCallback: OnProcessRow, cancellationToken: cancellationTokenSource.Token)) {
                    Console.WriteLine($"a: {row.a}");
                }
            }
        }
        catch (Exception) {
            // do something with exception here
            throw;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment