Skip to content

Instantly share code, notes, and snippets.

@rikkit
Created December 21, 2015 12:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rikkit/4e8e48f8930b895c1ca8 to your computer and use it in GitHub Desktop.
Save rikkit/4e8e48f8930b895c1ca8 to your computer and use it in GitHub Desktop.
Nest Bulk Searcher
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Nest;
using _____.Managers;
namespace _____.Block
{
public class NestBulkSearcher<T> : INestBulkSearcher<T> where T : class
{
internal class NestSearchOperation
{
public Func<SearchDescriptor<T>, SearchDescriptor<T>> Operation { get; set; }
public ISearchResponse<T> Response { get; set; }
public CancellationTokenSource Cts { get; set; }
public Exception Error { get; set; }
}
private readonly Subject<NestSearchOperation> _subject;
private readonly TimeSpan _maxTaskWait;
public IElasticClient Client { get; }
public NestBulkSearcher(IElasticClient nest, BulkPublisherConfig config)
: this(nest, config.MsearchSize, config.PublishFrequency, config.Tolerance)
{
}
public NestBulkSearcher(IElasticClient nest, int bufferLimit, TimeSpan publishFrequency, TimeSpan tolerance)
{
Client = nest;
_subject = new Subject<NestSearchOperation>();
_maxTaskWait = tolerance;
// Publish items every so often
Observable
.Concat(_subject)
.Buffer(publishFrequency, bufferLimit)
.Subscribe(PublishBuffer);
}
/// <summary>
/// Executes a bulk search that will return exactly one item, or nothing.
/// </summary>
public async Task<Result<ISearchResponse<T>>> SearchAsync(Func<SearchDescriptor<T>, SearchDescriptor<T>> context)
{
var cts = new CancellationTokenSource();
var bulkOp = new NestSearchOperation
{
Operation = context,
Cts = cts
};
// Publish the operation, this will be picked up by the subscribing observable.
_subject.OnNext(bulkOp);
// This isn't great.
// Delay on this thread for longer than it will take the buffer to be published. Expect a cancellation.
try
{
await Task.Delay(_maxTaskWait, cts.Token).ConfigureAwait(false);
}
catch (TaskCanceledException tce)
{
}
// If the cancellation wasn't requested, then the bulk response didn't get sent
if (!cts.IsCancellationRequested)
{
return Result<ISearchResponse<T>>.SetException(new ApplicationException("The msearch request was not sent within the expected time limit."));
}
return bulkOp.Response?.IsValid ?? false
? Result<ISearchResponse<T>>.SetValue(bulkOp.Response)
: Result<ISearchResponse<T>>.SetException(bulkOp.Response?.RequestInformation.OriginalException
?? bulkOp.Error
?? new ApplicationException("An unknown error occurred with the msearch request."));
}
private void PublishBuffer(IList<NestSearchOperation> currentBuffer)
{
if (!currentBuffer.Any())
{
return;
}
var searchDescriptor = new MultiSearchDescriptor();
foreach (var searchOp in currentBuffer)
{
searchDescriptor.Search(searchOp.Operation);
}
try
{
var bulkResponse = Client.MultiSearch(searchDescriptor);
var items = bulkResponse.GetResponses<T>().ToList();
// Set response values and release all waiting tasks
var zip = currentBuffer.Zip(items, (op, result) => new { op, result });
foreach (var a in zip)
{
a.op.Response = a.result;
a.op.Cts.Cancel();
}
}
catch (Exception e)
{
foreach (var op in currentBuffer)
{
op.Error = e;
op.Cts.Cancel();
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment