Skip to content

Instantly share code, notes, and snippets.

@rikkit
Created December 21, 2015 12:04
Show Gist options
  • Save rikkit/93ca50a3c20ed6610de6 to your computer and use it in GitHub Desktop.
Save rikkit/93ca50a3c20ed6610de6 to your computer and use it in GitHub Desktop.
Bulk Nest Publisher
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Nest;
using TheFilter.Core.Entity.Customers.Managers;
/// <summary>
/// Publishes bulk ES operations in batches.
/// Callers queue single operations through <see cref="CreateAsync{T}"/>, <see cref="IndexAsync{T}"/>,
/// <see cref="UpdateAsync{T,U}"/> and <see cref="DeleteAsync{T}"/>; queued operations are buffered and sent
/// to Elasticsearch in one bulk request, after which each caller receives the response item for its own operation.
/// </summary>
public class NestBulkPublisher : INestBulkPublisher
{
    /// <summary>
    /// A queued bulk operation plus the state used to hand its outcome back to the caller waiting in BulkAsync.
    /// </summary>
    internal class NestBulkOperation
    {
        /// <summary>Delegate that appends this operation to the shared bulk descriptor.</summary>
        public Func<BulkDescriptor, BulkDescriptor> Operation { get; set; }

        /// <summary>Response item for this operation; set by the publisher before the waiter is released.</summary>
        public BulkOperationResponseItem Response { get; set; }

        /// <summary>Cancelled by the publisher to wake the waiting caller once an outcome is available.</summary>
        public CancellationTokenSource Cts { get; set; }

        /// <summary>Failure recorded by the publisher when the bulk request produced no response items.</summary>
        public Exception Error { get; set; }
    }

    private readonly Subject<NestBulkOperation> _bulkSubject;
    private readonly Subject<System.Reactive.Unit> _flushSubject;

    // Upper bound on how long a caller waits for its operation to be published.
    private readonly TimeSpan _maxTaskWait;

    // Publish frequency of the buffer (a TimeSpan, despite the "Ms" suffix).
    private readonly TimeSpan _bufferTimeoutMs;

    private readonly bool _shouldRefresh;

    /// <summary>
    /// Raised after a bulk request has been sent; the argument is the number of operations in that batch.
    /// </summary>
    public event EventHandler<int> Published;

    /// <summary>The client used to send bulk requests.</summary>
    public IElasticClient Client { get; }

    /// <summary>
    /// Creates a publisher from a configuration object (bulk size, publish frequency and tolerance).
    /// </summary>
    public NestBulkPublisher(IElasticClient nest, BulkPublisherConfig config)
        : this(nest, config.BulkSize, config.PublishFrequency, config.Tolerance)
    {
    }

    /// <summary>
    /// Creates a publisher.
    /// </summary>
    /// <param name="nest">Client used to send bulk requests.</param>
    /// <param name="bufferLimit">Number of queued operations that triggers a publish.</param>
    /// <param name="publishFrequency">Time span after which the buffer is published.</param>
    /// <param name="tolerance">Maximum time a caller waits for the outcome of its operation.</param>
    /// <param name="refresh">Value passed to <c>Refresh</c> on every bulk descriptor.</param>
    public NestBulkPublisher(IElasticClient nest, int bufferLimit, TimeSpan publishFrequency, TimeSpan tolerance, bool refresh = true)
    {
        Client = nest;
        _bulkSubject = new Subject<NestBulkOperation>();
        _flushSubject = new Subject<System.Reactive.Unit>();
        _bufferTimeoutMs = publishFrequency;
        _maxTaskWait = tolerance;
        _shouldRefresh = refresh;

        // Publish items after x ms, y items, or when FlushAsync() is called
        var bufferTrigger = _bulkSubject
            .Buffer(publishFrequency, bufferLimit)
            .Select(op => System.Reactive.Unit.Default)
            .Merge(_flushSubject);

        _bulkSubject
            .Buffer(() => bufferTrigger)
            .Subscribe(PublishBuffer);
    }

    /// <summary>Queues a bulk create operation and completes once its outcome is known.</summary>
    public Task<Result<BulkCreateResponseItem>> CreateAsync<T>(Func<BulkCreateDescriptor<T>, BulkCreateDescriptor<T>> operation)
        where T : class
    {
        return BulkAsync<BulkCreateResponseItem>(bulk => bulk.Create(operation));
    }

    /// <summary>Queues a bulk index operation and completes once its outcome is known.</summary>
    public Task<Result<BulkIndexResponseItem>> IndexAsync<T>(Func<BulkIndexDescriptor<T>, BulkIndexDescriptor<T>> operation)
        where T : class
    {
        return BulkAsync<BulkIndexResponseItem>(bulk => bulk.Index(operation));
    }

    /// <summary>Queues a bulk update operation and completes once its outcome is known.</summary>
    public Task<Result<BulkUpdateResponseItem>> UpdateAsync<T, U>(Func<BulkUpdateDescriptor<T, U>, BulkUpdateDescriptor<T, U>> operation)
        where T : class
        where U : class
    {
        return BulkAsync<BulkUpdateResponseItem>(bulk => bulk.Update(operation));
    }

    /// <summary>Queues a bulk delete operation and completes once its outcome is known.</summary>
    public Task<Result<BulkDeleteResponseItem>> DeleteAsync<T>(Func<BulkDeleteDescriptor<T>, BulkDeleteDescriptor<T>> operation)
        where T : class
    {
        return BulkAsync<BulkDeleteResponseItem>(bulk => bulk.Delete(operation));
    }

    /// <summary>
    /// Add the bulk operation to the queue, and return when done.
    /// </summary>
    /// <returns>
    /// A value result holding the response item when the item is valid; otherwise an exception result holding
    /// the recorded failure, the item's error, or a <see cref="TaskCanceledException"/> when no outcome was
    /// available within the tolerance.
    /// </returns>
    private async Task<Result<T>> BulkAsync<T>(Func<BulkDescriptor, BulkDescriptor> operation) where T : BulkOperationResponseItem
    {
        var cts = new CancellationTokenSource();
        var bulkOp = new NestBulkOperation
        {
            Operation = operation,
            Cts = cts
        };

        // Publish the operation, this will be picked up by the subscribing observable.
        _bulkSubject.OnNext(bulkOp);

        // This isn't great.
        // Delay on this thread for longer than it will take the buffer to be published. Expect a cancellation.
        try
        {
            await Task.Delay(_maxTaskWait, cts.Token).ConfigureAwait(false);
        }
        catch (TaskCanceledException)
        {
            // Expected: the publisher cancels the token to signal that an outcome is available.
        }

        // No response means either the wait timed out or the publisher released us after a failed request.
        // The null check also protects the dereference of Response below.
        if (!cts.IsCancellationRequested || bulkOp.Response == null)
        {
            return Result<T>.SetException(bulkOp.Error ?? new TaskCanceledException("The bulk request was not sent within the expected time limit."));
        }

        return !bulkOp.Response.IsValid
            ? Result<T>.SetException(new ApplicationException(bulkOp.Response.Error)) // TODO throw different kinds of exception based on actual error
            : Result<T>.SetValue((T) bulkOp.Response);
    }

    /// <summary>
    /// Sends every operation in the buffer as one bulk request and releases the callers waiting on them.
    /// </summary>
    private void PublishBuffer(IList<NestBulkOperation> currentBuffer)
    {
        if (currentBuffer.Count == 0)
        {
            return;
        }

        IBulkResponse bulkResponse;
        try
        {
            var bulkDescriptor = new BulkDescriptor();
            foreach (var bulkOperation in currentBuffer)
            {
                bulkOperation.Operation(bulkDescriptor);
            }

            bulkDescriptor.Refresh(_shouldRefresh);
            bulkResponse = Client.Bulk(bulkDescriptor);
        }
        catch (Exception ex)
        {
            // An exception escaping this subscriber callback would end the subscription and silently
            // stop all future publishing, so report the failure to the waiting callers instead.
            FailOperations(currentBuffer, ex);
            return;
        }

        Published?.Invoke(this, currentBuffer.Count);

        if (bulkResponse.Items != null)
        {
            // Set response values and release all waiting tasks
            var zip = currentBuffer.Zip(bulkResponse.Items, (op, result) => new {op, result});
            foreach (var a in zip)
            {
                a.op.Response = a.result;
                a.op.Cts.Cancel();
            }
        }
        else
        {
            FailOperations(currentBuffer, bulkResponse.ConnectionStatus?.OriginalException);
        }
    }

    /// <summary>
    /// Records <paramref name="error"/> on each operation and releases its waiting caller immediately,
    /// rather than leaving the caller to sit out the full tolerance.
    /// </summary>
    private static void FailOperations(IEnumerable<NestBulkOperation> operations, Exception error)
    {
        foreach (var operation in operations)
        {
            // Error must be assigned before the cancel so the woken caller observes it.
            operation.Error = error;
            operation.Cts.Cancel();
        }
    }

    /// <summary>
    /// Flush all contents of the buffer.
    /// Signals the flush trigger, then waits for one publish-frequency interval before returning.
    /// </summary>
    public async Task FlushAsync()
    {
        _flushSubject.OnNext(System.Reactive.Unit.Default);
        await Task.Delay(_bufferTimeoutMs).ConfigureAwait(false);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment