Skip to content

Instantly share code, notes, and snippets.

@andrx
Created October 16, 2019 23:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrx/b87585c190916aea0e9ed177ecdb1d7e to your computer and use it in GitHub Desktop.
Save andrx/b87585c190916aea0e9ed177ecdb1d7e to your computer and use it in GitHub Desktop.
...
// Process divisions in parallel, at most 5 at a time; each division gets its own
// loader instance from the factory.
Parallel.ForEach(
    divisions,
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    division =>
    {
        _loaderFactory.Create().Load(division);
    });
...
/// <summary>
/// Loads one division: reads its SQL records, processes them in pages of 300
/// (up to 20 pages concurrently), and pushes every team collected along the way
/// to the repository in a single UpdateAll call.
/// </summary>
/// <param name="division">The division to load.</param>
public virtual void Load(DivisionDto division)
{
    var sqlRecords = GetSqlRecords(division);
    if (sqlRecords.Length == 0)
    {
        return;
    }

    const int pageSize = 300;
    var pageCount = Paginator.ConvertToPages(sqlRecords.Length, pageSize);

    // Shared across the page workers below, hence the concurrent dictionary.
    var bulkCollection = new ConcurrentDictionary<string, Team>();
    var pageParallelism = new ParallelOptions { MaxDegreeOfParallelism = 20 };

    Parallel.ForEach(Enumerable.Range(0, pageCount), pageParallelism, pageIndex =>
    {
        var offset = pageIndex * pageSize;
        var batch = sqlRecords.Skip(offset).Take(pageSize).ToArray();
        GetUpdatedCollection(division, batch, bulkCollection);
    });

    // Nothing was collected, so there is nothing to write back.
    if (bulkCollection.Count == 0)
    {
        return;
    }

    _repository.UpdateAll(bulkCollection.Values, division, onVersionConflictFn);
}
/// <summary>
/// Fetches the teams matching one batch of game events from the repository and
/// processes each of them; per the placeholders below, the elided per-team logic
/// mutates the team and adds it to <paramref name="bulkCollection"/>.
/// </summary>
/// <param name="division">Division passed through to the repository lookup.</param>
/// <param name="batch">One page of game events to look teams up for.</param>
/// <param name="bulkCollection">
/// Accumulator shared by all batches; Load invokes this method from inside a
/// Parallel.ForEach, so it is written to from several threads.
/// </param>
private void GetUpdatedCollection(DivisionDto division, GameEvent[] batch, ConcurrentDictionary<string, Team> bulkCollection)
{
// One search per batch; the query is built from the batch's game events.
var teams = _repository.FindByGames(batch, division);
// NOTE(review): FindByGames returns IEnumerable<Team>, so ForEach is presumably a
// project extension method that is not shown here; confirm it enumerates eagerly.
teams.ForEach(team =>
{
// mutation logic
// adding to bulkCollection
});
}
/// <summary>
/// Finds the teams in the division's index whose "cIds" field matches the cId of
/// any of the given game events.
/// </summary>
/// <param name="facts">Game events whose cId values become the terms of the query.</param>
/// <param name="division">Division used to resolve the index name.</param>
/// <returns>The teams produced by Search for the generated terms query.</returns>
public IEnumerable<Team> FindByGames(IEnumerable<GameEvent> facts, DivisionDto division)
{
    // Resolve the index into a local and pass it to Search explicitly. This method is
    // reached from nested Parallel.ForEach loops; routing the name only through the
    // mutable _indexName field would let a concurrent call for another division
    // overwrite it between the assignment and the search.
    var indexName = _indexNameGenerator.GetIndexName(division);

    // The field is still assigned in case other members (not shown here) read it.
    _indexName = indexName;

    return Search(q => MatchByGames(facts), indexName);
}

/// <summary>
/// Builds a bool/must query containing a single terms query on "cIds" with one term
/// per game event.
/// </summary>
/// <param name="facts">Game events supplying the cId terms.</param>
/// <returns>The query container to pass to the search request.</returns>
private static QueryContainer MatchByGames(IEnumerable<GameEvent> facts)
{
    // NOTE (gist author): so basically terms will contain 300 values
    // (one page of game events, as batched by the caller).
    // Materialize once so the terms are not re-enumerated from the source sequence.
    var factIds = facts.Select(f => f.cId as object).ToArray();

    return Query<Team>.Bool(b => b.Must(new QueryContainer(new TermsQuery { Field = "cIds", Terms = factIds })));
}

/// <summary>
/// Runs a search for <c>Team</c> documents (up to 10000 hits, with version and
/// sequence number / primary term) and logs timing and hit counts.
/// </summary>
/// <param name="query">
/// Query factory. Typed against <c>Team</c> because that is the descriptor type
/// <c>Search&lt;Team&gt;(...).Query(...)</c> accepts.
/// </param>
/// <param name="indexName">
/// Index to search. When omitted, falls back to the <c>_indexName</c> field so
/// existing callers keep working.
/// </param>
/// <returns>The mapped teams (mapping logic elided in this excerpt).</returns>
private IEnumerable<Team> Search(Func<QueryContainerDescriptor<Team>, QueryContainer> query, string indexName = null)
{
    // Capture the index once so the request cannot observe a later change to the field.
    var index = indexName ?? _indexName;

    var sw = Stopwatch.StartNew();

    // NOTE (gist author): EXCEPTION HAPPENS ON NEXT LINE
    var response = _esClient.Search<Team>(s => s.Index(index).Size(10000).Query(query).Version().SequenceNumberPrimaryTerm());
    sw.Stop();

    // "records found" is the total number of matching hits; "records returned" is the
    // number of documents actually in this response, so truncation by Size(10000) is visible.
    _logger.Info($"network + parsing time: {sw.ElapsedMilliseconds}ms. es side time: {response.Took}ms. records found: {response.HitsMetadata?.Total?.Value} records returned: {response.Documents.Count}; response text length: {response.DebugInformation?.Length}");
    // mapping logic and return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment