Skip to content

Instantly share code, notes, and snippets.

@dosper7
Created July 5, 2023 17:41
Show Gist options
  • Save dosper7/8cd144729a89daab346ace386f3a381a to your computer and use it in GitHub Desktop.
Save dosper7/8cd144729a89daab346ace386f3a381a to your computer and use it in GitHub Desktop.
PayoutBusiness async/await refactor
using Confluent.Kafka;
using LinqKit;
using Medallion.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Runtime.CompilerServices;
using Techops.Shared.Helpers.Currency;
using Techops.Shared.Helpers.Dates;
using Techops.Shared.Helpers.Exceptions;
using Techops.Shared.Kafka.Interfaces;
using Techops.Way4.Exporter.Business.Services.Interfaces;
using Techops.Way4.Exporter.Data.Entity;
using Techops.Way4.Exporter.Data.Repositories.Interfaces;
using Techops.Way4.Exporter.Models.Exceptions;
using Techops.Way4.Exporter.Models.Generic;
using Techops.Way4.Exporter.Models.Job;
using Techops.Way4.Exporter.Models.Payouts;
using AMGModels = AMG.Client;
namespace Techops.Way4.Exporter.Business.Services
{
public class PayoutBusiness : IPayoutBusiness
{
private readonly ILogger<PayoutBusiness> _logger;
private readonly IAMGService _amgService;
private readonly IKafkaProducer<string, Payout> _kafkaProducer;
private readonly IPayoutRepository _repository;
private readonly IOptionsMonitor<AppSettingsModel> _appSettings;
private readonly IDistributedLock _distributedLock;
public PayoutBusiness(
ILogger<PayoutBusiness> logger,
IOptionsMonitor<AppSettingsModel> options,
IAMGService amgService,
IKafkaProducer<string, Payout> kafkaProducer,
IPayoutRepository repository,
IDistributedLock distributedLock)
{
_amgService = amgService;
_logger = logger;
_appSettings = options;
_kafkaProducer = kafkaProducer;
_repository = repository;
_distributedLock = distributedLock;
}
public async Task<PayoutProcessorJobStats> ProcessPayoutsAsync(DateTime? date = null)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
if (date == null)
{
stopwatch.Stop();
throw new ValidationException("Date cannot be null");
}
var successfulPayouts = 0;
var failedPayouts = 0;
var payoutEntities = await _repository.GetAsync(
x => x.State == PayoutState.TO_PROCESS
&& x.IssuedAt == date.Value.ToString("yyyy-MM-dd"));
var payoutToDoCount = payoutEntities.Count();
if (!payoutEntities.Any())
{
stopwatch.Stop();
_logger.Log(LogLevel.Debug, "No payouts found to be processed with date: {date}", date.Value.PrettyDate());
return new PayoutProcessorJobStats(payoutToDoCount,
successfulPayouts,
failedPayouts,
(int)stopwatch.ElapsedMilliseconds / 1000,
nameof(JobType.PayoutExporter),
payoutToDoCount <= 0 ? "Success" : "Failure");
}
foreach (var payout in payoutEntities)
{
var temp = new Payout
{
Institution = new PayoutInstitution { Name = "SaltPay-Way4" },
PayoutId = payout.Id.ToString(),
CardAcceptorId = payout.CardAcceptorId,
StoreId = payout.StoreId.ToString(),
IssuedAt = payout.IssuedAt,
Amounts = new Amount
{
PayoutCurrency = payout.Currency,
PayoutAmount = payout.Amount,
GrossTransactionAmount = payout.GrossTransactionAmount,
TotalFeesAmount = payout.TotalDeductionsAmount
}
};
var kafkaMessageKey = $"payouts_{payout.CardAcceptorId}";
var persistenceStatus = await this.SafeExecuteKafkaProducer(_kafkaProducer.ProduceAsync(_appSettings.CurrentValue.Kafka.PayoutsTopic, kafkaMessageKey, temp));
_logger.Log(LogLevel.Debug,
"Message {KafkaMessageKey} has been sent with result: {PersistenceStatus}", kafkaMessageKey,
persistenceStatus);
switch (persistenceStatus)
{
case PersistenceStatus.Persisted:
case PersistenceStatus.PossiblyPersisted:
payout.State = PayoutState.PROCESSED;
successfulPayouts++;
break;
case PersistenceStatus.NotPersisted:
default:
payout.State = PayoutState.FAILED;
failedPayouts++;
break;
}
}
await _repository.BulkUpdateAsync(payoutEntities);
stopwatch.Stop();
_logger.Log(LogLevel.Information, "Sent successfully {messages} payouts to kafka", successfulPayouts);
return new PayoutProcessorJobStats(payoutToDoCount,
successfulPayouts,
failedPayouts,
(int)stopwatch.ElapsedMilliseconds / 1000,
nameof(JobType.PayoutExporter),
payoutToDoCount <= 0 || (successfulPayouts / payoutToDoCount) == 1 ? "Success" : "Failure");
}
public async Task<int> PopulatePayoutsAsync(DateTime? date = null)
{
if (date == null)
throw new ValidationException("Date cannot be null");
// Only run this if it hasn't for the requested date
if (await _repository.Exists(x => x.IssuedAt == date.Value.ToString("yyyy-MM-dd")))
return default;
var results = await _amgService.FetchPayoutReportData((DateTime)date);
if (results == null || !results.Any())
{
_logger.Log(LogLevel.Information, "No payout data on Date: {Date}", date.ToString());
return default;
}
_logger.Log(LogLevel.Debug, "Processing {payout_count} payouts", results.Count());
var payoutEntities = new List<PayoutEntity>();
foreach (var payout in results)
{
if (payout.Status.Equals(PayoutStatus.UNPAID.ToString()))
continue;
var payoutEntity = new PayoutEntity
{
Id = payout.Id,
Currency = payout.Currency,
CardAcceptorId = payout.Mid,
CompanyId = string.IsNullOrWhiteSpace(payout.Gmd_company_id) ? Guid.Empty : new Guid(payout.Gmd_company_id),
StoreId = string.IsNullOrWhiteSpace(payout.Gmd_store_id) ? Guid.Empty : new Guid(payout.Gmd_store_id),
StartedAt = payout.Started_at.DateTime,
EndedAt = payout.Ended_at.DateTime.AddDays(1).AddSeconds(-1),
IssuedAt = payout.Issued_at.ToString("yyyy-MM-dd"),
State = PayoutState.TO_PROCESS,
CreatedBy = "System",
ExportToPayments = payout.Export_to_payments
};
payoutEntity.State = this.FormatAmountValues(ref payoutEntity, payout, date.Value)
? payoutEntity.State
: PayoutState.UNPROCESSABLE_CURRENCY;
var payoutStatus = this.ParsePayoutStatus(payout, date.Value);
payoutEntity.Status = payoutStatus;
if (payoutStatus == PayoutStatus.UNKNOWN
&& payoutEntity.State != PayoutState.UNPROCESSABLE_CURRENCY)
{
payoutEntity.State = PayoutState.UNPROCESSABLE_STATUS;
}
payoutEntities.Add(payoutEntity);
}
await _repository.AddAsync(payoutEntities);
return payoutEntities.Count;
}
public async Task<int> ForcePopulatePayoutsAsync(DateTime? date = null)
{
if (date == null)
throw new ValidationException("Date cannot be null");
// var results = await _amgService.FetchPayoutData((DateTime)date);
var results = await _amgService.FetchPayoutReportData((DateTime)date);
if (results == null || results.Count() == 0)
{
_logger.Log(LogLevel.Information, "No payout data on Date: {Date}", date.ToString());
return default;
}
var payoutData = (await _repository.GetAsync(x => x.IssuedAt == date.Value.ToString("yyyy-MM-dd"))).ToList();
if (payoutData.Any())
{
results = results.Where(x => !payoutData.Any(y => y.Id == x.Id)).ToList();
if (!results.Any(y => y.Status.Equals(PayoutStatus.PAID.ToString())))
{
_logger.Log(LogLevel.Information, "No payout data on Date: {Date} left to be processed", date.ToString());
return 0;
}
}
_logger.Log(LogLevel.Debug, "Processing {payout_count} payouts", results.Count());
var payoutEntities = new List<PayoutEntity>();
foreach (var payout in results)
{
if (payout.Status.Equals(PayoutStatus.UNPAID.ToString()))
continue;
var payoutEntity = new PayoutEntity
{
Id = payout.Id,
Currency = payout.Currency,
CardAcceptorId = payout.Mid,
CompanyId = string.IsNullOrWhiteSpace(payout.Gmd_company_id) ? Guid.Empty : new Guid(payout.Gmd_company_id),
StoreId = string.IsNullOrWhiteSpace(payout.Gmd_store_id) ? Guid.Empty : new Guid(payout.Gmd_store_id),
StartedAt = payout.Started_at.DateTime,
EndedAt = payout.Ended_at.DateTime.AddDays(1).AddSeconds(-1),
IssuedAt = payout.Issued_at.ToString("yyyy-MM-dd"),
State = PayoutState.TO_PROCESS,
CreatedBy = "System",
ExportToPayments = payout.Export_to_payments
};
payoutEntity.State = this.FormatAmountValues(ref payoutEntity, payout, date.Value)
? payoutEntity.State
: PayoutState.UNPROCESSABLE_CURRENCY;
var payoutStatus = this.ParsePayoutStatus(payout, date.Value);
payoutEntity.Status = payoutStatus;
if (payoutStatus == PayoutStatus.UNKNOWN
&& payoutEntity.State != PayoutState.UNPROCESSABLE_CURRENCY)
{
payoutEntity.State = PayoutState.UNPROCESSABLE_STATUS;
}
payoutEntities.Add(payoutEntity);
}
await _repository.AddAsync(payoutEntities);
return payoutEntities.Count;
}
public async Task<PayoutEntity> GetPayoutByIdAsync(Guid id)
{
if (id == Guid.Empty)
throw new ArgumentException("Id cannot be empty");
var payout = await _repository.GetAsync(id);
if (payout == null)
throw new ArgumentException("There is no payout with given id");
return payout;
}
public async Task<PayoutEntity> UpdatePayoutByIdAsync(Guid id, PayoutUpdate updatedPayout)
{
if (id == Guid.Empty)
throw new ArgumentException("Id cannot be empty");
var payout = await _repository.GetAsync(id);
if (payout == null)
throw new ArgumentException("There is no payout with given id");
bool isUpdated = false;
if (updatedPayout.State.HasValue)
(isUpdated, payout.State) = (true, (PayoutState)updatedPayout.State);
if (updatedPayout.Active.HasValue)
(isUpdated, payout.Active) = (true, (bool)updatedPayout.Active);
if (!isUpdated)
return payout;
bool updated = await _repository.UpdateAsync(payout);
if (!updated)
throw new Exception("Problem saving changes to database");
return payout;
}
public async Task<bool> PurgeAsync(DateTime createdBefore, CancellationToken cancellationToken)
{
return await _repository.PurgeAsync(createdBefore, cancellationToken);
}
public async Task<(IEnumerable<PayoutEntity>, int)> GetPayoutsAsync(string q, PayoutState? state, bool? active, bool? export, int? offset, int? limit)
{
offset ??= 0;
limit ??= 100;
var filter = GenerateFilter(q, state, active, export);
var payouts = await _repository.GetAsync(filter, (int)offset, (int)limit);
var count = await _repository.GetCount(filter);
return (payouts, count);
}
public async Task<IJobStats> ProcessPayoutsInBatchAsync(DateTime? date = null, int batchSize = -1)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
if (date == null)
{
stopwatch.Stop();
throw new ValidationException("Date cannot be null");
}
var successfulPayouts = 0;
var failedPayouts = 0;
IEnumerable<PayoutEntity> payoutEntities;
int payoutToDoCount;
await using (await _distributedLock.AcquireAsync())
{
payoutEntities = await _repository.GetPayoutsToExportInBatch(date.Value, batchSize);
payoutToDoCount = payoutEntities.Count();
if (!payoutEntities.Any())
{
stopwatch.Stop();
_logger.Log(LogLevel.Debug, "No payouts found to be processed with date: {date}", date.Value.PrettyDate());
return new JobReportStats(
(int)stopwatch.ElapsedMilliseconds / 1000,
nameof(JobType.PayoutExporter),
payoutToDoCount <= 0 || (successfulPayouts / payoutToDoCount) == 1 ? nameof(JobStatus.Successful) : nameof(JobStatus.Failed),
payoutToDoCount,
successfulPayouts,
failedPayouts);
}
Parallel.ForEach(payoutEntities, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (entity) => entity.State = PayoutState.PROCESSING);
await _repository.BulkUpdateAsync(payoutEntities);
}
// Necessary variables for the update phase
var slim = new SemaphoreSlim(1, 1);
var processedPayouts = new List<PayoutEntity>(payoutEntities.Count());
const int updateBatchCount = 20;
ConcurrentBag<Task> tasks = new();
var payoutTopic = _appSettings.CurrentValue.Kafka.PayoutsTopic;
await Parallel.ForEachAsync(payoutEntities, new ParallelOptions() { MaxDegreeOfParallelism = 2 },
(entity, cancellationToken) =>
{
tasks.Add(Task.Run(async () =>
{
var temp = new Payout
{
Institution = new PayoutInstitution { Name = "SaltPay-Way4" },
PayoutId = entity.Id.ToString(),
CardAcceptorId = entity.CardAcceptorId,
StoreId = entity.StoreId.ToString(),
IssuedAt = entity.IssuedAt,
Amounts = new Amount
{
PayoutCurrency = entity.Currency,
PayoutAmount = entity.Amount,
GrossTransactionAmount = entity.GrossTransactionAmount,
TotalFeesAmount = entity.TotalDeductionsAmount
}
};
var kafkaMessageKey = $"payouts_{entity.CardAcceptorId}";
var persistenceStatus = await this.SafeExecuteKafkaProducer(_kafkaProducer.ProduceAsync(payoutTopic, kafkaMessageKey, temp));
_logger.Log(LogLevel.Debug, "Message {KafkaMessageKey} has been sent with result: {PersistenceStatus}", kafkaMessageKey, persistenceStatus);
switch (persistenceStatus)
{
case PersistenceStatus.Persisted:
case PersistenceStatus.PossiblyPersisted:
entity.State = PayoutState.PROCESSED;
Interlocked.Increment(ref successfulPayouts);
break;
// PersistenceStatus.NotPersisted
default:
entity.State = PayoutState.FAILED;
Interlocked.Increment(ref failedPayouts);
break;
}
try
{
await slim.WaitAsync(cancellationToken);
processedPayouts.Add(entity);
if (processedPayouts.Count >= updateBatchCount)
{
await _repository.BulkUpdateAsync(processedPayouts);
processedPayouts.Clear();
}
}
finally
{
slim.Release();
}
}, cancellationToken));
return ValueTask.CompletedTask;
});
await Task.WhenAll(tasks);
if (processedPayouts.Count > 0)
await _repository.BulkUpdateAsync(processedPayouts);
stopwatch.Stop();
_logger.Log(LogLevel.Information, "Sent successfully {messages} payouts to kafka", successfulPayouts);
return new JobReportStats(
(int)stopwatch.ElapsedMilliseconds / 1000,
nameof(JobType.PayoutExporter),
payoutToDoCount <= 0 || (successfulPayouts / payoutToDoCount) == 1 ? nameof(JobStatus.Successful) : nameof(JobStatus.Failed),
payoutToDoCount,
successfulPayouts,
failedPayouts);
}
#region private methods
private Expression<Func<PayoutEntity, bool>> GenerateFilter(string q, PayoutState? state, bool? active, bool? export)
{
var predicate = PredicateBuilder.New<PayoutEntity>(true);
if (!string.IsNullOrEmpty(q))
{
q = q.Trim();
predicate = predicate
.Or(x => x.Id.ToString() == q)
.Or(x => x.CompanyId.ToString() == q)
.Or(x => x.StoreId.ToString() == q)
.Or(x => x.EntityId.ToString() == q)
.Or(x => x.CardAcceptorId == q)
.Or(x => x.IssuedAt == q)
.Or(x => x.Currency == q);
}
if (state.HasValue)
predicate = predicate.And(x => x.State == state);
if (active.HasValue)
predicate = predicate.And(x => x.Active == active);
if (export.HasValue)
predicate = predicate.And(x => x.ExportToPayments == export);
return predicate;
}
private async Task<PersistenceStatus> SafeExecuteKafkaProducer(Task<PersistenceStatus> functionToExecute, [CallerMemberName] string nameOfFunction = "")
{
try
{
return await functionToExecute;
}
catch (Exception ex)
{
_logger!.Log(LogLevel.Warning, ex, "There was an error executing the function: {nameOfFunction}.", nameOfFunction);
return PersistenceStatus.NotPersisted;
}
}
private bool FormatAmountValues(ref PayoutEntity entity, AMGModels.PayoutReportResponse payoutReportResponse, DateTime date)
{
var sucess = true;
try
{
entity.Amount = payoutReportResponse.Payout_amount.FormatCurrencyAmount(payoutReportResponse.Currency);
}
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException)
{
entity.Amount = 0;
sucess = false;
_logger.Log(LogLevel.Error, ex, "Failed to convert payment amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency);
}
try
{
entity.GrossTransactionAmount = payoutReportResponse.Gross_transaction_amount.FormatCurrencyAmount(payoutReportResponse.Currency);
}
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException)
{
entity.GrossTransactionAmount = 0;
sucess = false;
_logger.Log(LogLevel.Error, ex, "Failed to convert gross transaction amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency);
}
try
{
entity.TotalDeductionsAmount = payoutReportResponse.Total_fees_amount.FormatCurrencyAmount(payoutReportResponse.Currency);
}
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException)
{
entity.TotalDeductionsAmount = 0;
sucess = false;
_logger.Log(LogLevel.Error, ex, "Failed to convert total deductions amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency);
}
try
{
entity.SettleAmount = payoutReportResponse.Settle_amount.FormatCurrencyAmount(payoutReportResponse.Currency);
}
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException)
{
entity.SettleAmount = 0;
sucess = false;
_logger.Log(LogLevel.Error, ex, "Failed to convert settle amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency);
}
try
{
entity.BeginBalance = payoutReportResponse.Begin_balance.FormatCurrencyAmount(payoutReportResponse.Currency);
}
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException)
{
entity.BeginBalance = 0;
sucess = false;
_logger.Log(LogLevel.Error, ex, "Failed to convert begin balance! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency);
}
return sucess;
}
private PayoutStatus ParsePayoutStatus(AMGModels.PayoutReportResponse payoutReportResponse, DateTime date)
{
PayoutStatus status = PayoutStatus.UNKNOWN;
try
{
status = (PayoutStatus)Enum.Parse(typeof(PayoutStatus), payoutReportResponse.Status);
}
catch (Exception ex)
{
_logger.Log(LogLevel.Error, ex, "Failed to set payout status! Date = {Date}, PayoutId = {PayoutId}, Status = {Status}", date, payoutReportResponse.Id, payoutReportResponse.Status);
}
return status;
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment