Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save florinvirdol/cbee082e2949f0f79c1b79051fc6f641 to your computer and use it in GitHub Desktop.
Save florinvirdol/cbee082e2949f0f79c1b79051fc6f641 to your computer and use it in GitHub Desktop.
Ways to get all the points within shapes (or all the shapes within shapes) using a geo_shape query against many pre-indexed shapes over large geospatial data sets in Elasticsearch?
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection.Metadata;
using System.Runtime.ExceptionServices;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Bogus.DataSets;
using Elasticsearch.Net;
using ElasticSearch;
using GeoAPI.Geometries;
using Microsoft.Extensions.Configuration;
using Nest;
using Nest.JsonNetSerializer;
using NetTopologySuite.Features;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NetTopologySuite.IO;
// using NetTopologySuite.IO.GeoJSON;
using NetTopologySuite.Geometries;
using NetTopologySuite.IO.Converters;
using Newtonsoft.Json.Converters;
using Coordinate = NetTopologySuite.Geometries.Coordinate;
using GeometryCollection = NetTopologySuite.Geometries.GeometryCollection;
namespace ElasticSearch.IndexDemo
{
/// <summary>
/// Slim projection of a point ("frame") document as read back from the points index,
/// whose <c>location</c> is a single point stored as a geo_shape.
/// </summary>
public class FrameDocumentGeoShapeSlim
{
    public string Id { get; set; }

    public string Town { get; set; }

    // Serialized under the JSON name "location"; the NEST [GeoShape] attribute marks it for geo_shape mapping.
    [JsonProperty("location"), GeoShape]
    public PointGeoShape Location { get; set; }
}
/// <summary>
/// Slim frame document whose <c>location</c> is a geo_point (<see cref="GeoLocation"/>) rather than a geo_shape.
/// </summary>
/// <remarks>
/// NOTE(review): derives from FrameDocument, which is not visible here; if that base already declares
/// Id/Town/Location, these properties hide it (CS0108) — confirm that is intended.
/// </remarks>
public class FrameDocumentGeoPointSlim : FrameDocument
{
    public string Id { get; set; }

    public string Town { get; set; }

    // Serialized under the JSON name "location".
    [JsonProperty("location")]
    public GeoLocation Location { get; set; }
}
/// <summary>
/// Minimal id/name pair used when only the identity of a pre-indexed shape is needed.
/// </summary>
public class GeoDocumentSlim
{
    /// <summary>Creates a slim document with the given id and name.</summary>
    /// <param name="id">Document id (the Elasticsearch <c>_id</c> in the callers of this file).</param>
    /// <param name="name">Display name.</param>
    public GeoDocumentSlim(string id, string name) => (Id, Name) = (id, name);

    public string Id { get; set; }

    public string Name { get; set; }
}
/// <summary>
/// A named NetTopologySuite <see cref="Geometry"/>; presumably the document type of the shapes index
/// (whose shape field is "geometry" in SetQueryDefaults) — TODO confirm.
/// </summary>
/// <remarks>
/// NOTE(review): the class is [Serializable] and the file imports BinaryFormatter. If it is persisted with
/// BinaryFormatter, switch to a JSON serializer: BinaryFormatter is unsafe and removed in .NET 9.
/// </remarks>
[Serializable]
public class GeoDocument
{
    /// <summary>Creates a document holding <paramref name="geometry"/> under <paramref name="name"/>.</summary>
    public GeoDocument(string name, Geometry geometry) => (Name, Geometry) = (name, geometry);

    public string Name { get; set; }

    public Geometry Geometry { get; set; }
}
static class Program
{
static async Task Main(string[] args)
{
    // One match_all query selects every pre-indexed shape; each matchmaking strategy reuses it.
    var shapeSelection = MakeNormalQuery<GeoDocumentSlim>(flagMatchAll: true);

    // Strategies run strictly one after another, in this order:
    //   1. WhenAll Tasks
    //   2. Nested IAsyncEnumerables
    //   3. WhenAll Tasks in Batches
    Func<QueryContainer, Task>[] strategies =
    {
        QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel,
        QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel11,
        QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel12,
    };

    foreach (var runStrategy in strategies)
    {
        await runStrategy(shapeSelection);
    }
}
/// <summary>
/// Returns the index names, field names, geo relation and parallelism settings shared by the query benchmarks.
/// </summary>
/// <returns>
/// A named tuple. <c>pointsIndexName</c>/<c>pointsIndexField</c> identify the documents being matched;
/// <c>shapesIndexName</c>/<c>shapesIndexField</c> identify the pre-indexed shapes they are matched against.
/// <c>pointsIndexField</c> and <c>shapesIndexField</c> fall back to "location"/"geometry" when the
/// PointsIndexField/ShapesIndexField settings are null.
/// </returns>
public static (string indexName, string indexField, string preIndexedShapeIndexName, string preIndexedShapeId,
    string preIndexPath, GeoShapeRelation geoShapeRelation, int numberOfSlices, int maxDegreeOfParallelism,
    string pointsIndexName, string shapesIndexName, string pointsIndexField, string shapesIndexField)
    SetQueryDefaults()
{
    // These values actually come from a config file.
    var indexName = "index-name";

    // Index holding the point ("frame") documents; the benchmarks read it back as FrameDocumentGeoShapeSlim,
    // whose location is a geo_shape point.
    var pointsIndexName = "index-geodoc-geoshape-points-3";
    // Index holding the pre-indexed shapes that the points are matched against.
    var shapesIndexName = "index-geodoc-geoshape-shapes-2";
    var pointsIndexField = PointsIndexField ?? "location";
    var shapesIndexField = ShapesIndexField ?? "geometry";

    // Field searched for frames; use "geometry" instead when searching shapes.
    var indexField = "location";

    var preIndexedShapeIndexName = "pre-index-name"; // where searching
    var preIndexedShapeId = "_id_"; // where searching
    var preIndexPath = "geometry";

    // GeoShapeRelation.Intersects is the alternative that was tried.
    var geoShapeRelation = GeoShapeRelation.Within;

    // NOTE(review): Elasticsearch recommends slices <= number of primary shards for sliced scroll;
    // adding the replica count can exceed that and slow down the first slice requests.
    int numberOfSlices = NumberOfShards + NumberOfReplicas;
    int maxDegreeOfParallelism = numberOfSlices * 3;

    return (indexName, indexField, preIndexedShapeIndexName, preIndexedShapeId, preIndexPath, geoShapeRelation,
        numberOfSlices, maxDegreeOfParallelism, pointsIndexName, shapesIndexName, pointsIndexField,
        shapesIndexField);
}
/// <summary>
/// Builds either a <c>match_all</c> query or a <c>match</c> query of <paramref name="fieldValue"/> on <paramref name="fieldKey"/>.
/// </summary>
/// <typeparam name="T">Document type the query descriptor is typed for.</typeparam>
/// <param name="fieldKey">Field to match on (ignored when <paramref name="flagMatchAll"/> is true).</param>
/// <param name="fieldValue">Text to match (ignored when <paramref name="flagMatchAll"/> is true).</param>
/// <param name="flagMatchAll">When true, return a <c>match_all</c> query.</param>
/// <returns>The query container to pass to a search request.</returns>
public static QueryContainer MakeNormalQuery<T>(string fieldKey = "", string fieldValue = "",
    bool flagMatchAll = false) where T : class
{
    var descriptor = new QueryContainerDescriptor<T>();

    // match_all wins outright; no match query is built in that case.
    if (flagMatchAll)
    {
        return descriptor.MatchAll();
    }

    return descriptor.Match(match => match
        .Field(fieldKey)
        .Query(fieldValue));
}
/// <summary>
/// Builds a geo_shape query on <paramref name="indexField"/> whose shape is a pre-indexed shape,
/// referenced by index, document id and path instead of being inlined.
/// </summary>
/// <param name="indexField">Field of the searched documents that is tested against the shape.</param>
/// <param name="preIndexName">Index containing the pre-indexed shape document.</param>
/// <param name="preIndexId">Id of the shape document.</param>
/// <param name="preIndexPath">Field inside the shape document that holds the geometry.</param>
/// <param name="geoShapeRelation">Spatial relation to test (e.g. Within, Intersects).</param>
/// <returns>The query, named "named_query" with boost 1.1.</returns>
public static QueryContainer MakeGeoShapeQuery<T>(string indexField, string preIndexName, string preIndexId,
    string preIndexPath, GeoShapeRelation geoShapeRelation) where T : class
{
    return new QueryContainerDescriptor<T>().GeoShape(geoShape => geoShape
        .Field(indexField)
        .Relation(geoShapeRelation)
        .IndexedShape(lookup => lookup
            .Index(preIndexName)
            .Id(preIndexId)
            .Path(preIndexPath))
        .Name("named_query")
        .Boost(1.1));
}
/// <summary>
/// Runs <paramref name="body"/> for every item of an async stream, with at most
/// <paramref name="maxDegreeOfParallelism"/> invocations running concurrently (TPL Dataflow ActionBlock).
/// </summary>
/// <param name="source">Async stream to consume.</param>
/// <param name="body">Asynchronous work to run per item.</param>
/// <param name="maxDegreeOfParallelism">Maximum concurrent invocations of <paramref name="body"/>; unbounded by default.</param>
/// <param name="scheduler">Optional scheduler for the block; the dataflow default is used when null.</param>
/// <returns>A task that completes when every posted item has been processed.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
/// <remarks>
/// If <paramref name="body"/> throws, the block faults and the returned task rethrows that exception.
/// Enumeration of <paramref name="source"/> stops as soon as the faulted block declines an item,
/// so an expensive source (e.g. an Elasticsearch scroll) is not drained for nothing.
/// </remarks>
public static async Task AsyncParallelForEach<T>(this IAsyncEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
    {
        options.TaskScheduler = scheduler;
    }

    var block = new ActionBlock<T>(body, options);
    try
    {
        await foreach (var item in source)
        {
            // The block has unbounded capacity, so Post only returns false once the block is
            // completed or faulted (a body invocation threw); stop pulling items at that point.
            if (!block.Post(item))
            {
                break;
            }
        }
    }
    finally
    {
        // Always complete the block, even if enumerating the source throws, so queued work drains.
        block.Complete();
    }

    await block.Completion;
}
/// <summary>
/// Streams every hit of <paramref name="queryContainer"/> in <paramref name="indexName"/> via the scroll API,
/// fetching <c>SearchBulkBatchSize</c> hits per page in <c>_doc</c> order.
/// </summary>
/// <param name="_elasticClient">Client used for the search, scroll and clear-scroll requests.</param>
/// <param name="queryContainer">Query selecting the documents.</param>
/// <param name="indexName">Index to search.</param>
/// <param name="flagOnlyIds">
/// When true, every hit on every page is yielded as a <see cref="GeoDocumentSlim"/> carrying only the hit's
/// <c>_id</c> (name "hardcoded-name"). Only valid when <typeparamref name="T"/> is GeoDocumentSlim (or a base of it);
/// otherwise the cast throws <see cref="InvalidCastException"/>.
/// </param>
/// <returns>An async stream of documents (or id-only documents).</returns>
/// <exception cref="InvalidOperationException">The initial search or a scroll request returned an invalid response.</exception>
/// <remarks>
/// Scroll pages are kept alive for <c>SearchAsyncScrollTime</c>. The scroll context is cleared when the stream
/// ends, fails, or is disposed early by the consumer.
/// </remarks>
public static async IAsyncEnumerable<T> QueryIAsyncEnumerable<T>(
    ElasticClient _elasticClient,
    QueryContainer queryContainer,
    string indexName,
    bool flagOnlyIds = false
) where T : class
{
    var result = await _elasticClient.SearchAsync<T>(s => s
        .Index(indexName)
        .Query(_ => queryContainer)
        .Size(SearchBulkBatchSize)
        .Scroll(SearchAsyncScrollTime)
        // _doc order is the cheapest order for scrolling.
        .Sort(so => so.Field(f => f.Field("_doc")))
    );
    if (!result.IsValid)
    {
        throw new InvalidOperationException($"Error searching: {result.DebugInformation}");
    }

    var scrollId = result.ScrollId;
    try
    {
        while (true)
        {
            // Materialize once per page so the id projection is applied to every page, not just the first.
            var page = flagOnlyIds
                ? result.Hits.Select(h => (T)(object)new GeoDocumentSlim(h.Id, "hardcoded-name")).ToList()
                : result.Documents.ToList();
            if (page.Count == 0)
            {
                yield break;
            }

            foreach (var document in page)
            {
                yield return document;
            }

            // The first argument is the scroll keep-alive, not a page size (the page size was fixed by the search).
            result = await _elasticClient.ScrollAsync<T>(SearchAsyncScrollTime, scrollId);
            if (!result.IsValid)
            {
                throw new InvalidOperationException($"Error scrolling: {result.DebugInformation}");
            }
            scrollId = result.ScrollId ?? scrollId;
        }
    }
    finally
    {
        // Release the server-side scroll context instead of waiting for it to time out.
        if (!string.IsNullOrEmpty(scrollId))
        {
            await _elasticClient.ClearScrollAsync(c => c.ScrollId(scrollId));
        }
    }
}
/// <summary>
/// Streams every hit of <paramref name="queryContainer"/> in <paramref name="indexName"/> via the scroll API,
/// typically as id-only documents (<paramref name="flagOnlyIds"/> = true), fetching <c>SearchBulkBatchSize</c>
/// hits per page in <c>_doc</c> order.
/// </summary>
/// <param name="_elasticClient">Client used for the search, scroll and clear-scroll requests.</param>
/// <param name="queryContainer">Query selecting the documents.</param>
/// <param name="indexName">Index to search.</param>
/// <param name="flagOnlyIds">
/// When true, every hit on every page is yielded as a <see cref="GeoDocumentSlim"/> carrying only the hit's
/// <c>_id</c> (name "hardcoded-name"). Only valid when <typeparamref name="T"/> is GeoDocumentSlim (or a base of it);
/// otherwise the cast throws <see cref="InvalidCastException"/>.
/// </param>
/// <returns>An async stream of documents (or id-only documents).</returns>
/// <exception cref="InvalidOperationException">The initial search or a scroll request returned an invalid response.</exception>
/// <remarks>
/// Scroll pages are kept alive for <c>SearchAsyncScrollTime</c>. The scroll context is cleared when the stream
/// ends, fails, or is disposed early by the consumer.
/// </remarks>
public static async IAsyncEnumerable<T> QueryIAsyncEnumerableIds<T>(
    ElasticClient _elasticClient,
    QueryContainer queryContainer,
    string indexName,
    bool flagOnlyIds = false
) where T : class
{
    var result = await _elasticClient.SearchAsync<T>(s => s
        .Index(indexName)
        .Query(_ => queryContainer)
        .Size(SearchBulkBatchSize)
        .Scroll(SearchAsyncScrollTime)
        // _doc order is the cheapest order for scrolling.
        .Sort(so => so.Field(f => f.Field("_doc")))
    );
    if (!result.IsValid)
    {
        throw new InvalidOperationException($"Error searching: {result.DebugInformation}");
    }

    var scrollId = result.ScrollId;
    try
    {
        while (true)
        {
            // Apply the id projection to every page; otherwise later pages yield raw _source
            // documents whose Id is not the hit's _id.
            var page = flagOnlyIds
                ? result.Hits.Select(h => (T)(object)new GeoDocumentSlim(h.Id, "hardcoded-name")).ToList()
                : result.Documents.ToList();
            if (page.Count == 0)
            {
                yield break;
            }

            foreach (var document in page)
            {
                yield return document;
            }

            // The first argument is the scroll keep-alive, not a page size.
            result = await _elasticClient.ScrollAsync<T>(SearchAsyncScrollTime, scrollId);
            if (!result.IsValid)
            {
                throw new InvalidOperationException($"Error scrolling: {result.DebugInformation}");
            }
            scrollId = result.ScrollId ?? scrollId;
        }
    }
    finally
    {
        // Release the server-side scroll context instead of waiting for it to time out.
        if (!string.IsNullOrEmpty(scrollId))
        {
            await _elasticClient.ClearScrollAsync(c => c.ScrollId(scrollId));
        }
    }
}
/// <summary>
/// Streams every <c>_source</c> document matching <paramref name="queryContainer"/> in <paramref name="indexName"/>
/// via the scroll API, fetching <c>SearchBulkBatchSize</c> hits per page in <c>_doc</c> order.
/// </summary>
/// <param name="_elasticClient">Client used for the search, scroll and clear-scroll requests.</param>
/// <param name="queryContainer">Query selecting the documents.</param>
/// <param name="indexName">Index to search.</param>
/// <param name="flagOnlyIds">Not used by this variant; documents are always yielded as deserialized.</param>
/// <returns>An async stream of the matching documents.</returns>
/// <exception cref="InvalidOperationException">The initial search or a scroll request returned an invalid response.</exception>
/// <remarks>
/// Scroll pages are kept alive for <c>SearchAsyncScrollTime</c>. The scroll context is cleared when the stream
/// ends, fails, or is disposed early by the consumer.
/// </remarks>
public static async IAsyncEnumerable<T> QueryIAsyncEnumerable2<T>(
    ElasticClient _elasticClient,
    QueryContainer queryContainer,
    string indexName,
    bool flagOnlyIds = false
) where T : class
{
    var result = await _elasticClient.SearchAsync<T>(s => s
        .Index(indexName)
        .Query(_ => queryContainer)
        .Size(SearchBulkBatchSize)
        .Scroll(SearchAsyncScrollTime)
        // _doc order is the cheapest order for scrolling.
        .Sort(so => so.Field(f => f.Field("_doc")))
    );
    if (!result.IsValid)
    {
        throw new InvalidOperationException($"Error searching: {result.DebugInformation}");
    }

    var scrollId = result.ScrollId;
    try
    {
        while (result.Documents.Any())
        {
            foreach (var document in result.Documents)
            {
                yield return document;
            }

            // The first argument is the scroll keep-alive, not a page size.
            result = await _elasticClient.ScrollAsync<T>(SearchAsyncScrollTime, scrollId);
            if (!result.IsValid)
            {
                throw new InvalidOperationException($"Error scrolling: {result.DebugInformation}");
            }
            scrollId = result.ScrollId ?? scrollId;
        }
    }
    finally
    {
        // Release the server-side scroll context instead of waiting for it to time out.
        if (!string.IsNullOrEmpty(scrollId))
        {
            await _elasticClient.ClearScrollAsync(c => c.ScrollId(scrollId));
        }
    }
}
/// <summary>
/// Matchmaking benchmark, "WhenAll Tasks" variant. It collects the ids of all pre-indexed shapes selected by
/// <paramref name="queryContainer"/>, issues one geo_shape search per shape against the points index with all
/// searches in flight at once, awaits them together and prints statistics.
/// </summary>
/// <param name="queryContainer">Query selecting the shapes (in the shapes index) to match points against.</param>
/// <exception cref="InvalidOperationException">A per-shape search returned an invalid response.</exception>
/// <remarks>
/// Every per-shape request runs concurrently. With many shapes this is known to hit the HttpClient timeout
/// (postal districts with ~500k frames). Each per-shape search returns at most SearchBulkBatchSize hits and is
/// not scrolled, so shapes containing more points are truncated.
/// </remarks>
public static async Task QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel(QueryContainer queryContainer)
{
    //https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/scrolling-documents.html#scrollall-observable
    //bulkall & scrollall
    //https://github.com/elastic/elasticsearch-net/blob/67fe8ce6cbaea8ab593dfe77d038fa5b6bb4f903/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs
    var queryDefaults = SetQueryDefaults();
    var logsList = new System.Collections.Generic.List<string>();
    var elasticClient = InitElasticClient(logsList, queryDefaults.indexName);
    // NOTE(review): nothing adds to this list, so "Total searched documents" always prints 0.
    List<FrameDocumentGeoShape> searchedDocuments = new List<FrameDocumentGeoShape>();
    var watch = Stopwatch.StartNew();

    // 1. Ids of every pre-indexed shape selected by queryContainer (sliced ScrollAll; blocks until done).
    var ids = QueryAllWithScrollAPIAsyncByDetailsOnlyIds<GeoDocumentSlim>(
        queryDefaults.numberOfSlices, queryDefaults.maxDegreeOfParallelism, queryDefaults.shapesIndexName,
        elasticClient, queryContainer, true).ToList();

    // 2. One geo_shape search per shape; each request starts when Task.WhenAll enumerates the sequence.
    //https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach
    var searchTasks = ids.Select(shapeId =>
    {
        var spatialQuery = MakeGeoShapeQuery<FrameDocumentGeoShapeSlim>(queryDefaults.pointsIndexField,
            queryDefaults.shapesIndexName, shapeId, queryDefaults.shapesIndexField,
            queryDefaults.geoShapeRelation);
        return elasticClient.SearchAsync<FrameDocumentGeoShapeSlim>(s => s
            .Index(queryDefaults.pointsIndexName)
            .Size(SearchBulkBatchSize)
            .Sort(srt => srt
                .Ascending("_id")
            )
            .Query(_ => spatialQuery)
        );
    });
    //!!! for postaldistricts with 500k frames => Throw The request was canceled due to the configured HttpClient.Timeout of 60 seconds elapsing
    var searchResponses = await Task.WhenAll(searchTasks);

    // 3. Gather the matched points. This only copies references, so a plain loop is cheaper than Parallel.ForEach.
    var searchedPoints = new List<FrameDocumentGeoShapeSlim>();
    foreach (var searchResponse in searchResponses)
    {
        if (!searchResponse.IsValid)
        {
            throw new InvalidOperationException($"Error searching: {searchResponse.DebugInformation}");
        }
        searchedPoints.AddRange(searchResponse.Documents);
    }

    var shapesCount = ids.Count;
    var pointsCount = searchedPoints.Count;
    var totalSearchedDocuments = searchedDocuments.Count;
    var totalSearchedDocumentsConcurrentBag = searchedPoints.Count;
    watch.Stop();
    var formattedElapsedTime = FormatElapsedTimeFromMilliseconds(watch.ElapsedMilliseconds);
    var statisticsList = new System.Collections.Generic.List<string>
    {
        $"Search method: Query ALL With Scroll API by two indices (matchmaking) WhenAll Tasks",
        $"ES Index (search results from this index) : {queryDefaults.pointsIndexName}",
        $"ES Index Pre-Index-Shapes: {queryDefaults.shapesIndexName}",
        $"Query relation : {queryDefaults.geoShapeRelation}",
        $"Number of shards: {NumberOfShards}",
        $"Number of replicas: {NumberOfReplicas}",
        $"Number of slices: {queryDefaults.numberOfSlices}",
        $"Degree of parallelism: {queryDefaults.maxDegreeOfParallelism}",
        $"Processor count: {MaxDegreeOfParallelismProcessorCount}",
        $"Search Bulk batch size: {SearchBulkBatchSize}",
        $"Total searched documents: {totalSearchedDocuments}",
        $"Total searched documents concurrent bag: {totalSearchedDocumentsConcurrentBag}",
        $"Total shapes count: {shapesCount}",
        $"Total points count: {pointsCount}",
        $"Total Elapsed time: {formattedElapsedTime}",
    };
    Console.WriteLine(string.Join("\n", statisticsList));
}
/// <summary>
/// Matchmaking benchmark, "WhenAll Tasks in Batches" variant. It collects the ids of all pre-indexed shapes
/// selected by <paramref name="queryContainer"/>, then runs the per-shape geo_shape searches against the points
/// index in batches of <c>SearchTasksBatchSize</c>, and prints statistics.
/// </summary>
/// <param name="queryContainer">Query selecting the shapes (in the shapes index) to match points against.</param>
/// <exception cref="InvalidOperationException">A per-shape search returned an invalid response.</exception>
/// <remarks>
/// A batch's searches are only started once the previous batch has completed, so at most
/// SearchTasksBatchSize requests are in flight. Each per-shape search returns at most SearchBulkBatchSize hits
/// and is not scrolled.
/// </remarks>
public static async Task QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel12(QueryContainer queryContainer)
{
    //https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/scrolling-documents.html#scrollall-observable
    //bulkall & scrollall
    //https://github.com/elastic/elasticsearch-net/blob/67fe8ce6cbaea8ab593dfe77d038fa5b6bb4f903/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs
    var queryDefaults = SetQueryDefaults();
    var logsList = new System.Collections.Generic.List<string>();
    var elasticClient = InitElasticClient(logsList, queryDefaults.indexName);
    // NOTE(review): nothing adds to this list, so "Total searched documents" always prints 0.
    List<FrameDocumentGeoShape> searchedDocuments = new List<FrameDocumentGeoShape>();
    var watch = Stopwatch.StartNew();

    // 1. Ids of every pre-indexed shape selected by queryContainer (sliced ScrollAll; blocks until done).
    var ids = QueryAllWithScrollAPIAsyncByDetailsOnlyIds<GeoDocumentSlim>(
        queryDefaults.numberOfSlices, queryDefaults.maxDegreeOfParallelism, queryDefaults.shapesIndexName,
        elasticClient, queryContainer, true).ToList();

    // 2. Batch the *ids* rather than already-started tasks: materializing all search tasks up front
    //    would send every request immediately and make batching pointless.
    var searchedPoints = new List<FrameDocumentGeoShapeSlim>();
    foreach (var idBatch in GetBatches(ids, SearchTasksBatchSize))
    {
        var batchResponses = await Task.WhenAll(idBatch.Select(shapeId => SearchPointsForShape(shapeId)));
        foreach (var searchResponse in batchResponses)
        {
            if (!searchResponse.IsValid)
            {
                throw new InvalidOperationException($"Error searching: {searchResponse.DebugInformation}");
            }
            searchedPoints.AddRange(searchResponse.Documents);
        }
    }

    var shapesCount = ids.Count;
    var pointsCount = searchedPoints.Count;
    var totalSearchedDocuments = searchedDocuments.Count;
    var totalSearchedDocumentsConcurrentBag = searchedPoints.Count;
    watch.Stop();
    var formattedElapsedTime = FormatElapsedTimeFromMilliseconds(watch.ElapsedMilliseconds);
    var statisticsList = new System.Collections.Generic.List<string>
    {
        $"Search method: Query ALL With Scroll API by two indices (matchmaking) WhenAll Tasks in Batches",
        $"ES Index (search results from this index) : {queryDefaults.pointsIndexName}",
        $"ES Index Pre-Index-Shapes: {queryDefaults.shapesIndexName}",
        $"Query relation : {queryDefaults.geoShapeRelation}",
        $"Number of shards: {NumberOfShards}",
        $"Number of replicas: {NumberOfReplicas}",
        $"Number of slices: {queryDefaults.numberOfSlices}",
        $"Degree of parallelism: {queryDefaults.maxDegreeOfParallelism}",
        $"Processor count: {MaxDegreeOfParallelismProcessorCount}",
        $"Search Tasks batch size: {SearchTasksBatchSize}",
        $"Search Bulk batch size: {SearchBulkBatchSize}",
        $"Total searched documents: {totalSearchedDocuments}",
        $"Total searched documents concurrent bag: {totalSearchedDocumentsConcurrentBag}",
        $"Total shapes count: {shapesCount}",
        $"Total points count: {pointsCount}",
        $"Total Elapsed time: {formattedElapsedTime}",
    };
    Console.WriteLine(string.Join("\n", statisticsList));

    // Starts one geo_shape search on the points index against the pre-indexed shape with the given id.
    Task<ISearchResponse<FrameDocumentGeoShapeSlim>> SearchPointsForShape(string shapeId)
    {
        var spatialQuery = MakeGeoShapeQuery<FrameDocumentGeoShapeSlim>(queryDefaults.pointsIndexField,
            queryDefaults.shapesIndexName, shapeId, queryDefaults.shapesIndexField,
            queryDefaults.geoShapeRelation);
        return elasticClient.SearchAsync<FrameDocumentGeoShapeSlim>(s => s
            .Index(queryDefaults.pointsIndexName)
            .Size(SearchBulkBatchSize)
            .Sort(srt => srt
                .Ascending("_id")
            )
            .Query(_ => spatialQuery)
        );
    }
}
/// <summary>
/// Matchmaking benchmark, "Nested IAsyncEnumerables" variant. It streams the ids of the pre-indexed shapes
/// selected by <paramref name="queryContainer"/>; for each shape it streams (scrolls) every matching point from the
/// points index, then prints statistics.
/// </summary>
/// <param name="queryContainer">Query selecting the shapes (in the shapes index) to match points against.</param>
/// <remarks>
/// Up to 20 shapes are processed concurrently, and each shape's point stream is consumed with up to 40
/// concurrent callbacks. Both loops run on the default (thread-pool) task scheduler. No SynchronizationContext
/// is installed, so the ambient context of the calling thread and of pool threads is left untouched.
/// </remarks>
public static async Task QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel11(QueryContainer queryContainer)
{
    //https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/scrolling-documents.html#scrollall-observable
    //bulkall & scrollall
    //https://github.com/elastic/elasticsearch-net/blob/67fe8ce6cbaea8ab593dfe77d038fa5b6bb4f903/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs
    var queryDefaults = SetQueryDefaults();
    var logsList = new System.Collections.Generic.List<string>();
    var elasticClient = InitElasticClient(logsList, queryDefaults.indexName);
    // NOTE(review): nothing adds to this list, so "Total searched documents" always prints 0.
    List<FrameDocumentGeoShape> searchedDocuments = new List<FrameDocumentGeoShape>();
    var watch = Stopwatch.StartNew();

    const int shapesDegreeOfParallelism = 20;
    const int pointsDegreeOfParallelism = 40;

    // Written from many callbacks concurrently, hence the concurrent collection and Interlocked counters.
    var searchedPointsDocumentsConcurrentBag = new ConcurrentBag<FrameDocumentGeoShapeSlim>();
    var shapesCount = 0;
    var pointsCount = 0;

    //https://medium.com/@alex.puiu/parallel-foreach-async-in-c-36756f8ebe62
    //https://scatteredcode.net/parallel-foreach-async-in-c/
    var shapesAsyncEnumerable = QueryIAsyncEnumerableIds<GeoDocumentSlim>(elasticClient, queryContainer,
        queryDefaults.shapesIndexName, true);
    await shapesAsyncEnumerable.AsyncParallelForEach(
        async shape =>
        {
            Interlocked.Increment(ref shapesCount);
            var spatialQuery = MakeGeoShapeQuery<FrameDocumentGeoShapeSlim>(queryDefaults.pointsIndexField,
                queryDefaults.shapesIndexName, shape.Id, queryDefaults.shapesIndexField,
                queryDefaults.geoShapeRelation);

            // For each shape, scroll every point that satisfies the spatial relation.
            var pointsAsyncEnumerable = QueryIAsyncEnumerable2<FrameDocumentGeoShapeSlim>(elasticClient,
                spatialQuery, queryDefaults.pointsIndexName, false);
            await pointsAsyncEnumerable.AsyncParallelForEach(
                point =>
                {
                    Interlocked.Increment(ref pointsCount);
                    searchedPointsDocumentsConcurrentBag.Add(point);
                    return Task.CompletedTask;
                },
                pointsDegreeOfParallelism);
        },
        shapesDegreeOfParallelism);

    var totalSearchedDocuments = searchedDocuments.Count;
    var totalSearchedDocumentsConcurrentBag = searchedPointsDocumentsConcurrentBag.Count;
    watch.Stop();
    var formattedElapsedTime = FormatElapsedTimeFromMilliseconds(watch.ElapsedMilliseconds);
    var statisticsList = new System.Collections.Generic.List<string>
    {
        $"Search method: Query ALL With Scroll API by two indices (matchmaking) Nested IAsyncEnumerables",
        $"ES Index (search results from this index) : {queryDefaults.pointsIndexName}",
        $"ES Index Pre-Index-Shapes: {queryDefaults.shapesIndexName}",
        $"Query relation : {queryDefaults.geoShapeRelation}",
        $"Number of shards: {NumberOfShards}",
        $"Number of replicas: {NumberOfReplicas}",
        $"Number of slices: {queryDefaults.numberOfSlices}",
        $"Degree of parallelism: {queryDefaults.maxDegreeOfParallelism}",
        $"Processor count: {MaxDegreeOfParallelismProcessorCount}",
        $"Search Bulk batch size: {SearchBulkBatchSize}",
        $"Total searched documents: {totalSearchedDocuments}",
        $"Total searched documents concurrent bag: {totalSearchedDocumentsConcurrentBag}",
        $"Total shapes count: {shapesCount}",
        $"Total points count: {pointsCount}",
        $"Total Elapsed time: {formattedElapsedTime}",
    };
    Console.WriteLine(string.Join("\n", statisticsList));
}
/// <summary>
/// Collects the <c>_id</c> of every document in <paramref name="indexName"/> matching <paramref name="queryContainer"/>,
/// using a sliced ScrollAll. The calling thread is blocked until the scroll completes or fails.
/// </summary>
/// <typeparam name="T">Document type used to deserialize hits (only the hit ids are kept).</typeparam>
/// <param name="numberOfSlices">Number of scroll slices.</param>
/// <param name="maxDegreeOfParallelism">Maximum number of slices scrolled concurrently.</param>
/// <param name="indexName">Index to scroll.</param>
/// <param name="elasticClient">Client to use.</param>
/// <param name="queryContainer">Query selecting the documents.</param>
/// <param name="flagSlimDocs">Currently unused (it was meant to restrict the returned _source fields).</param>
/// <returns>The ids of all matching documents, in no particular order.</returns>
/// <remarks>
/// Any exception reported by the ScrollAll observable is logged and rethrown with its original stack trace.
/// Each scroll page is kept alive for 1 minute.
/// </remarks>
static ConcurrentBag<string> QueryAllWithScrollAPIAsyncByDetailsOnlyIds<T>(int numberOfSlices,
    int maxDegreeOfParallelism, string indexName, ElasticClient elasticClient,
    QueryContainer queryContainer = null, bool flagSlimDocs = false) where T : class
{
    var ids = new ConcurrentBag<string>();

    var scrollAllObservable = elasticClient.ScrollAll<T>("1m", numberOfSlices, sc => sc
        .MaxDegreeOfParallelism(maxDegreeOfParallelism)
        .Search(s => s
            .Index(indexName)
            .Size(SearchBulkBatchSize) //only ~6k?
            // NOTE(review): only hit ids are used; restricting _source would cut the payload, but ScrollAll
            // must still see non-empty Documents to keep paging — verify before adding a source filter.
            .Sort(so => so.Field(f => f.Field("_doc")))
            .Query(_ => queryContainer)
        )
    );

    Exception exception = null;
    using (var waitHandle = new ManualResetEvent(false))
    using (scrollAllObservable.Subscribe(new ScrollAllObserver<T>(
        onNext: r =>
        {
            // Pages arrive from several slices concurrently; the bag is thread-safe, and a plain loop is enough.
            foreach (var hit in r.SearchResponse.Hits)
            {
                ids.Add(hit.Id);
            }
        },
        onError: e =>
        {
            exception = e;
            Console.WriteLine($"Error {e.Message} \n {e.InnerException?.Message}");
            waitHandle.Set();
        },
        onCompleted: () => waitHandle.Set()
    )))
    {
        waitHandle.WaitOne();
    }

    if (exception != null)
    {
        // Preserve the original stack trace (a plain `throw exception;` would reset it).
        ExceptionDispatchInfo.Capture(exception).Throw();
    }

    return ids;
}
/// <summary>
/// Splits <paramref name="ids"/> into consecutive batches of at most <paramref name="batchSize"/> items, preserving order.
/// </summary>
/// <typeparam name="T">Element type.</typeparam>
/// <param name="ids">Items to split; not modified.</param>
/// <param name="batchSize">Maximum number of items per batch; must be positive.</param>
/// <returns>The batches. Every batch is full except possibly the last; the result is empty for empty input.</returns>
/// <exception cref="ArgumentNullException"><paramref name="ids"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="batchSize"/> is zero or negative (zero would otherwise loop forever).
/// </exception>
public static List<List<T>> GetBatches<T>(List<T> ids, int batchSize)
{
    if (ids == null) throw new ArgumentNullException(nameof(ids));
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
    }

    // Exact batch count, computed without the overflow-prone (count + size - 1) / size.
    var batchCount = ids.Count / batchSize + (ids.Count % batchSize == 0 ? 0 : 1);
    var batches = new List<List<T>>(batchCount);
    for (var start = 0; start < ids.Count; start += batchSize)
    {
        batches.Add(ids.GetRange(start, Math.Min(batchSize, ids.Count - start)));
    }
    return batches;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment