Skip to content

Instantly share code, notes, and snippets.

@vector623
Created July 12, 2018 13:13
Show Gist options
  • Save vector623/6b9d960fb7aa2b5f7d696bfc4709da98 to your computer and use it in GitHub Desktop.
Save vector623/6b9d960fb7aa2b5f7d696bfc4709da98 to your computer and use it in GitHub Desktop.
Example Serilog usage in a typical ETL job
/// <summary>
/// Copies new and changed transactions from the local database into the remote
/// <c>public.transactions</c> table and reports what happened as a JSON document.
/// </summary>
/// <remarks>
/// Counters and bulk-operation results are collected on a dynamic <see cref="ExpandoObject"/>
/// as the run progresses. That object is returned to the caller as indented JSON and is also
/// written to the log as a structured event in the <c>finally</c> block, so the log entry is
/// emitted on both the success and the failure path.
/// </remarks>
/// <returns>
/// An <c>application/json</c> content result holding the collected log data, with
/// <c>Result</c> set to <c>"success"</c>, or to <c>"failure"</c> when an exception was caught.
/// </returns>
public IActionResult WarehouseTransactions()
{
    // Everything worth reporting about this run is attached to this object.
    dynamic logs = new ExpandoObject();
    try
    {
        using (var localDb = LocalDB.GetConnection())
        using (var remoteDb = RemoteDB.GetConnection())
        {
            // Download ids + last-modified timestamps for source and destination.
            // The SQL literal's continuation lines are intentionally flush-left so the
            // query text sent to the server stays exactly as it was.
            var sourceTransactionState = localDb
                .Query<Transaction>(
                    @"SELECT
TRANSACTION_ID,
LAST_MODIFIED_DATE
FROM dbo.TRANSACTIONS
WHERE TRANDATE >= '2018-01-01'
ORDER BY CREATE_DATE DESC",
                    commandTimeout: 2400)
                .ToList();
            // NOTE(review): this cutoff duplicates the date hard-coded in the SQL above;
            // keep the two in sync.
            var destinationTransactionState = remoteDb
                .GetTransactionsState<Transaction>(DateTime.Parse("2018-01-01"))
                .ToList();

            // Ids present in the source but not in the destination are new.
            // The id lists are sorted because BinarySearch is used on them below.
            var newTransactionIds = sourceTransactionState
                .Except(destinationTransactionState, new Transaction.IdComparer())
                .Select(t => t.transaction_id)
                .OrderBy(t => t)
                .ToList();
            logs.newTransactionIdsCount = newTransactionIds.Count;

            // Ids present on both sides, minus the rows StatusComparer treats as equal.
            // NOTE(review): presumably StatusComparer compares id plus last-modified so
            // that unchanged rows drop out here - confirm against Transaction.StatusComparer.
            var updatedTransactionIds = sourceTransactionState
                .Intersect(destinationTransactionState, new Transaction.IdComparer())
                .Except(destinationTransactionState, new Transaction.StatusComparer())
                .Select(t => t.transaction_id)
                .OrderBy(t => t)
                .ToList();
            logs.updatedTransactionIdsCount = updatedTransactionIds.Count;

            // Fetch the full new + updated transactions from the source.
            var transactionIds = newTransactionIds
                .Concat(updatedTransactionIds)
                .ToList();
            var transactions = localDb
                .GetTransactionsByIds(transactionIds)
                .ToList();
            logs.transactionsFetchedFromLocalDB = transactions.Count;

            // Convert transactions from the local to the remote type.
            var transactionsMapped = transactions
                .Select(MapTransaction)
                .OrderBy(t => t, new Transaction.IdComparer())
                .ToList();
            logs.transactionsMapped = transactionsMapped.Count;

            // Separate new transactions from updated ones.
            var newTransactions = transactionsMapped
                .Where(t => newTransactionIds.BinarySearch(t.transaction_id) >= 0)
                .ToList();
            logs.newTransactionsCount = newTransactions.Count;
            var updatedTransactions = transactionsMapped
                .Where(t => updatedTransactionIds.BinarySearch(t.transaction_id) >= 0)
                .ToList();
            logs.updatedTransactionsCount = updatedTransactions.Count;

            // Insert new transactions.
            logs.insertResults = remoteDb.BulkInsert(
                newTransactions,
                "public.transactions");

            // Update existing transactions. Stored under its own key so it does not
            // overwrite the insert result recorded just above.
            logs.updateResults = remoteDb.BulkUpdate(
                updatedTransactions,
                "public.transactions");

            // Return logged data to caller.
            logs.Result = "success";
            return Content(
                JsonConvert.SerializeObject((ExpandoObject)logs, Formatting.Indented),
                "application/json");
        }
    }
    catch (Exception e)
    {
        // Log the exception and still return whatever was collected before the failure.
        Logger.LogError(e, "problem warehousing transactions");
        logs.Result = "failure";
        return Content(
            JsonConvert.SerializeObject((ExpandoObject)logs, Formatting.Indented),
            "application/json");
    }
    finally
    {
        // Record the collected data as one structured log event ("@" asks the logger
        // to destructure the object rather than call ToString on it).
        Logger.LogInformation("{@log}", (ExpandoObject)logs);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment