Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
using Google.Protobuf;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Notifier.Common;
using Notifier.Common.Settings;
using Notifier.WebApi.Common.Data;
using Notifier.WebApi.Common.Data.Entities;
using Notifier.WebApi.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Notifier.WebApi.Services
{
public class NotificationService : INotificationService
{
private readonly IRepository<NotificationEntity> _repository;
private readonly ILogger<NotificationService> _logger;
private readonly EventHubSettings _eventHubSettings;
public NotificationService(
IRepository<NotificationEntity> repository,
ILogger<NotificationService> logger,
IOptions<EventHubSettings> eventHubSettings)
{
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_eventHubSettings = eventHubSettings.Value ?? throw new ArgumentNullException(nameof(eventHubSettings.Value));
}
public async Task CreateAndSendAsync(string message, CancellationToken cancellationToken)
{
NotificationEntity entity = await CreateNotification(message, cancellationToken);
await SendMessage(entity);
}
public Task<ICollection<NotificationModel>> GetAsync(CancellationToken cancellationToken)
{
var entities = _repository.GetAll(cancellationToken);
if (entities.Any())
_logger.LogInformation("Notifications found and will be returned.");
else
_logger.LogInformation("No notifications found.");
return Task.FromResult(GetMappedNotifications(entities));
}
private async Task<NotificationEntity> CreateNotification(string message, CancellationToken cancellationToken)
{
var now = DateTime.Now;
var entity = new NotificationEntity
{
Message = message,
PartitionKey = now.ToString("yyyy-MM-dd"),
RowKey = now.ToString("hh:mm:ss.fffffff"),
Timestamp = now,
};
await _repository.AddAsync(entity, cancellationToken);
_logger.LogInformation($"Notification with the partition key: {entity.PartitionKey} and row key: {entity.RowKey} has been succesfully created.");
return entity;
}
private async Task SendMessage(NotificationEntity entity)
{
var eventHubClient = EventHubClient.CreateFromConnectionString(_eventHubSettings.ConnectionString);
var notificationMessage = new NotificationMessage
{
DateTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(entity.Timestamp.UtcDateTime),
Message = entity.Message,
};
byte[] messageBinary = MessageExtensions.ToByteArray(notificationMessage);
await eventHubClient.SendAsync(new EventData(new ArraySegment<byte>(messageBinary)));
_logger.LogInformation($"Notification with the partition key: {entity.PartitionKey} and row key: {entity.RowKey} has been succesfully sended to event hubs.");
}
private ICollection<NotificationModel> GetMappedNotifications(IList<NotificationEntity> entities)
{
var result = new List<NotificationModel>();
foreach (var entity in entities)
{
result.Add(new NotificationModel
{
DateTime = entity.Timestamp.DateTime,
Message = entity.Message,
});
}
return result;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment