Created
February 26, 2019 09:03
-
-
Save riccardone/bf3e7c4b5ffc754ee24c6f10615a074b to your computer and use it in GitHub Desktop.
C# ElasticSearch Synchroniser for CQRS apps
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using System.Timers; | |
using Nest; | |
using TaskZero.ReadModels.Elastic.Model; | |
using Timer = System.Timers.Timer; | |
namespace TaskZero.ReadModels.Elastic | |
{ | |
public interface IIndexer<T> where T : class | |
{ | |
void Index(T doc); | |
void Remove(string id); | |
} | |
public class Indexer<T> : IIndexer<T> where T : class | |
{ | |
private readonly int _interval; | |
private readonly IElasticClient _elasticClient; | |
private readonly string _indexName; | |
private readonly List<T> _toBeAdded = new List<T>(); | |
private readonly List<string> _toBeDeleted = new List<string>(); | |
private Timer _timer; | |
public Indexer(int interval, IElasticClient elasticClient, string indexName) | |
{ | |
_interval = interval; | |
_elasticClient = elasticClient; | |
_indexName = indexName; | |
Init(); | |
} | |
private void Init() | |
{ | |
if (!_elasticClient.IndexExists(_indexName).Exists) | |
{ | |
_elasticClient.CreateIndex(_indexName, c => c | |
.InitializeUsing(GetIndexConfig()) | |
.Mappings(m => m.Map<T>(mp => mp.AutoMap())) | |
); | |
} | |
_timer = new Timer(_interval); | |
_timer.Elapsed += Flush; | |
_timer.AutoReset = true; | |
_timer.Enabled = true; | |
_timer.Start(); | |
} | |
private IndexState GetIndexConfig() | |
{ | |
var settings = new IndexSettings(); | |
var indexConfig = new IndexState | |
{ | |
Settings = settings | |
}; | |
return indexConfig; | |
} | |
public void Index(T doc) | |
{ | |
_toBeAdded.Add(doc); | |
} | |
public void Remove(string id) | |
{ | |
_toBeDeleted.Add(id); | |
} | |
private void Flush(object source, ElapsedEventArgs ea) | |
{ | |
if (_toBeAdded.Any()) | |
{ | |
var docs = _toBeAdded.ToList(); | |
_toBeAdded.Clear(); | |
var waitHandle = new CountdownEvent(1); | |
var bulkAll = _elasticClient.BulkAll(docs, b => b | |
.Index(_indexName) /* index */ | |
.Type<ZeroTask>() | |
.BackOffRetries(2) | |
.BackOffTime("30s") | |
.RefreshOnCompleted(true) | |
.MaxDegreeOfParallelism(4) | |
.Size(1000) | |
); | |
bulkAll.Subscribe(new BulkAllObserver( | |
onNext: (b) => { Console.Write("."); }, | |
onError: (e) => throw e, | |
onCompleted: () => waitHandle.Signal() | |
)); | |
waitHandle.Wait(); | |
} | |
if (_toBeDeleted.Any()) | |
{ | |
var toBeDeleted = _toBeDeleted.ToList(); | |
_toBeDeleted.Clear(); | |
foreach (var id in toBeDeleted) | |
_elasticClient.Delete<T>(id, d => d.Index(_indexName)); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment