using Google.Protobuf;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Notifier.Common;
using Notifier.Common.Settings;
using Notifier.WebApi.Common.Data;
using Notifier.WebApi.Common.Data.Entities;
using Notifier.WebApi.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Notifier.WebApi.Services
{
    /// <summary>
    /// Stores notifications through the injected repository and publishes them
    /// to an event hub as protobuf-encoded <see cref="NotificationMessage"/> payloads.
    /// </summary>
    public class NotificationService : INotificationService
    {
        private readonly IRepository<NotificationEntity> _repository;
        private readonly ILogger<NotificationService> _logger;
        private readonly EventHubSettings _eventHubSettings;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="repository">Repository used to store and read notification entities.</param>
        /// <param name="logger">Logger for informational messages.</param>
        /// <param name="eventHubSettings">Options wrapper holding the event hub connection string.</param>
        /// <exception cref="ArgumentNullException">
        /// Any argument is null, or <paramref name="eventHubSettings"/> carries a null value.
        /// </exception>
        public NotificationService(
            IRepository<NotificationEntity> repository,
            ILogger<NotificationService> logger,
            IOptions<EventHubSettings> eventHubSettings)
        {
            _repository = repository ?? throw new ArgumentNullException(nameof(repository));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));

            // "?." guards against a null options wrapper, which previously surfaced as a NullReferenceException.
            _eventHubSettings = eventHubSettings?.Value ?? throw new ArgumentNullException(nameof(eventHubSettings));
        }

        /// <summary>
        /// Persists a new notification and then sends it to the event hub.
        /// </summary>
        /// <param name="message">Notification text.</param>
        /// <param name="cancellationToken">Token forwarded to the repository when storing the entity.</param>
        /// <returns>A task that completes once the notification has been stored and sent.</returns>
        public async Task CreateAndSendAsync(string message, CancellationToken cancellationToken)
        {
            NotificationEntity entity = await CreateNotification(message, cancellationToken);
            await SendMessage(entity);
        }

        /// <summary>
        /// Reads all notifications from the repository and maps them to API models.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the repository.</param>
        /// <returns>A completed task holding the mapped notifications (empty when none exist).</returns>
        public Task<ICollection<NotificationModel>> GetAsync(CancellationToken cancellationToken)
        {
            var entities = _repository.GetAll(cancellationToken);

            // The two messages are mutually exclusive; only one of them is logged per call.
            if (entities.Any())
            {
                _logger.LogInformation("Notifications found and will be returned.");
            }
            else
            {
                _logger.LogInformation("No notifications found.");
            }

            return Task.FromResult(GetMappedNotifications(entities));
        }

        /// <summary>
        /// Builds a notification entity keyed by the current date and time and stores it.
        /// </summary>
        /// <param name="message">Notification text.</param>
        /// <param name="cancellationToken">Token forwarded to the repository.</param>
        /// <returns>The stored entity.</returns>
        private async Task<NotificationEntity> CreateNotification(string message, CancellationToken cancellationToken)
        {
            // NOTE(review): local time decides which date partition a notification lands in;
            // confirm whether UTC is expected before changing this.
            var now = DateTime.Now;

            // Keys must not vary with the current culture's calendar or separators.
            var invariant = System.Globalization.CultureInfo.InvariantCulture;

            var entity = new NotificationEntity
            {
                Message = message,
                PartitionKey = now.ToString("yyyy-MM-dd", invariant),
                // "HH" (24-hour clock): "hh" would give morning and afternoon times the same row key.
                RowKey = now.ToString("HH:mm:ss.fffffff", invariant),
                Timestamp = now,
            };

            await _repository.AddAsync(entity, cancellationToken);

            // Message template instead of interpolation: same rendered text, structured properties.
            _logger.LogInformation(
                "Notification with the partition key: {PartitionKey} and row key: {RowKey} has been succesfully created.",
                entity.PartitionKey,
                entity.RowKey);

            return entity;
        }

        /// <summary>
        /// Serializes the entity as a <see cref="NotificationMessage"/> and sends it to the event hub.
        /// </summary>
        /// <param name="entity">Stored notification to publish.</param>
        /// <returns>A task that completes once the message has been sent and the client closed.</returns>
        private async Task SendMessage(NotificationEntity entity)
        {
            var eventHubClient = EventHubClient.CreateFromConnectionString(_eventHubSettings.ConnectionString);
            try
            {
                var notificationMessage = new NotificationMessage
                {
                    DateTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(entity.Timestamp.UtcDateTime),
                    Message = entity.Message,
                };

                byte[] messageBinary = MessageExtensions.ToByteArray(notificationMessage);
                await eventHubClient.SendAsync(new EventData(new ArraySegment<byte>(messageBinary)));

                _logger.LogInformation(
                    "Notification with the partition key: {PartitionKey} and row key: {RowKey} has been succesfully sended to event hubs.",
                    entity.PartitionKey,
                    entity.RowKey);
            }
            finally
            {
                // A client is created per call, so it must be closed here or its connection leaks.
                await eventHubClient.CloseAsync();
            }
        }

        /// <summary>
        /// Projects stored entities to API models, preserving their order.
        /// </summary>
        /// <param name="entities">Entities to map.</param>
        /// <returns>A new list with one model per entity.</returns>
        private ICollection<NotificationModel> GetMappedNotifications(IList<NotificationEntity> entities)
        {
            return entities
                .Select(entity => new NotificationModel
                {
                    DateTime = entity.Timestamp.DateTime,
                    Message = entity.Message,
                })
                .ToList();
        }
    }
}
