Skip to content

Instantly share code, notes, and snippets.

@tnayanam
Created November 21, 2019 20:25
Show Gist options
  • Save tnayanam/ceb1d32c78919c56fbf4a96e0b821121 to your computer and use it in GitHub Desktop.
Save tnayanam/ceb1d32c78919c56fbf4a96e0b821121 to your computer and use it in GitHub Desktop.
namespace SM.BrokerDomain.BLL
{
/// <summary>
/// Performs bulk-writes of logs to the database.
/// Do not use this class directly for logging. Instead, use the FederationLogger singleton for logging.
/// </summary>
public class BulkLogWriter
{
public class BulkLogWriterResult
{
public int LogsWritten { get; set; }
public bool WroteToDatabase { get; set; }
public bool WroteToAWS { get; set; }
public TimeSpan DatabaseWriteTime { get; set; }
public List<Exception> DatabaseWriteExceptions { get; set; }
public TimeSpan AwsWriteTime { get; set; }
public List<Exception> AwsWriteExceptions { get; set; }
}
private class LogTypeInfo
{
public string TableName { get; set; }
public Type DataReaderType { get; set; }
public LogTypeInfo(string tableName, Type dataReaderType) { TableName = tableName; DataReaderType = dataReaderType; }
}
private static readonly Dictionary<Type, LogTypeInfo> logTypeInfos = new Dictionary<Type, LogTypeInfo>() {
{ typeof(FederationAuditLog), new LogTypeInfo("[Log].[FederationAuditLogs]", typeof(FederationAuditLogDataReader)) },
{ typeof(FederationTraceLog), new LogTypeInfo("[Log].[FederationTraceLogs]", typeof(FederationTraceLogDataReader)) },
{ typeof(PublishRestSucessLog), new LogTypeInfo("[Log].[PublishRestSucessLogs]", typeof(PublishRestSuccessLogDataReader)) },
{ typeof(PublishRestErrorLog), new LogTypeInfo("[Log].[PublishRestErrorLogs]", typeof(PublishRestErrorLogDataReader)) },
{ typeof(EcomRequestLog), new LogTypeInfo("[Log].[EcomRequestLogs]", typeof(EcomRequestLogDataReader)) },
{ typeof(SystemLog), new LogTypeInfo("[Broker].[SystemLogs]", typeof(SystemLogDataReader)) }
};
/// <summary>
/// Bulk-writes a set of log entities to the database. Must be a supported log entry type with a mapping in the logTypeInfos dictionary.
/// </summary>
public static async Task<BulkLogWriterResult> LogAsync<TLog>(IEnumerable<TLog> logs, CancellationToken cancellationToken, int batchSize = 1000, int timeoutSeconds = 180)
{
var logTypeInfo = logTypeInfos[typeof(TLog)];
var databaseWriteExceptions = new List<Exception>();
var awsWriteExceptions = new List<Exception>();
Task<TimeSpan> databaseWriteTask = null;
Task<TimeSpan> awsWriteTask = null;
var logCount = logs.Count();
bool wroteToAwsLogs = false;
//Begin task writing to database
var logsDbContext = RepoCache.CreateContextFor<TLog>();
try
{
databaseWriteTask = logsDbContext.BulkInsertAsync(logTypeInfo.TableName, () => (BulkInsertDataReader<TLog>)Activator.CreateInstance(logTypeInfo.DataReaderType, logs), (batchSize < 1) ? logCount : batchSize, timeoutSeconds, cancellationToken);
}
catch (Exception ex)
{
databaseWriteExceptions.Add(ex);
}
//Begin task writing to AWS logs
try
{
var sendLogsToAWS = Config.TryGet(ConfigurationKeys.LoggingSendToAwsEnabled, false);
if (sendLogsToAWS)
{
wroteToAwsLogs = true;
awsWriteTask = SendLogsToCloudWatch(logs);
}
}
catch (Exception ex)
{
awsWriteExceptions.Add(ex);
}
//Await results of database write
if (databaseWriteTask != null)
{
try
{
await databaseWriteTask.ConfigureAwait(false);
}
catch (Exception ex)
{
databaseWriteExceptions.Add(ex);
}
}
//Await results of AWS log write
if (awsWriteTask != null)
{
try
{
await awsWriteTask.ConfigureAwait(false);
}
catch (Exception ex)
{
awsWriteExceptions.Add(ex);
}
}
return new BulkLogWriterResult
{
LogsWritten = logCount,
WroteToDatabase = true,
WroteToAWS = wroteToAwsLogs,
DatabaseWriteTime = (databaseWriteTask?.IsCompleted ?? false) ? databaseWriteTask.Result : TimeSpan.Zero,
DatabaseWriteExceptions = databaseWriteExceptions,
AwsWriteTime = (awsWriteTask?.IsCompleted ?? false) ? awsWriteTask.Result : TimeSpan.Zero,
AwsWriteExceptions = awsWriteExceptions
};
}
#region CloudWatchLogs
private static AmazonCloudWatchLogsClient _logClient;
public static AmazonCloudWatchLogsClient logClient => _logClient ?? (_logClient = new AmazonCloudWatchLogsClient(new ECSTaskCredentials(), new EnvironmentVariableAWSRegion().Region));
private const string LOG_GROUP_NOTFOUND = "The specified log group does not exist.";
private const int CLOUDWATCH_MAX_PAYLOAD_COUNT = 9000; // Actual Limit - 10000
private const int CLOUDWATCH_MAX_PAYLOAD_SIZE = 1048000; // Actual Limit - 1048576
private const int CLOUDWATCH_CONSTRAINT = 26;
private const int CLOUDWATCH_MAX_LOG_SIZE = 261900 - CLOUDWATCH_CONSTRAINT; // Actual Limit - 262144
/// <summary>
/// Holds the settings related to each type of logs (trace, audit, system, publish)
/// </summary>
private class AWSLogSetting
{
public string LogStreamName { get; set; }
public string SequenceToken { get; set; }
public int RetryCount { get; set; }
public Func<object, string> ConversionFunction { get; set; }
}
private static readonly Dictionary<Type, AWSLogSetting> awsLogTypeSettings = new Dictionary<Type, AWSLogSetting>() {
{ typeof(FederationAuditLog), new AWSLogSetting {
LogStreamName = FederationLogger.auditStreamName,
RetryCount = FederationLogger.auditRetryCount ,
SequenceToken = FederationLogger.auditSequenceToken ,
ConversionFunction = (log) => ConvertToAudit((FederationAuditLog)log) } },
{ typeof(FederationTraceLog), new AWSLogSetting {
LogStreamName = FederationLogger.traceStreamName,
RetryCount = FederationLogger.traceRetryCount,
SequenceToken = FederationLogger.traceSequenceToken,
ConversionFunction = (log) => ConvertToTrace((FederationTraceLog)log) } },
{ typeof(SystemLog), new AWSLogSetting {
LogStreamName = SystemLogger.systemStreamName,
RetryCount = SystemLogger.systemRetryCount,
SequenceToken = SystemLogger.systemSequenceToken ,
ConversionFunction = (log) => ConvertToSystem((SystemLog)log) } },
{ typeof(PublishRestSucessLog), new AWSLogSetting {
LogStreamName = PublishingLogger.publishSuccessStreamName,
RetryCount = PublishingLogger.publishSucessRetryCount,
SequenceToken = PublishingLogger.publishSuccessSequenceToken ,
ConversionFunction = (log) => ConvertPubSuccToAudit((PublishRestSucessLog)log) } },
{ typeof(PublishRestErrorLog), new AWSLogSetting {
LogStreamName = PublishingLogger.publishErrorStreamName,
RetryCount = PublishingLogger.publishErrorRetryCount,
SequenceToken = PublishingLogger.publishErrorSequenceToken,
ConversionFunction = (log) => ConvertPubErrToAudit((PublishRestErrorLog)log) } },
};
/// <summary>
/// Takes list of log and sends them in batch to Cloud Watch Logs
/// If the Log Group is not present in the CWL (which is read from web.config), exception will occur
/// If the Log Stream is not present it will be created in CWL.
/// If the Size of logs is > CLOUDWATCH_MAX_PAYLOAD_SIZE, the logs will be dumped
/// If the count of logs in the payload is > CLOUDWATCH_MAX_PAYLOAD_COUNT, old batch will be sent and new batch will be created
/// Every time batch is sent an "Upload Sequence Token" is returned and that is needed for next batch to be sent succesfully
/// If "Upload Sequence Token" doesn't match an exception will be thrown
/// When Resource not found exception is thrown because stream is not found a new stream is created and the batch of log messages are sent to it.
/// </summary>
/// <typeparam name="TLog"></typeparam>
/// <param name="logs"> Trace, Audit, System, Ecom and Publish Logs</param>
/// <param name="logCount"> Number of logs/messages that are currently proccesed</param>
/// <param name="logBatch"> Batch of logs/messages which needs to be resent coz of exception</param>
/// <returns></returns>
public static async Task<TimeSpan> SendLogsToCloudWatch<TLog>(IEnumerable<TLog> logs, int logCount = 0, List<InputLogEvent> logBatch = null)
{
var stopWatch = Stopwatch.StartNew();
if (!logs.Any())
return TimeSpan.Zero;
// read the setting from dictionary
var logTypeSettings = awsLogTypeSettings[typeof(TLog)];
var i = logCount;
if (logBatch == null)
logBatch = new List<InputLogEvent>();
try
{
var logGroup = Environment.GetEnvironmentVariable("LOG_GROUP");
int bufferSize = 0;
// create the stream
if (logTypeSettings.SequenceToken == null)
await logClient.CreateLogStreamAsync(new CreateLogStreamRequest { LogGroupName = logGroup, LogStreamName = logTypeSettings.LogStreamName }).ConfigureAwait(false);
// if there is any pending log batch send it due to exception/retries
if (logBatch.Count() > 0)
{
var response = await SendLogBatchAsync(logBatch, logGroup, logTypeSettings.LogStreamName, logTypeSettings.SequenceToken).ConfigureAwait(false);
logTypeSettings.SequenceToken = response.NextSequenceToken;
logBatch.Clear();
}
for (i = logCount; i < logs.Count(); i++)
{
InputLogEvent logEvent = new InputLogEvent
{
Message = logTypeSettings.ConversionFunction(logs.ElementAt(i)),
Timestamp = DateTime.Now
};
var msgSize = Encoding.UTF8.GetByteCount(logEvent.Message);
// if message size is more than the allowed limit truncate the log
if(msgSize > CLOUDWATCH_MAX_LOG_SIZE)
{
logEvent.Message = logEvent.Message.Substring(0, CLOUDWATCH_MAX_LOG_SIZE);
msgSize = CLOUDWATCH_MAX_LOG_SIZE;
}
// getting actual event payload log size by adding default size appended by Cloud Watch
msgSize += CLOUDWATCH_CONSTRAINT;
bufferSize += msgSize;
if (logBatch.Count < CLOUDWATCH_MAX_PAYLOAD_COUNT && bufferSize <= CLOUDWATCH_MAX_PAYLOAD_SIZE)
logBatch.Add(logEvent);
else
{
if (logBatch.Count > 0)
{
var response = await SendLogBatchAsync(logBatch, logGroup, logTypeSettings.LogStreamName, logTypeSettings.SequenceToken).ConfigureAwait(false);
logTypeSettings.SequenceToken = response.NextSequenceToken;
logBatch.Clear();
}
logBatch.Add(logEvent);
bufferSize = msgSize;
}
}
if (logBatch.Count > 0)
{
var response = await SendLogBatchAsync(logBatch, logGroup, logTypeSettings.LogStreamName, logTypeSettings.SequenceToken).ConfigureAwait(false);
logTypeSettings.SequenceToken = response.NextSequenceToken;
}
}
catch (ResourceNotFoundException ex)
{
// if log group does not exist then just supress, if log stream does not exist - retry
if (ex.Message != LOG_GROUP_NOTFOUND)
{
logTypeSettings.SequenceToken = null;
logTypeSettings.LogStreamName = Guid.NewGuid().ToString();
if (logTypeSettings.RetryCount > 0)
{
logTypeSettings.RetryCount--;
// resend the batch that threw exception
await SendLogsToCloudWatch(logs, i, logBatch).ConfigureAwait(false);
}
logTypeSettings.RetryCount = 2; // reset the retry count and proceed.
}
}
return stopWatch.Elapsed;
}
/// <summary>
/// Helper Method to send the log batch
/// </summary>
/// <param name="logBatch"> One batch containing many logs</param>
/// <param name="logGroup"> Location/Folder in Cloud Watch Logs where logBatch will be sent</param>
/// <param name="logStreamName"> Sub Folder in Cloud Watch Logs whers logBatch will be sent</param>
/// <param name="token"> Sequence Token is required for Cloud Watch Logs to place the logs</param>
/// <returns></returns>
public static Task<PutLogEventsResponse> SendLogBatchAsync(List<InputLogEvent> logBatch, string logGroup, string logStreamName, string token)
{
PutLogEventsRequest req = new PutLogEventsRequest
{
LogEvents = logBatch,
LogGroupName = logGroup,
LogStreamName = logStreamName,
SequenceToken = token
};
return logClient.PutLogEventsAsync(req);
}
public static string ConvertToAudit(FederationAuditLog auditLog)
{
FederationAuditAWSLogDTO audit = new FederationAuditAWSLogDTO
{
AuditID = auditLog.ID,
Domain = auditLog.Domain,
Status = auditLog.Status,
BusinessID = auditLog.RetailerID,
Event = auditLog.Event,
AuthorizedIntegratorID = auditLog.AuthorizedIntegratorID,
Request = auditLog.Request,
Response = auditLog.Response,
Started = auditLog.Started,
Completed = auditLog.Completed
};
return JsonHelper.SerializeObject(audit, true);
}
public static string ConvertToTrace(FederationTraceLog traceLog)
{
FederationTraceAWSLogDTO trace = new FederationTraceAWSLogDTO
{
ID = traceLog.ID,
Domain = traceLog.Domain,
Event = traceLog.Event,
EntityID = traceLog.EntityID,
ScopeObserverID = traceLog.ScopeObserverID,
Duration = traceLog.Duration,
LogLevel = traceLog.LogLevel,
Message = traceLog.Message,
StackTrace = traceLog.StackTrace,
LogType = traceLog.LogType,
TimeOfOccurrence = traceLog.TimeOfOccurrence,
AuditID = traceLog.FederationAuditLogID
};
return JsonHelper.SerializeObject(trace, true);
}
public static string ConvertToSystem(SystemLog sysLog)
{
SystemAWSLogDTO sys = new SystemAWSLogDTO
{
ID = sysLog.ID,
Duration = sysLog.Duration,
LogLevel = sysLog.LogLevel,
Message = sysLog.Message,
StackTrace = sysLog.StackTrace,
LogType = sysLog.LogType,
TimeOfOccurrence = sysLog.TimeOfOccurrence,
Domain = sysLog.Domain
};
return JsonHelper.SerializeObject(sys, true);
}
public static string ConvertPubSuccToAudit(PublishRestSucessLog restSuccessLog)
{
FederationAuditAWSLogDTO restSuccessAudit = new FederationAuditAWSLogDTO
{
AuditID = restSuccessLog.ID,
Domain = restSuccessLog.Domain,
Status = restSuccessLog.StatusCode,
BusinessID = restSuccessLog.RetailerID,
Event = restSuccessLog.Event,
AuthorizedIntegratorID = restSuccessLog.AuthorizedIntegratorID,
Request = JsonHelper.SerializeObject(new
{
Path = restSuccessLog.Path,
Headers = restSuccessLog.Headers,
Url = restSuccessLog.Url,
Payload = restSuccessLog.Payload
}, true),
Response = restSuccessLog.Message,
Started = restSuccessLog.TimeOfOccurence,
Completed = restSuccessLog.TimeOfOccurence
};
return JsonHelper.SerializeObject(restSuccessAudit, true);
}
public static string ConvertPubErrToAudit(PublishRestErrorLog restErrLog)
{
FederationAuditAWSLogDTO restErrAudit = new FederationAuditAWSLogDTO
{
AuditID = restErrLog.ID,
Domain = restErrLog.Domain,
Status = restErrLog.StatusCode,
BusinessID = restErrLog.RetailerID,
Event = restErrLog.Event,
AuthorizedIntegratorID = restErrLog.AuthorizedIntegratorID,
Request = JsonHelper.SerializeObject(new
{
Path = restErrLog.Path,
Headers = restErrLog.Headers,
Url = restErrLog.Url,
Payload = restErrLog.Payload
}, true),
Response = restErrLog.Message,
Started = restErrLog.TimeOfOccurence,
Completed = restErrLog.TimeOfOccurence
};
return JsonHelper.SerializeObject(restErrAudit, true);
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment