Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
How can I retrieve all the points (or all the shapes) that fall inside multiple pre-indexed shapes, using the geo_shape query against a large geospatial dataset in Elasticsearch?
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection.Metadata;
using System.Runtime.ExceptionServices;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Bogus.DataSets;
using Elasticsearch.Net;
using ElasticSearch;
using GeoAPI.Geometries;
using Microsoft.Extensions.Configuration;
using Nest;
using Nest.JsonNetSerializer;
using NetTopologySuite.Features;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NetTopologySuite.IO;
// using NetTopologySuite.IO.GeoJSON;
using NetTopologySuite.Geometries;
using NetTopologySuite.IO.Converters;
using Newtonsoft.Json.Converters;
using Coordinate = NetTopologySuite.Geometries.Coordinate;
using GeometryCollection = NetTopologySuite.Geometries.GeometryCollection;
namespace ElasticSearch.IndexDemo
{
/// <summary>
/// Slim frame document whose location is expressed as a NEST <c>PointGeoShape</c>.
/// Used in this file as the result type of the per-shape geo_shape searches.
/// </summary>
public class FrameDocumentGeoShapeSlim
{
/// <summary>Identifier of the document.</summary>
public string Id { get; set; }
/// <summary>Town name carried by the document.</summary>
public string Town { get; set; }
/// <summary>
/// Point location, serialized under the JSON property name "location".
/// NOTE(review): the [GeoShape] attribute presumably maps this field as geo_shape when the
/// index mapping is generated from the type — confirm against the index creation code.
/// </summary>
[JsonProperty("location")]
[GeoShape]
public PointGeoShape Location { get; set; }
}
/// <summary>
/// Slim frame document whose location is expressed as a NEST <c>GeoLocation</c>.
/// Derives from <c>FrameDocument</c>, which is declared outside this file.
/// NOTE(review): if <c>FrameDocument</c> already declares Id/Town/Location, the members below
/// hide them — verify against the base class.
/// </summary>
public class FrameDocumentGeoPointSlim : FrameDocument
{
/// <summary>Identifier of the document.</summary>
public string Id { get; set; }
/// <summary>Town name carried by the document.</summary>
public string Town { get; set; }
/// <summary>Location of the document, serialized under the JSON property name "location".</summary>
[JsonProperty("location")]
public GeoLocation Location { get; set; }
}
/// <summary>
/// Minimal shape document: just an identifier and a display name.
/// </summary>
public class GeoDocumentSlim
{
    /// <summary>Creates a slim document from its identifier and name.</summary>
    /// <param name="id">Value stored in <see cref="Id"/>.</param>
    /// <param name="name">Value stored in <see cref="Name"/>.</param>
    public GeoDocumentSlim(string id, string name) => (Id, Name) = (id, name);

    /// <summary>Identifier of the document.</summary>
    public string Id { get; set; }

    /// <summary>Name of the document.</summary>
    public string Name { get; set; }
}
/// <summary>
/// Named geometry document. The document carries no explicit Id property; only a name and a
/// NetTopologySuite geometry are stored.
/// </summary>
[Serializable]
public class GeoDocument
{
    /// <summary>Creates a document from a name and its geometry.</summary>
    /// <param name="name">Value stored in <see cref="Name"/>.</param>
    /// <param name="geometry">Value stored in <see cref="Geometry"/>.</param>
    public GeoDocument(string name, Geometry geometry) => (Name, Geometry) = (name, geometry);

    /// <summary>Name of the shape.</summary>
    public string Name { get; set; }

    /// <summary>Geometry of the shape.</summary>
    public Geometry Geometry { get; set; }
}
static class Program
{
/// <summary>
/// Entry point: builds one match-all query and runs the three matchmaking strategies with it,
/// one after another.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static async Task Main(string[] args)
{
    // The same match-all query selects the pre-indexed shapes in every strategy.
    var matchAllQuery = MakeNormalQuery<GeoDocumentSlim>(flagMatchAll: true);

    // Strategy 1: every per-shape search awaited together (Task.WhenAll).
    await QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel(matchAllQuery);

    // Strategy 2: nested IAsyncEnumerable streams.
    await QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel11(matchAllQuery);

    // Strategy 3: Task.WhenAll applied batch by batch.
    await QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel12(matchAllQuery);
}
/// <summary>
/// Returns the default settings shared by the query strategies in this class as a named tuple.
/// </summary>
/// <returns>
/// Index names and field names for the generic, points and shapes indices, the pre-indexed shape
/// coordinates (index, id, path), the geo-shape relation, and the slice / parallelism settings.
/// <c>numberOfSlices</c> is <c>NumberOfShards + NumberOfReplicas</c> and
/// <c>maxDegreeOfParallelism</c> is three times that value.
/// </returns>
public static (string indexName, string indexField, string preIndexedShapeIndexName, string preIndexedShapeId,
    string preIndexPath, GeoShapeRelation geoShapeRelation, int numberOfSlices, int maxDegreeOfParallelism,
    string pointsIndexName, string shapesIndexName, string pointsIndexField, string shapesIndexField)
    SetQueryDefaults()
{
    // These literals are placeholders; the real values are meant to come from a config file.
    var indexName = "index-name";

    // Index searched for point documents.
    var pointsIndexName = "index-geodoc-geoshape-points-3";
    // Index holding the pre-indexed shapes referenced by the geo_shape queries.
    var shapesIndexName = "index-geodoc-geoshape-shapes-2";

    // Field names fall back to these defaults when the configured values are null.
    var pointsIndexField = PointsIndexField ?? "location";
    var shapesIndexField = ShapesIndexField ?? "geometry";

    // Field queried for frames ("geometry" would be used when querying shapes instead).
    var indexField = "location";

    // Coordinates of a single pre-indexed shape: index, document id and path of the shape field.
    var preIndexedShapeIndexName = "pre-index-name";
    var preIndexedShapeId = "_id_";
    var preIndexPath = "geometry";

    // Within is used here; GeoShapeRelation.Intersects is the alternative that was tried.
    var geoShapeRelation = GeoShapeRelation.Within;

    int numberOfSlices = NumberOfShards + NumberOfReplicas;
    int maxDegreeOfParallelism = numberOfSlices * 3;

    return (indexName, indexField, preIndexedShapeIndexName, preIndexedShapeId, preIndexPath, geoShapeRelation,
        numberOfSlices, maxDegreeOfParallelism, pointsIndexName, shapesIndexName, pointsIndexField,
        shapesIndexField);
}
/// <summary>
/// Builds either a match query on a single field or a match_all query.
/// </summary>
/// <typeparam name="T">Document type the query descriptor is bound to.</typeparam>
/// <param name="fieldKey">Name of the field to match on.</param>
/// <param name="fieldValue">Text to match in that field.</param>
/// <param name="flagMatchAll">When true, the match_all query is returned instead of the match query.</param>
/// <returns>The query container produced by the descriptor.</returns>
public static QueryContainer MakeNormalQuery<T>(string fieldKey = "", string fieldValue = "",
    bool flagMatchAll = false) where T : class
{
    var descriptor = new QueryContainerDescriptor<T>();

    // The match clause is always built first, exactly as before, even if it ends up unused.
    QueryContainer matchQuery = descriptor.Match(match => match
        .Field(fieldKey)
        .Query(fieldValue)
    );

    return flagMatchAll
        ? descriptor.MatchAll()
        : matchQuery;
}
/// <summary>
/// Builds a geo_shape query that compares a field against a shape stored in another index.
/// </summary>
/// <typeparam name="T">Document type the query descriptor is bound to.</typeparam>
/// <param name="indexField">Field of the searched documents that is compared against the shape.</param>
/// <param name="preIndexName">Index that contains the pre-indexed shape.</param>
/// <param name="preIndexId">Document id of the pre-indexed shape.</param>
/// <param name="preIndexPath">Path of the shape field inside the pre-indexed document.</param>
/// <param name="geoShapeRelation">Spatial relation to apply.</param>
/// <returns>The query container, named "named_query" and boosted by 1.1.</returns>
public static QueryContainer MakeGeoShapeQuery<T>(string indexField, string preIndexName, string preIndexId,
    string preIndexPath, GeoShapeRelation geoShapeRelation) where T : class
{
    return new QueryContainerDescriptor<T>().GeoShape(shape => shape
        .Name("named_query")
        .Boost(1.1)
        .Field(indexField)
        .Relation(geoShapeRelation)
        .IndexedShape(indexed => indexed
            .Index(preIndexName)
            .Id(preIndexId)
            .Path(preIndexPath)
        )
    );
}
/// <summary>
/// Feeds every item of an asynchronous sequence into an <c>ActionBlock</c> that runs
/// <paramref name="body"/> with the requested degree of parallelism, and completes once all
/// accepted items have been processed.
/// </summary>
/// <typeparam name="T">Item type of the sequence.</typeparam>
/// <param name="source">Sequence to consume.</param>
/// <param name="body">Asynchronous action executed for each item.</param>
/// <param name="maxDegreeOfParallelism">Maximum number of concurrently running bodies; unbounded by default.</param>
/// <param name="scheduler">Optional task scheduler for the block; the block's default is used when null.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
/// <remarks>
/// If <paramref name="body"/> throws, the block faults and stops accepting items. Enumeration of
/// <paramref name="source"/> then stops as well (instead of draining the rest of the sequence
/// for nothing) and the failure is rethrown when the block's completion is awaited.
/// </remarks>
public static async Task AsyncParallelForEach<T>(this IAsyncEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (body == null)
    {
        throw new ArgumentNullException(nameof(body));
    }

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
    {
        options.TaskScheduler = scheduler;
    }

    var block = new ActionBlock<T>(body, options);
    await foreach (var item in source)
    {
        // The block has unbounded capacity, so Post only returns false once the block has
        // permanently stopped accepting input (i.e. it faulted); pulling more items is pointless.
        if (!block.Post(item))
        {
            break;
        }
    }

    block.Complete();
    await block.Completion;
}
/// <summary>
/// Streams every document of <paramref name="indexName"/> that matches
/// <paramref name="queryContainer"/>, paging through the results with the scroll API
/// (<c>SearchBulkBatchSize</c> hits per page, sorted by <c>_doc</c>).
/// </summary>
/// <typeparam name="T">Document type to deserialize hits into.</typeparam>
/// <param name="_elasticClient">Client used for the search, the scroll requests and the final clear-scroll.</param>
/// <param name="queryContainer">Query to execute.</param>
/// <param name="indexName">Index to search.</param>
/// <param name="flagOnlyIds">
/// When true, each hit is yielded as a <c>GeoDocumentSlim</c> built from the hit id and the fixed
/// name "hardcoded-name"; this only works when <typeparamref name="T"/> is <c>GeoDocumentSlim</c>
/// (or a base type of it), otherwise the cast fails at run time.
/// </param>
/// <returns>An asynchronous stream of the matching documents.</returns>
/// <exception cref="Exception">The search or one of the scroll requests returned an invalid response.</exception>
public static async IAsyncEnumerable<T> QueryIAsyncEnumerable<T>(
    ElasticClient _elasticClient,
    QueryContainer queryContainer,
    string indexName,
    bool flagOnlyIds = false
) where T : class
{
    // Turns one response into the page to yield. The projection is applied to EVERY page,
    // not just the first one, and is materialized so it is not evaluated twice.
    IEnumerable<T> ToPage(ISearchResponse<T> response) => flagOnlyIds
        ? (IEnumerable<T>) response.Hits.Select(h => new GeoDocumentSlim(h.Id, "hardcoded-name")).ToList()
        : response.Documents;

    var result = await _elasticClient.SearchAsync<T>(s => s
        .Index(indexName)
        .Query(qc => queryContainer)
        .Size(SearchBulkBatchSize)
        .Scroll(SearchAsyncScrollTime)
        .Sort(so => so.Field(f => f.Field("_doc")))
    );
    if (!result.IsValid)
    {
        // Previously a 'yield break' in front of this throw silently hid failed searches.
        throw new Exception($"Error searching: {result.DebugInformation}");
    }

    var scrollId = result.ScrollId;
    try
    {
        var page = ToPage(result);
        while (page.Any())
        {
            foreach (var document in page)
            {
                yield return document;
            }

            // The first argument of ScrollAsync is the scroll keep-alive time, not a page size.
            result = await _elasticClient.ScrollAsync<T>(SearchAsyncScrollTime, scrollId);
            if (!result.IsValid)
            {
                throw new Exception($"Error scrolling: {result.DebugInformation}");
            }

            scrollId = result.ScrollId ?? scrollId;
            page = ToPage(result);
        }
    }
    finally
    {
        // Release the server-side scroll context instead of letting it linger until it times out.
        if (!string.IsNullOrEmpty(scrollId))
        {
            await _elasticClient.ClearScrollAsync(c => c.ScrollId(scrollId));
        }
    }
}
/// <summary>
/// Streams the hits of <paramref name="indexName"/> that match <paramref name="queryContainer"/>,
/// paging with the scroll API (<c>SearchBulkBatchSize</c> hits per page, sorted by <c>_doc</c>).
/// Intended for id-only retrieval of shape documents.
/// </summary>
/// <typeparam name="T">Document type to deserialize hits into.</typeparam>
/// <param name="_elasticClient">Client used for the search, the scroll requests and the final clear-scroll.</param>
/// <param name="queryContainer">Query to execute.</param>
/// <param name="indexName">Index to search.</param>
/// <param name="flagOnlyIds">
/// When true, each hit is yielded as a <c>GeoDocumentSlim</c> built from the hit id and the fixed
/// name "hardcoded-name"; this only works when <typeparamref name="T"/> is <c>GeoDocumentSlim</c>
/// (or a base type of it), otherwise the cast fails at run time. When false the deserialized
/// documents are yielded.
/// </param>
/// <returns>An asynchronous stream of the matching documents.</returns>
/// <exception cref="Exception">The search or one of the scroll requests returned an invalid response.</exception>
public static async IAsyncEnumerable<T> QueryIAsyncEnumerableIds<T>(
    ElasticClient _elasticClient,
    QueryContainer queryContainer,
    string indexName,
    bool flagOnlyIds = false
) where T : class
{
    // Builds the page to yield from one response; applied uniformly to the first page and to
    // every scrolled page, and materialized so the projection runs once per page.
    IEnumerable<T> ToPage(ISearchResponse<T> response)
    {
        if (!flagOnlyIds)
        {
            return response.Documents;
        }

        return (IEnumerable<T>) response.Hits
            .Select(h => new GeoDocumentSlim(h.Id, "hardcoded-name"))
            .ToList();
    }

    var result = await _elasticClient.SearchAsync<T>(s => s
        .Index(indexName)
        .Query(qc => queryContainer)
        .Size(SearchBulkBatchSize)
        .Scroll(SearchAsyncScrollTime)
        .Sort(so => so.Field(f => f.Field("_doc")))
    );
    if (!result.IsValid)
    {
        // Previously a 'yield break' in front of this throw silently hid failed searches.
        throw new Exception($"Error searching: {result.DebugInformation}");
    }

    var scrollId = result.ScrollId;
    try
    {
        for (var page = ToPage(result); page.Any(); page = ToPage(result))
        {
            foreach (var document in page)
            {
                yield return document;
            }

            // ScrollAsync expects the scroll keep-alive time first; the batch size is not a time.
            result = await _elasticClient.ScrollAsync<T>(SearchAsyncScrollTime, scrollId);
            if (!result.IsValid)
            {
                throw new Exception($"Error scrolling: {result.DebugInformation}");
            }

            scrollId = result.ScrollId ?? scrollId;
        }
    }
    finally
    {
        // Free the scroll context on the cluster once the stream ends or is abandoned.
        if (!string.IsNullOrEmpty(scrollId))
        {
            await _elasticClient.ClearScrollAsync(c => c.ScrollId(scrollId));
        }
    }
}
/// <summary>
/// Streams every document of <paramref name="indexName"/> that matches
/// <paramref name="queryContainer"/>, paging with the scroll API
/// (<c>SearchBulkBatchSize</c> hits per page, sorted by <c>_doc</c>).
/// </summary>
/// <typeparam name="T">Document type to deserialize hits into.</typeparam>
/// <param name="_elasticClient">Client used for the search, the scroll requests and the final clear-scroll.</param>
/// <param name="queryContainer">Query to execute.</param>
/// <param name="indexName">Index to search.</param>
/// <param name="flagOnlyIds">Not used by this variant; kept so the signature matches its siblings.</param>
/// <returns>An asynchronous stream of the matching documents.</returns>
/// <exception cref="Exception">The search or one of the scroll requests returned an invalid response.</exception>
public static async IAsyncEnumerable<T> QueryIAsyncEnumerable2<T>(
    ElasticClient _elasticClient,
    QueryContainer queryContainer,
    string indexName,
    bool flagOnlyIds = false
) where T : class
{
    var result = await _elasticClient.SearchAsync<T>(s => s
        .Index(indexName)
        .Query(qc => queryContainer)
        .Size(SearchBulkBatchSize)
        .Scroll(SearchAsyncScrollTime)
        .Sort(so => so.Field(f => f.Field("_doc")))
    );
    if (!result.IsValid)
    {
        // Previously a 'yield break' in front of this throw silently hid failed searches.
        throw new Exception($"Error searching: {result.DebugInformation}");
    }

    var scrollId = result.ScrollId;
    try
    {
        while (result.Documents.Any())
        {
            foreach (var document in result.Documents)
            {
                yield return document;
            }

            // The first argument of ScrollAsync is the scroll keep-alive time, not a page size.
            result = await _elasticClient.ScrollAsync<T>(SearchAsyncScrollTime, scrollId);
            if (!result.IsValid)
            {
                throw new Exception($"Error scrolling: {result.DebugInformation}");
            }

            scrollId = result.ScrollId ?? scrollId;
        }
    }
    finally
    {
        // Release the server-side scroll context instead of letting it linger until it times out.
        if (!string.IsNullOrEmpty(scrollId))
        {
            await _elasticClient.ClearScrollAsync(c => c.ScrollId(scrollId));
        }
    }
}
/// <summary>
/// Matchmaking strategy "WhenAll Tasks": collects the ids of all pre-indexed shapes selected by
/// <paramref name="queryContainer"/>, starts one geo_shape search against the points index per
/// shape id, awaits all of them together and prints timing statistics to the console.
/// </summary>
/// <param name="queryContainer">Query that selects the pre-indexed shapes.</param>
/// <remarks>
/// Each per-shape search returns at most <c>SearchBulkBatchSize</c> hits; it is a plain search,
/// not a scroll. All searches are started at once, which was observed to run into the
/// HttpClient timeout on large inputs (see the batched variant for a throttled alternative).
/// References:
/// https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/scrolling-documents.html#scrollall-observable
/// https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach
/// </remarks>
/// <exception cref="AggregateException">At least one search response was invalid.</exception>
public static async Task QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel(QueryContainer queryContainer)
{
    var queryDefaults = SetQueryDefaults();
    var logsList = new System.Collections.Generic.List<string>();
    var elasticClient = InitElasticClient(logsList, queryDefaults.indexName);
    // Never populated by this strategy; kept so the statistics keep their
    // "Total searched documents" line.
    List<FrameDocumentGeoShape> searchedDocuments = new List<FrameDocumentGeoShape>();
    var watch = Stopwatch.StartNew();

    // Step 1: ids of all pre-indexed shapes (blocking ScrollAll-based helper).
    var ids = QueryAllWithScrollAPIAsyncByDetailsOnlyIds<GeoDocumentSlim>(
        queryDefaults.numberOfSlices, queryDefaults.maxDegreeOfParallelism, queryDefaults.shapesIndexName,
        elasticClient, queryContainer, true).ToList();

    // Step 2: one geo_shape search per shape id; the tasks start when WhenAll enumerates the query.
    ConcurrentBag<FrameDocumentGeoShapeSlim> searchedPointsDocumentsConcurrentBag =
        new ConcurrentBag<FrameDocumentGeoShapeSlim>();
    var searchTasks = ids.Select(i =>
    {
        var spatialQuery = MakeGeoShapeQuery<FrameDocumentGeoShapeSlim>(queryDefaults.pointsIndexField,
            queryDefaults.shapesIndexName, i, queryDefaults.shapesIndexField,
            queryDefaults.geoShapeRelation);
        return elasticClient.SearchAsync<FrameDocumentGeoShapeSlim>(s => s
            .Index(queryDefaults.pointsIndexName)
            .Size(SearchBulkBatchSize)
            .Sort(srt => srt
                .Ascending("_id")
            )
            .Query(q => spatialQuery)
        );
    });
    var searchResponses = await Task.WhenAll(searchTasks);

    // Step 3: merge all hits into one bag.
    Parallel.ForEach(searchResponses, searchResponse =>
    {
        if (!searchResponse.IsValid)
        {
            throw new Exception($"Error searching: {searchResponse.DebugInformation}");
        }
        foreach (var searchedPoint in searchResponse.Documents)
        {
            searchedPointsDocumentsConcurrentBag.Add(searchedPoint);
        }
    });

    // Step 4: statistics. Shapes and points are now reported with their real counts instead of
    // a hard-coded 0, matching what the nested-IAsyncEnumerable strategy reports.
    var totalSearchedDocumentsConcurrentBag = searchedPointsDocumentsConcurrentBag.Count;
    var totalSearchedDocuments = searchedDocuments.Count;
    var shapesCount = ids.Count;
    var pointsCount = totalSearchedDocumentsConcurrentBag;
    watch.Stop();
    var formattedElapsedTime = FormatElapsedTimeFromMilliseconds(watch.ElapsedMilliseconds);
    var statisticsList = new System.Collections.Generic.List<string>
    {
        $"Search method: Query ALL With Scroll API by two indices (matchmaking) WhenAll Tasks",
        $"ES Index (search results from this index) : {queryDefaults.pointsIndexName}",
        $"ES Index Pre-Index-Shapes: {queryDefaults.shapesIndexName}",
        $"Query relation : {queryDefaults.geoShapeRelation}",
        $"Number of shards: {NumberOfShards}",
        $"Number of replicas: {NumberOfReplicas}",
        $"Number of slices: {queryDefaults.numberOfSlices}",
        $"Degree of parallelism: {queryDefaults.maxDegreeOfParallelism}",
        $"Processor count: {MaxDegreeOfParallelismProcessorCount}",
        $"Search Bulk batch size: {SearchBulkBatchSize}",
        $"Total searched documents: {totalSearchedDocuments}",
        $"Total searched documents concurrent bag: {totalSearchedDocumentsConcurrentBag}",
        $"Total shapes count: {shapesCount}",
        $"Total points count: {pointsCount}",
        $"Total Elapsed time: {formattedElapsedTime}",
    };
    Console.WriteLine(string.Join("\n", statisticsList));
}
/// <summary>
/// Matchmaking strategy "WhenAll Tasks in Batches": collects the ids of all pre-indexed shapes
/// selected by <paramref name="queryContainer"/>, then runs one geo_shape search against the
/// points index per shape id, <c>SearchTasksBatchSize</c> searches at a time, and prints timing
/// statistics to the console.
/// </summary>
/// <param name="queryContainer">Query that selects the pre-indexed shapes.</param>
/// <remarks>
/// The shape ids are split into batches BEFORE the search tasks are created. Creating all tasks
/// first and batching the task list afterwards starts every request immediately, so the batching
/// would not limit the number of in-flight requests at all.
/// Each per-shape search returns at most <c>SearchBulkBatchSize</c> hits; it is a plain search,
/// not a scroll.
/// References:
/// https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/scrolling-documents.html#scrollall-observable
/// https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach
/// </remarks>
/// <exception cref="AggregateException">At least one search response was invalid.</exception>
public static async Task QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel12(QueryContainer queryContainer)
{
    var queryDefaults = SetQueryDefaults();
    var logsList = new System.Collections.Generic.List<string>();
    var elasticClient = InitElasticClient(logsList, queryDefaults.indexName);
    // Never populated by this strategy; kept so the statistics keep their
    // "Total searched documents" line.
    List<FrameDocumentGeoShape> searchedDocuments = new List<FrameDocumentGeoShape>();
    var watch = Stopwatch.StartNew();

    // Step 1: ids of all pre-indexed shapes (blocking ScrollAll-based helper).
    var ids = QueryAllWithScrollAPIAsyncByDetailsOnlyIds<GeoDocumentSlim>(
        queryDefaults.numberOfSlices, queryDefaults.maxDegreeOfParallelism, queryDefaults.shapesIndexName,
        elasticClient, queryContainer, true).ToList();

    ConcurrentBag<FrameDocumentGeoShapeSlim> searchedPointsDocumentsConcurrentBag =
        new ConcurrentBag<FrameDocumentGeoShapeSlim>();

    // Starts the geo_shape search for a single shape id.
    Task<ISearchResponse<FrameDocumentGeoShapeSlim>> SearchPointsInShapeAsync(string shapeId)
    {
        var spatialQuery = MakeGeoShapeQuery<FrameDocumentGeoShapeSlim>(queryDefaults.pointsIndexField,
            queryDefaults.shapesIndexName, shapeId, queryDefaults.shapesIndexField,
            queryDefaults.geoShapeRelation);
        return elasticClient.SearchAsync<FrameDocumentGeoShapeSlim>(s => s
            .Index(queryDefaults.pointsIndexName)
            .Size(SearchBulkBatchSize)
            .Sort(srt => srt
                .Ascending("_id")
            )
            .Query(q => spatialQuery)
        );
    }

    // Step 2: only the searches of the current batch are in flight at any time.
    foreach (var idBatch in GetBatches<string>(ids, SearchTasksBatchSize))
    {
        var batchResponses = await Task.WhenAll(idBatch.Select(id => SearchPointsInShapeAsync(id)));
        Parallel.ForEach(batchResponses, searchResponse =>
        {
            if (!searchResponse.IsValid)
            {
                throw new Exception($"Error searching: {searchResponse.DebugInformation}");
            }
            foreach (var searchedPoint in searchResponse.Documents)
            {
                searchedPointsDocumentsConcurrentBag.Add(searchedPoint);
            }
        });
    }

    // Step 3: statistics. Shapes and points are now reported with their real counts instead of
    // a hard-coded 0, matching what the nested-IAsyncEnumerable strategy reports.
    var totalSearchedDocumentsConcurrentBag = searchedPointsDocumentsConcurrentBag.Count;
    var totalSearchedDocuments = searchedDocuments.Count;
    var shapesCount = ids.Count;
    var pointsCount = totalSearchedDocumentsConcurrentBag;
    watch.Stop();
    var formattedElapsedTime = FormatElapsedTimeFromMilliseconds(watch.ElapsedMilliseconds);
    var statisticsList = new System.Collections.Generic.List<string>
    {
        $"Search method: Query ALL With Scroll API by two indices (matchmaking) WhenAll Tasks in Batches",
        $"ES Index (search results from this index) : {queryDefaults.pointsIndexName}",
        $"ES Index Pre-Index-Shapes: {queryDefaults.shapesIndexName}",
        $"Query relation : {queryDefaults.geoShapeRelation}",
        $"Number of shards: {NumberOfShards}",
        $"Number of replicas: {NumberOfReplicas}",
        $"Number of slices: {queryDefaults.numberOfSlices}",
        $"Degree of parallelism: {queryDefaults.maxDegreeOfParallelism}",
        $"Processor count: {MaxDegreeOfParallelismProcessorCount}",
        $"Search Tasks batch size: {SearchTasksBatchSize}",
        $"Search Bulk batch size: {SearchBulkBatchSize}",
        $"Total searched documents: {totalSearchedDocuments}",
        $"Total searched documents concurrent bag: {totalSearchedDocumentsConcurrentBag}",
        $"Total shapes count: {shapesCount}",
        $"Total points count: {pointsCount}",
        $"Total Elapsed time: {formattedElapsedTime}",
    };
    Console.WriteLine(string.Join("\n", statisticsList));
}
/// <summary>
/// Matchmaking strategy "Nested IAsyncEnumerables": streams the ids of the pre-indexed shapes
/// selected by <paramref name="queryContainer"/> and, for each shape, streams the points of the
/// points index that satisfy the geo_shape query against that shape. Both levels are consumed
/// through <c>AsyncParallelForEach</c> (20 shapes / 40 points in parallel). Prints timing
/// statistics to the console.
/// </summary>
/// <param name="queryContainer">Query that selects the pre-indexed shapes.</param>
/// <remarks>
/// A fresh <c>SynchronizationContext</c> is installed before each
/// <c>TaskScheduler.FromCurrentSynchronizationContext()</c> call because that call requires a
/// current context. NOTE(review): whether a scheduler obtained this way allows the requested
/// degree of parallelism should be verified.
/// References:
/// https://medium.com/@alex.puiu/parallel-foreach-async-in-c-36756f8ebe62
/// https://scatteredcode.net/parallel-foreach-async-in-c/
/// </remarks>
public static async Task QueryAllWithScrollAPIAsyncByQueryAsyncMultipleParallel11(QueryContainer queryContainer)
{
    var queryDefaults = SetQueryDefaults();
    var logsList = new System.Collections.Generic.List<string>();
    var elasticClient = InitElasticClient(logsList, queryDefaults.indexName);
    // Never populated by this strategy; kept so the statistics keep their
    // "Total searched documents" line.
    List<FrameDocumentGeoShape> searchedDocuments = new List<FrameDocumentGeoShape>();
    var watch = Stopwatch.StartNew();

    ConcurrentBag<FrameDocumentGeoShapeSlim> searchedPointsDocumentsConcurrentBag =
        new ConcurrentBag<FrameDocumentGeoShapeSlim>();
    var shapesCount = 0;
    var pointsCount = 0;

    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
    var shapesAsyncEnumerable = QueryIAsyncEnumerableIds<GeoDocumentSlim>(elasticClient, queryContainer,
        queryDefaults.shapesIndexName, true);
    await shapesAsyncEnumerable.AsyncParallelForEach(
        async entry =>
        {
            // One invocation per shape id; counters are shared between parallel bodies.
            Interlocked.Increment(ref shapesCount);
            var spatialQuery = MakeGeoShapeQuery<FrameDocumentGeoShapeSlim>(queryDefaults.pointsIndexField,
                queryDefaults.shapesIndexName, entry.Id, queryDefaults.shapesIndexField,
                queryDefaults.geoShapeRelation);

            // Stream the points matching this shape and collect them.
            SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
            var pointsAsyncEnumerable = QueryIAsyncEnumerable2<FrameDocumentGeoShapeSlim>(elasticClient,
                spatialQuery, queryDefaults.pointsIndexName, false);
            await pointsAsyncEnumerable.AsyncParallelForEach(
                entry2 =>
                {
                    // Purely synchronous work, so no async state machine is needed here.
                    Interlocked.Increment(ref pointsCount);
                    searchedPointsDocumentsConcurrentBag.Add(entry2);
                    return Task.CompletedTask;
                }, 40,
                TaskScheduler.FromCurrentSynchronizationContext()
            );
        }, 20,
        TaskScheduler.FromCurrentSynchronizationContext()
    );

    var totalSearchedDocumentsConcurrentBag = searchedPointsDocumentsConcurrentBag.Count;
    var totalSearchedDocuments = searchedDocuments.Count;
    watch.Stop();
    var formattedElapsedTime = FormatElapsedTimeFromMilliseconds(watch.ElapsedMilliseconds);
    var statisticsList = new System.Collections.Generic.List<string>
    {
        $"Search method: Query ALL With Scroll API by two indices (matchmaking) Nested IAsyncEnumerables",
        $"ES Index (search results from this index) : {queryDefaults.pointsIndexName}",
        $"ES Index Pre-Index-Shapes: {queryDefaults.shapesIndexName}",
        $"Query relation : {queryDefaults.geoShapeRelation}",
        $"Number of shards: {NumberOfShards}",
        $"Number of replicas: {NumberOfReplicas}",
        $"Number of slices: {queryDefaults.numberOfSlices}",
        $"Degree of parallelism: {queryDefaults.maxDegreeOfParallelism}",
        $"Processor count: {MaxDegreeOfParallelismProcessorCount}",
        $"Search Bulk batch size: {SearchBulkBatchSize}",
        $"Total searched documents: {totalSearchedDocuments}",
        $"Total searched documents concurrent bag: {totalSearchedDocumentsConcurrentBag}",
        $"Total shapes count: {shapesCount}",
        $"Total points count: {pointsCount}",
        $"Total Elapsed time: {formattedElapsedTime}",
    };
    Console.WriteLine(string.Join("\n", statisticsList));
}
/// <summary>
/// Collects the ids of all documents of <paramref name="indexName"/> that match
/// <paramref name="queryContainer"/> using NEST's sliced <c>ScrollAll</c> observable
/// (scroll time "1m", <c>SearchBulkBatchSize</c> hits per page, sorted by <c>_doc</c>).
/// Blocks the calling thread until the observable completes or fails.
/// </summary>
/// <typeparam name="T">Document type used for the scroll requests.</typeparam>
/// <param name="numberOfSlices">Number of scroll slices.</param>
/// <param name="maxDegreeOfParallelism">Maximum degree of parallelism passed to the scroll-all request.</param>
/// <param name="indexName">Index to scroll.</param>
/// <param name="elasticClient">Client used to run the scroll.</param>
/// <param name="queryContainer">Query selecting the documents.</param>
/// <param name="flagSlimDocs">Currently not used by the implementation.</param>
/// <returns>A bag with the id of every hit (in no particular order).</returns>
/// <remarks>
/// If the observable reports an error, it is logged to the console and rethrown on the calling
/// thread with its original stack trace preserved.
/// </remarks>
static ConcurrentBag<string> QueryAllWithScrollAPIAsyncByDetailsOnlyIds<T>(int numberOfSlices,
    int maxDegreeOfParallelism, string indexName, ElasticClient elasticClient,
    QueryContainer queryContainer = null, bool flagSlimDocs = false) where T : class
{
    var searchedDocumentsConcurrentBag = new ConcurrentBag<string>();

    IObservable<ScrollAllResponse<T>> scrollAllObservable = elasticClient.ScrollAll<T>("1m", numberOfSlices, sc => sc
        .MaxDegreeOfParallelism(maxDegreeOfParallelism)
        .Search(s => s
            .Index(indexName)
            .Size(SearchBulkBatchSize)
            .Sort(so => so.Field(f => f.Field("_doc")))
            .Query(q => queryContainer)
        )
    );

    var waitHandle = new ManualResetEvent(false);
    ExceptionDispatchInfo failure = null;
    scrollAllObservable.Subscribe(new ScrollAllObserver<T>(
        onNext: r =>
        {
            // The bag is thread-safe, so pages arriving from different slices can add directly.
            foreach (var hit in r.SearchResponse.Hits)
            {
                searchedDocumentsConcurrentBag.Add(hit.Id);
            }
        },
        onError: e =>
        {
            // Capture (rather than store) the exception so rethrowing keeps its stack trace.
            failure = ExceptionDispatchInfo.Capture(e);
            Console.WriteLine($"Error {e.Message} \n {e.InnerException?.Message}");
            waitHandle.Set();
        },
        onCompleted: () => waitHandle.Set()
    ));

    waitHandle.WaitOne();
    failure?.Throw();
    return searchedDocumentsConcurrentBag;
}
/// <summary>
/// Splits a list into consecutive batches of at most <paramref name="batchSize"/> elements,
/// preserving the original order. The last batch may be smaller.
/// </summary>
/// <typeparam name="T">Element type.</typeparam>
/// <param name="ids">List to split.</param>
/// <param name="batchSize">Maximum number of elements per batch; must be greater than zero.</param>
/// <returns>The batches, each a new list copied from <paramref name="ids"/>; empty when <paramref name="ids"/> is empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="ids"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="batchSize"/> is zero or negative. (A batch size of zero would otherwise never
/// advance the loop and keep adding empty batches forever.)
/// </exception>
public static List<List<T>> GetBatches<T>(List<T> ids, int batchSize)
{
    if (ids == null)
    {
        throw new ArgumentNullException(nameof(ids));
    }
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize,
            "Batch size must be greater than zero.");
    }

    var batches = new List<List<T>>();
    for (var start = 0; start < ids.Count; start += batchSize)
    {
        // Clamp the final batch to the number of remaining elements.
        var length = Math.Min(batchSize, ids.Count - start);
        batches.Add(ids.GetRange(start, length));
    }
    return batches;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment