Skip to content

Instantly share code, notes, and snippets.

@ahsanmster
Created November 2, 2022 11:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahsanmster/1f3670a4eab00d95f33ecac954967824 to your computer and use it in GitHub Desktop.
Save ahsanmster/1f3670a4eab00d95f33ecac954967824 to your computer and use it in GitHub Desktop.
<?php
namespace App\Services\Property;
use App\Models\BulkOperationData;
use App\Models\Property\Address;
use App\Models\PropertyLeadStatus;
use App\Services\BulkOperationService;
use Elasticsearch\ClientBuilder;
use Exception;
use Illuminate\Container\Container;
use Illuminate\Pagination\LengthAwarePaginator;
use Illuminate\Pagination\Paginator;
use Illuminate\Support\Arr;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use ReflectionClass;
use Illuminate\Support\Facades\DB;
class PropertyElasticService extends ElasticsearchService
{
protected $clientBuilder;
/**
 * Bind the service to the configured property index, seed the base query
 * with an empty bool/filter clause and build the Elasticsearch client that
 * update() uses for its bulk requests.
 */
public function __construct()
{
    $propertyIndex = config('constant.elastic.property.index');
    parent::__construct($propertyIndex);

    $this->baseQuery['query'] = ['bool' => ['filter' => []]];

    $hosts = [config('services.elasticsearch.host')];
    $username = config('services.elasticsearch.username');
    $password = config('services.elasticsearch.password');

    $this->clientBuilder = ClientBuilder::create()
        ->setHosts($hosts)
        ->setBasicAuthentication($username, $password)
        ->setSelector('\Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector')
        ->setConnectionPool('\Elasticsearch\ConnectionPool\SimpleConnectionPool', [])
        ->build();
}
/**
 * Add a "<type>_lat_lon" entry to the given array.
 *
 * The entry is only written when both coordinates are numeric and $type is
 * either "property" or "mailing"; it is stored as [longitude, latitude]
 * floats. Otherwise the array is returned untouched.
 *
 * @param array $storageArray
 * @param mixed $latitude
 * @param mixed $longitude
 * @param string $type
 * @return array
 */
public static function generateLatLon($storageArray, $latitude, $longitude, $type)
{
    $coordinatesAreNumeric = is_numeric($longitude) && is_numeric($latitude);
    if (!$coordinatesAreNumeric || !in_array($type, ['property', 'mailing'])) {
        return $storageArray;
    }

    $storageArray["{$type}_lat_lon"] = [(float)$longitude, (float)$latitude];

    return $storageArray;
}
/**
 * Send a bulk update for the given properties to Elasticsearch.
 *
 * Returns true when there is nothing to send or when the bulk response
 * reports no errors. When the response reports errors, the failed items are
 * reported to Bugsnag and false is returned. Any exception raised while
 * talking to the cluster is reported to Bugsnag with the request payload and
 * then re-thrown.
 *
 * @param $properties
 * @param array $fields
 * @param bool $isIdsOnly
 * @param bool $queryLess when true the payload is built by buildUpdateParamsQueryLess()
 * @param bool $doc_as_upsert only used on the query-less path
 * @param string $refresh only used on the query-less path
 * @return bool
 * @throws Exception
 */
public function update($properties, $fields = [], $isIdsOnly = true, $queryLess = false, $doc_as_upsert=true, $refresh="wait_for") {
    if (empty($properties)) {
        return true;
    }

    $params = $queryLess
        ? $this->buildUpdateParamsQueryLess($properties, $doc_as_upsert, $refresh)
        : $this->buildUpdateParams($properties, $fields, $isIdsOnly);

    if (empty($params['body'])) {
        return true;
    }

    try {
        $response = $this->clientBuilder->bulk($params);
        if (!$response['errors']) {
            return true;
        }

        $failedItems = collect($response['items'])->whereNotNull('update.error');
        bugsnag(new \Exception('Bulk upsert response: ' . $failedItems->toJson()));

        return false;
    } catch (\Exception $e) {
        $context = [
            'Data' => [
                'Query' => $params,
                'Environment' => config('app.env'),
                'Host' => request()->getHttpHost(),
            ],
        ];
        bugsnag($e, 'error', $context);

        throw $e;
    }
}
/**
 * Bulk-index the given property documents.
 *
 * Returns true when there is nothing to index; otherwise returns the result
 * of getErrorLog() for the bulk response.
 *
 * NOTE(review): a fresh client is built on every call instead of reusing
 * $this->clientBuilder — confirm whether that is intentional.
 *
 * @param $properties
 * @return bool
 */
public function index($properties)
{
    if (empty($properties)) {
        return true;
    }

    $params = $this->buildIndexQuery($properties);
    if (empty($params['body'])) {
        return true;
    }

    $client = ClientBuilder::create()
        ->setHosts([\config('services.elasticsearch.host')])
        ->setBasicAuthentication(
            \config('services.elasticsearch.username'),
            \config('services.elasticsearch.password')
        )
        ->build();

    $bulkResponse = $client->bulk($params);

    return $this->getErrorLog($bulkResponse, 'index');
}
/**
 * Build the bulk update payload for Elasticsearch without any database queries.
 *
 * Every address with a non-empty "id" contributes an update action line and
 * a partial document stamped with the current time in "index_updated_at".
 * Addresses without an id are skipped.
 *
 * @param iterable $addresses address documents, each expected to carry an "id" key
 * @param bool $doc_as_upsert value sent as "doc_as_upsert" with every document
 * @param mixed $refresh value sent as the bulk "refresh" parameter; defaults to
 *                       "wait_for", the same default update() uses
 * @return array
 */
public function buildUpdateParamsQueryLess($addresses, $doc_as_upsert = true, $refresh = 'wait_for')
{
    // $refresh used to be a required parameter placed after an optional one,
    // which PHP 8 deprecates; giving it a default keeps existing calls valid.
    $params['body'] = [];
    $params['refresh'] = $refresh;

    foreach ($addresses as $address) {
        if (empty($address['id'])) {
            continue;
        }

        $params['body'][] = [
            'update' => [
                '_id' => $address['id'],
                '_index' => config('constant.elastic.property.index'),
                '_type' => config('constant.elastic.property.type'),
                'retry_on_conflict' => 3,
            ],
        ];

        // "Y-m-d H:i:s" is turned into "Y-m-dTH:i:s" before it is stored.
        $address['index_updated_at'] = str_replace(' ', 'T', now()->toDateTimeString());

        $params['body'][] = [
            'doc' => $address,
            'doc_as_upsert' => $doc_as_upsert,
        ];
    }

    return $params;
}
/**
 * Build the bulk "index" payload for the given address documents.
 *
 * Each address yields an action line (id, index, type) followed by the
 * document itself, stamped with the current time in "index_updated_at".
 *
 * @param $addresses
 * @return array
 */
public function buildIndexQuery($addresses)
{
    $params = ['body' => []];

    foreach ($addresses as $address) {
        $action = [
            '_id' => $address['id'],
            '_index' => config('constant.elastic.property.index'),
            '_type' => config('constant.elastic.property.type'),
        ];
        $params['body'][] = ['index' => $action];

        $indexedAt = now()->toDateTimeString();
        $address['index_updated_at'] = str_replace(' ', 'T', $indexedAt);
        $params['body'][] = $address;
    }

    return $params;
}
/**
 * Build the bulk update payload from address models.
 *
 * When $isIdsOnly is true, $properties is treated as a list of ids and the
 * matching Address models are loaded. Otherwise $properties is used as the
 * set of models directly (a plain array is wrapped in a collection).
 *
 * @param $properties
 * @param array $fields passed to toSearchableArrayByFields() on each model
 * @param bool $isIdsOnly
 * @return mixed
 */
public function buildUpdateParams($properties, $fields = [], $isIdsOnly = true)
{
    if ($isIdsOnly) {
        // Resolve the ids to Address models.
        $addresses = Address::query()->findMany($properties);
    } elseif (is_array($properties)) {
        $addresses = collect($properties);
    } else {
        $addresses = $properties;
    }

    $params = ['body' => []];

    $addresses->each(function ($address) use (&$params, $fields) {
        $action = [
            '_id' => $address->getKey(),
            '_index' => config('constant.elastic.property.index'),
            '_type' => config('constant.elastic.property.type'),
            'retry_on_conflict' => 3,
        ];
        $params['body'][] = ['update' => $action];

        $document = $address->toSearchableArrayByFields($fields);
        $document['index_updated_at'] = str_replace(' ', 'T', now()->toDateTimeString());
        $params['body'][] = ['doc' => $document];
    });

    return $params;
}
/**
 * Delete Elasticsearch documents in bulk.
 *
 * Returns true when there is nothing to delete; otherwise returns the result
 * of getErrorLog() for the bulk response.
 *
 * @param $properties ids, or models when $isIdsOnly is false
 * @param bool $isIdsOnly
 * @return bool
 * @throws \ReflectionException
 */
public function delete($properties, $isIdsOnly = true)
{
    if (empty($properties)) {
        return true;
    }

    $params = $this->buildDeleteParams($properties, $isIdsOnly);

    // empty() above is false for an empty Collection object, so guard the
    // built payload as update() and index() do instead of sending a bulk
    // request with no actions.
    if (empty($params['body'])) {
        return true;
    }

    $response = (ClientBuilder::create()->setHosts([config('services.elasticsearch.host')])
        ->setBasicAuthentication(config('services.elasticsearch.username'), config('services.elasticsearch.password'))
        ->build())->bulk($params);

    return $this->getErrorLog($response, 'delete');
}
/**
 * Inspect a bulk response and report failed items to Bugsnag.
 *
 * @param $response bulk response, read through its "errors" and "items" keys
 * @param $type bulk action name used to locate "<type>.error" on each item
 * @return bool true when the response reports no errors, false otherwise
 */
private function getErrorLog($response, $type): bool
{
    if (!$response['errors']) {
        return true;
    }

    $failedItems = collect($response['items'])->whereNotNull("{$type}.error");
    $message = 'Bulk operation response of type: ' . $type . ' : ' . $failedItems->toJson();
    bugsnag(new \Exception($message));

    return false;
}
/**
 * Build the bulk delete payload.
 *
 * With $isIdsOnly each entry is an id and the index/type come from config.
 * Otherwise each entry is a model: the id comes from getKey(), the index
 * from searchableAs() and the type is the lower-cased short class name.
 *
 * @param $properties
 * @param bool $isIdsOnly
 * @return mixed
 * @throws \ReflectionException
 */
public function buildDeleteParams($properties, $isIdsOnly = true)
{
    $params = ['body' => []];

    foreach ($properties as $property) {
        if ($isIdsOnly) {
            $target = [
                '_id' => $property,
                '_index' => config('constant.elastic.property.index'),
                '_type' => config('constant.elastic.property.type'),
            ];
        } else {
            $target = [
                '_id' => $property->getKey(),
                '_index' => $property->searchableAs(),
                '_type' => Str::lower((new ReflectionClass($property))->getShortName()),
            ];
        }

        $params['body'][] = ['delete' => $target];
    }

    return $params;
}
/**
 * Prepare the search query, run it and wrap the hits in a paginator.
 *
 * The paginator carries the raw hits, the total hit count and, under the
 * "aggregations" option, the aggregations returned by the search.
 *
 * @param $data filter payload; "pagesize" and "page" are optional and
 *              default to 15 and 1, "preference" is optional
 * @return mixed|object
 */
public function getAddresses($data)
{
    $this->prepareQuery($data);
    $this->aggregate();

    // Set the preference before searching: it used to be assigned after
    // search() had already returned, so it could not apply to this request.
    // NOTE(review): assumes search() reads $this->preference — confirm in the parent class.
    if (!empty($data['preference'])) {
        $this->preference = $data['preference'];
    }

    $elasticHits = $this->search();

    // prepareQuery() treats pagesize/page as optional, so read them without
    // raising an undefined-index notice when they are absent.
    $perPage = ($data['pagesize'] ?? null) ?: 15;
    $currentPage = ($data['page'] ?? null) ?: 1;

    return Container::getInstance()->makeWith(LengthAwarePaginator::class, [
        'items' => $elasticHits->hits->hits,
        'total' => $elasticHits->hits->total->value,
        'perPage' => $perPage,
        'currentPage' => $currentPage,
        'options' => [
            'path' => Paginator::resolveCurrentPath(),
            'pageName' => 'page',
            'aggregations' => $elasticHits->aggregations,
        ]
    ]);
}
/**
 * Count the documents matching the given filters, then reset the query state.
 *
 * @param array $data filter payload passed to prepareQuery()
 * @param mixed $shard forwarded to count()
 * @return mixed the "count" property of the count response
 */
public function getAddressCount($data = [], $shard=NULL)
{
    $this->prepareQuery($data);

    $countResponse = $this->count($shard);
    $total = $countResponse->count;

    $this->reset();

    return $total;
}
/**
 * Count the documents matching the given filters with any sort clause
 * removed, then reset the query state.
 *
 * @param array $data filter payload passed to prepareQuery()
 * @return mixed the "count" property of the count response
 */
public function getPropertiesCountByReset($data = [])
{
    $this->prepareQuery($data);

    // The count request does not support a sort clause, so drop it first.
    unset($this->baseQuery['sort']);

    $countResponse = $this->count();
    $total = $countResponse->count;

    $this->reset();

    return $total;
}
/**
 * Fetch raw hits from Elasticsearch for the given filters, restricted to a
 * custom _source selection.
 *
 * Optional "limit" and "offset" keys in $data set the page size and the
 * starting offset. Query and sort state are reset before returning.
 *
 * @param $data
 * @param $source fields passed to source()
 * @param $user forwarded to prepareQuery()
 * @return Collection collection of raw hits
 */
public function getRawProperties($data, $source, $user=null): Collection
{
    $this->prepareQuery($data, true, false, $user);
    $this->source($source);

    $hasLimit = !empty($data['limit']);
    if ($hasLimit) {
        $this->limit($data['limit']);
    }

    $hasOffset = !empty($data['offset']);
    if ($hasOffset) {
        $this->prepareCustomPaginate($data['offset']);
    }

    $response = $this->search();
    $hits = collect($response->hits->hits);

    $this->reset();
    $this->resetSort();

    return $hits;
}
/**
 * Collect the ids of all addresses matching the given filters.
 *
 * A single search returns at most 10,000 hits; when a larger "limit" is
 * requested, further pages are fetched with searchAfter() using the sort
 * values of the last hit of the previous page.
 *
 * @param $data filter payload; "limit" defaults to 10,000 when empty
 * @return array
 */
public function getAddressIds($data)
{
    $maxPageSize = 10000;

    $requested = !empty($data['limit']) ? $data['limit'] : $maxPageSize;
    $data['limit'] = $requested > $maxPageSize ? $maxPageSize : $requested;

    $this->prepareQuery($data, false);
    $this->limit($data['limit']);

    $page = collect($this->search()->hits->hits);
    $fetched = $page->count();
    $ids = $page->pluck('_source.id')->toArray();
    $cursor = $page->last();

    // One page is enough unless more than the maximum page size was requested.
    if (!($requested > $maxPageSize)) {
        return $ids;
    }

    $remaining = $requested - $fetched;
    while ($remaining > 0 && $cursor) {
        $this->limit($remaining > $maxPageSize ? $maxPageSize : $remaining);
        $this->searchAfter($cursor->sort);

        $page = collect($this->search()->hits->hits);
        $fetched += $page->count();
        $ids = array_merge($ids, $page->pluck('_source.id')->toArray());

        // An empty page yields a null cursor, which ends the loop.
        $cursor = $page->last();
        $remaining = $requested - $fetched;
    }

    return $ids;
}
/**
* Collect address ids matching the given filters, paging past the
* 10,000-hit window either with a scroll or with explicit from/to ranges.
*
* When both "from" and "to" are set in $data, the requested span is split
* with splitRange() and fetched range by range without a scroll. Otherwise a
* search with a "scroll" parameter is issued and searchScroll() is called
* until the requested limit is reached or a page comes back empty. Scroll
* ids seen along the way are passed to clearScrolls() at the end.
*
* @param $data filter payload; reads "limit", "from" and "to"
* @param bool $checkAmount when true each entry is ['id' => ..., 'is_skip_traced' => ...]
*                          instead of a bare id
* @return array
*/
public function getAddressIdsScanScroll($data, $checkAmount = false): array
{
// Largest page size requested from a single search call.
$elasticMaxResultWindow = 10000;
// Extra parameters for the first search; NOTE(review): presumably opens a
// one-minute scroll context — confirm against search().
$searchQueryParams = ['scroll' => '1m'];
$requestLimit = !empty($data['limit']) ? $data['limit'] : $elasticMaxResultWindow;
$data['limit'] = $requestLimit > $elasticMaxResultWindow ? $elasticMaxResultWindow : $requestLimit;
$this->prepareQuery($data, false);
if (!empty($data['from']) && !empty($data['to'])) {
// "from"/"to" are converted to an offset (from - 1) and a size (to - from + 1).
$elasticFrom = $data['from'] - 1;
$elasticSize = ($data['to'] - $data['from']) + 1;
// NOTE(review): splitRange() is defined elsewhere; it is assumed to return
// chunks with 'from' (offset) and 'to' (used below as the page size) — confirm.
$ranges = splitRange($elasticFrom, $elasticSize, $elasticMaxResultWindow);
$firstRange = array_shift($ranges);
if ($firstRange) {
$elasticFrom = $firstRange['from'];
$elasticSize = $firstRange['to'];
}
// Range mode sends no extra search parameters, so no scroll is requested.
$searchQueryParams = [];
$this->prepareCustomPaginate($elasticFrom);
$this->limit($elasticSize);
} else {
$this->limit($data['limit']);
}
if ($checkAmount) {
// Only the two fields returned to the caller are fetched.
$this->source(['id', 'is_skip_traced']);
}
$elasticHits = $this->search($searchQueryParams);
$addressesHits = collect($elasticHits->hits->hits);
$addressesCount = $addressesHits->count();
if ($checkAmount) {
$addresses = $addressesHits->map(function ($item) {
return [
'id' => $item->_source->id,
'is_skip_traced' => $item->_source->is_skip_traced
];
})->toArray();
} else {
$addresses = $addressesHits->pluck('_source.id')->toArray();
}
// Remember every scroll id so the scroll sessions can be cleared afterwards.
$scrollIds = [];
$scrollId = data_get($elasticHits, '_scroll_id');
if ($scrollId) {
$scrollIds = [$scrollId];
}
// Loop the requests to get more than 10,000 records
if ($requestLimit > $elasticMaxResultWindow && !empty($scrollId)) {
$searchAfterLimit = $requestLimit - $addressesCount;
while ($searchAfterLimit > 0) {
$elasticSearchAfterHits = $this->searchScroll($scrollId);
//Collect different scroll ids to delete scroll session post completion
if ($elasticSearchAfterHits->_scroll_id != $scrollId) {
$scrollId = $elasticSearchAfterHits->_scroll_id;
$scrollIds[] = $scrollId;
}
$addressesSearchAfterHits = collect($elasticSearchAfterHits->hits->hits);
if ($checkAmount) {
$searchAfterAddresses = $addressesSearchAfterHits->map(function ($item) {
return [
'id' => $item->_source->id,
'is_skip_traced' => $item->_source->is_skip_traced
];
});
} else {
$searchAfterAddresses = $addressesSearchAfterHits->pluck('_source.id');
}
// An empty page means the scroll is exhausted.
if (!$addressesSearchAfterHits->count()) {
break;
}
if ($searchAfterLimit < $elasticMaxResultWindow) {
// splice() removes everything from index $searchAfterLimit onwards, so only
// the entries still needed to reach the requested limit are kept.
$searchAfterAddresses->splice($searchAfterLimit);
$searchAfterAddresses = collect($searchAfterAddresses->all());
}
$addressesCount += count($searchAfterAddresses);
$addresses = array_merge($addresses, $searchAfterAddresses->toArray());
$searchAfterLimit = $requestLimit - $addressesCount;
}
}
// Loop the requests to get remaining records of pending ranges
// ($ranges only exists in from/to mode; empty() tolerates it being undefined.)
if (!empty($ranges)) {
foreach ($ranges as $range) {
$searchQueryParams = [];
$this->prepareCustomPaginate($range['from']);
$this->limit($range['to']);
$elasticHits = $this->search($searchQueryParams);
$addressesSearchAfterHits = collect($elasticHits->hits->hits);
if ($checkAmount) {
$searchAfterAddresses = $addressesSearchAfterHits->map(function ($item) {
return [
'id' => $item->_source->id,
'is_skip_traced' => $item->_source->is_skip_traced
];
});
} else {
$searchAfterAddresses = $addressesSearchAfterHits->pluck('_source.id');
}
$addresses = array_merge($addresses, $searchAfterAddresses->toArray());
}
}
// NOTE(review): scroll ids are only cleared on the normal path; an exception
// thrown above skips this cleanup — confirm whether that matters.
if ($scrollIds) {
$this->clearScrolls($scrollIds);
}
return $addresses;
}
/**
* Prepare the main search query by translating the filter payload in $data
* into clauses on $this->baseQuery (bool filter / must / must_not / should,
* post_filter, sort and pagination).
*
* Each supported key of $data is handled by its own block below; keys that
* are absent or empty leave the query untouched.
*
* @param $data filter payload
* @param bool $getPaginate when true, paginate() is applied from "page"/"pagesize" (defaults 1 and 15)
* @param bool $allFields when false, selectFields() restricts the returned fields; when true the
*                        _source restriction is removed
* @param $user forwarded to customQuery() for the rule based filters
*/
public function prepareQuery($data, $getPaginate = true, $allFields = false, $user=null)
{
// Restrict results to the owner: the given user_id, or appCompanyId() when none is given.
// NOTE(review): with a bounding box the owner term goes into "must" instead of "filter",
// presumably because the geo box below is written under a string key of "filter" — confirm.
if (isset($data['bound_box_lat_lon'])) {
$this->baseQuery['query']['bool']['must'][] = $this->term('inserted_by', empty($data['user_id']) ? appCompanyId() : $data['user_id']);
} else {
$this->baseQuery['query']['bool']['filter'][] = $this->term('inserted_by', empty($data['user_id']) ? appCompanyId() : $data['user_id']);
}
if ($allFields === false) {
$this->selectFields($data);
} else {
unset($this->baseQuery['_source']);
}
// Aggregate filter: applied as post_filter on the configured aggregation field.
// The configured value is cast to bool for every field except property_is_vacant.
if (!empty($data['aggregate_filter'])) {
$aggregateFields = config('constant.aggregation');
if (isset($aggregateFields[$data['aggregate_filter']])) {
$value = $aggregateFields[$data['aggregate_filter']]['aggregation_field'] == 'property_is_vacant'
? $aggregateFields[$data['aggregate_filter']]['aggregation_value']
: !!$aggregateFields[$data['aggregate_filter']]['aggregation_value'];
$this->baseQuery['post_filter'] = $this->term($aggregateFields[$data['aggregate_filter']]['aggregation_field'], $value);
}
}
// Free-text keyword search.
if (!empty($data['global_filter'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->searchByKeyword($data['global_filter']);
}
// Explicitly included / excluded document ids.
if (!empty($data['address_ids'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->terms('id', $data['address_ids']);
}
if (!empty($data['uncheckedids'])) {
$this->baseQuery['query']['bool']['must_not'][] = $this->terms('id', $data['uncheckedids']);
}
// NOTE(review): in_array() is used without strict mode in the two checks below, so values
// that only loosely equal 1, 0 or 'both' are accepted as well — confirm this is intended.
if (isset($data['opt_out']) && in_array($data['opt_out'], [1, 0, 'both'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->optOut($data['opt_out']);
}
if (isset($data['self_managed']) && in_array($data['self_managed'], [1, 0, 'both'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->selfManaged($data['self_managed']);
}
// Non-strict in_array() against [true, false] accepts any value, since every value loosely
// equals one of the two; effectively only isset() gates this block.
if (isset($data['is_dialer_pushed']) && in_array($data['is_dialer_pushed'], [true, false])) {
$value = $data['is_dialer_pushed'];
$field = 'is_dialer_pushed';
$term = $this->term($field, $value);
if ($value) {
$this->baseQuery['query']['bool']['filter'][] = $term;
} else {
// field not exist or false value query
$orQuery = [$this->notExist($field), $term];
$this->baseQuery['query']['bool']['must'][] = [
"bool" => [
"minimum_should_match" => 1,
"should" => $orQuery
]
];
}
}
if (isset($data['quick_list']) && is_array($data['quick_list']) && !empty($data['quick_list'])) {
$segment = $this->setQuickListQuery($data['quick_list']);
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment);
}
// Included lists; the match mode comes from camp_cond_filter.
if (!empty($data['list_id']) && !empty($data['camp_cond_filter'])) {
$segment = $this->includeList($data['list_id'], $data['camp_cond_filter']);
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment);
}
// Lead status: when the default status is among the requested ones, documents that have
// no lead_status field at all are matched too.
if (!empty($data['lead_status'])) {
if (in_array(PropertyLeadStatus::LEAD_STATUS_DEFAULT_ID, $data['lead_status'])) {
$this->baseQuery['query']['bool']['must'][] = [
"bool" => [
"minimum_should_match" => 1,
"should" => [$this->terms('lead_status', $data['lead_status']), $this->mustNotExistField('lead_status')]
]
];
} else {
$segment = $this->includeLeadStatus($data['lead_status']);
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment);
}
}
// Excluded lists.
if (!empty($data['list_id2'])) {
$this->baseQuery['query']['bool']['must_not'][] = $this->excludeList($data['list_id2']);
}
// Included / excluded tags (the payload calls them "mailer").
if (!empty($data['mailer1']) && !empty($data['mailer_cond_filter'])) {
$segment = $this->includeTag($data['mailer1'], $data['mailer_cond_filter']);
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment);
}
if (!empty($data['mailer2'])) {
$this->baseQuery['query']['bool']['must_not'][] = $this->excludeTag($data['mailer2']);
}
// Range filters: each needs both bounds to be present and non-null.
if (isset($data['camp_from']) && !is_null($data['camp_from']) && isset($data['camp_to']) && !is_null($data['camp_to'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->listCount($data['camp_from'], $data['camp_to']);
}
if (isset($data['lead_score_from']) && !is_null($data['lead_score_from']) && isset($data['lead_score_to']) && !is_null($data['lead_score_to'])) {
$leadScoreFrom = $data['lead_score_from'];
$leadScoreTo = $data['lead_score_to'];
// When the range includes 0, documents without a lead_score field are matched as well.
if($leadScoreFrom <= 0 && $leadScoreTo >= 0) {
$this->baseQuery['query']['bool']['must'][] = [
"bool" => [
"minimum_should_match" => 1,
"should" => [$this->range('lead_score', $leadScoreFrom, $leadScoreTo), $this->mustNotExistField('lead_score')]
]
];
}
else {
$this->baseQuery['query']['bool']['must'][] = [
"bool" => [
"minimum_should_match" => 1,
"should" => [$this->range('lead_score', $leadScoreFrom, $leadScoreTo)]
]
];
}
}
if (isset($data['mailer_from']) && !is_null($data['mailer_from']) && isset($data['mailer_to']) && !is_null($data['mailer_to'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->tagCount($data['mailer_from'], $data['mailer_to']);
}
// Two separate keys/fields exist: image_count_* (image_count) and images_count_* (images_count).
if (isset($data['image_count_from']) && !is_null($data['image_count_from']) && isset($data['image_count_to']) && !is_null($data['image_count_to'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->imageCount($data['image_count_from'], $data['image_count_to']);
}
if (isset($data['images_count_from']) && !is_null($data['images_count_from']) && isset($data['images_count_to']) && !is_null($data['images_count_to'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->imagesCount($data['images_count_from'], $data['images_count_to']);
}
if (!empty($data['absentee']) && in_array($data['absentee'], Address::getAbsenteeStatuses())) {
$segment = $this->absentee($data['absentee']);
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment);
}
if (!empty($data['skiptraced'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->skipTraced($data['skiptraced']);
}
if (!empty($data['is_vacant'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->propertyVacant($data['is_vacant']);
}
if (!empty($data['is_mailing_vacant'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->mailingVacant($data['is_mailing_vacant']);
}
// '1' requires the phone_numbers field to exist; any other non-empty value requires it to be absent.
if (isset($data['has_phone_numbers']) && $data['has_phone_numbers'] != '') {
if ($data['has_phone_numbers'] == '1') {
$this->baseQuery['query']['bool']['filter'][] = $this->mustExistsField('phone_numbers');
} else {
$this->baseQuery['query']['bool']['filter'][] = $this->notExist('phone_numbers');
}
}
// Geo bounding box on property_lat_lon.
// NOTE(review): this writes under the string key 'geo_bounding_box' of "filter", while every
// other branch appends list entries to the same array; when both kinds are present the array
// has mixed keys and would be serialized as an object rather than a list — confirm how the
// combination of a bounding box with other filters is expected to behave.
if (!empty($data['bound_box_lat_lon'])) {
$this->baseQuery['query']['bool']['filter']['geo_bounding_box']['property_lat_lon'] = [
'top_left' => [
'lat' => $data['bound_box_lat_lon']['nw_lat'],
'lon' => $data['bound_box_lat_lon']['nw_lon'],
],
'bottom_right' => [
'lat' => $data['bound_box_lat_lon']['se_lat'],
'lon' => $data['bound_box_lat_lon']['se_lon'],
],
];
}
if (!empty($data['greater_than_id'])) {
$this->baseQuery['query']['bool']['filter'][] = $this->greaterThan('id',$data['greater_than_id'], false);
}
// Rule based filter: customQuery() returns its clauses grouped as 'not', 'filter' and 'must'.
if (!empty($data['filter_data']->rules)) {
$condition = strtolower($data['filter_data']->condition ?? 'and');
$segments = $this->customQuery($data['filter_data'], $user);
// Append conditions to base query for must and must not operations
if ($condition == 'and') {
if (!empty($segments['not'])) {
if (!isset($this->baseQuery['query']['bool']['must_not'])) {
$this->baseQuery['query']['bool']['must_not'] = [];
}
$this->baseQuery['query']['bool']['must_not'] = array_merge($this->baseQuery['query']['bool']['must_not'], $segments['not']);
}
if (!empty($segments['filter'])) {
if (!isset($this->baseQuery['query']['bool']['filter'])) {
$this->baseQuery['query']['bool']['filter'] = [];
}
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segments['filter']);
}
if (!empty($segments['must'])) {
if (!isset($this->baseQuery['query']['bool']['must'])) {
$this->baseQuery['query']['bool']['must'] = [];
}
$this->baseQuery['query']['bool']['must'] = array_merge($this->baseQuery['query']['bool']['must'], $segments['must']);
}
} // Append conditions to base query for should and should not operations
else {
// Any condition other than 'and' puts every clause into "should"; the negated clauses are
// wrapped together in a single nested bool/must_not entry.
if (!empty($segments['filter']) || !empty($segments['not']) || !empty($segments['must'])) {
if (!isset($this->baseQuery['query']['bool']['should'])) {
$this->baseQuery['query']['bool']['should'] = [];
}
if (!empty($segments['not'])) {
$query = [];
$query['bool']['must_not'] = $segments['not'];
if (!isset($this->baseQuery['query']['bool']['should'])) {
$this->baseQuery['query']['bool']['should'] = [];
}
$this->baseQuery['query']['bool']['should'] = array_merge($this->baseQuery['query']['bool']['should'], [$query]);
}
if (!empty($segments['filter'])) {
if (!isset($this->baseQuery['query']['bool']['should'])) {
$this->baseQuery['query']['bool']['should'] = [];
}
$this->baseQuery['query']['bool']['should'] = array_merge($this->baseQuery['query']['bool']['should'], $segments['filter']);
}
if (!empty($segments['must'])) {
if (!isset($this->baseQuery['query']['bool']['should'])) {
$this->baseQuery['query']['bool']['should'] = [];
}
$this->baseQuery['query']['bool']['should'] = array_merge($this->baseQuery['query']['bool']['should'], $segments['must']);
}
}
}
}
// Column filter: same rule format as filter_data, but its clauses are always merged into
// must_not / filter / must regardless of any condition.
if (!empty($data['column_filter']->rules)) {
$filterSegment = $this->customQuery($data['column_filter'], $user);
if (!empty($filterSegment['not'])) {
if (!isset($this->baseQuery['query']['bool']['must_not'])) {
$this->baseQuery['query']['bool']['must_not'] = [];
}
$this->baseQuery['query']['bool']['must_not'] = array_merge($this->baseQuery['query']['bool']['must_not'], $filterSegment['not']);
}
if (!empty($filterSegment['filter'])) {
if (!isset($this->baseQuery['query']['bool']['filter'])) {
$this->baseQuery['query']['bool']['filter'] = [];
}
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $filterSegment['filter']);
}
if (!empty($filterSegment['must'])) {
if (!isset($this->baseQuery['query']['bool']['must'])) {
$this->baseQuery['query']['bool']['must'] = [];
}
$this->baseQuery['query']['bool']['must'] = array_merge($this->baseQuery['query']['bool']['must'], $filterSegment['must']);
}
/**
* Special case for lead status to manage lead_status field , if no key lead_status exist in the document
* */
// customQuery() skips a lead_status rule with the default status; it is handled here so
// that documents without a lead_status field match as well. Only the first lead_status
// rule is considered.
if (collect($data['column_filter']->rules)->pluck('field')->count() > 0) {
$leadStatusFilter = collect($data['column_filter']->rules)->filter(function ($obj) {
if ($obj->field == 'lead_status') {
return true;
}
});
$leadStatusFilter = $leadStatusFilter->first();
if (!empty($leadStatusFilter) && $leadStatusFilter->value == PropertyLeadStatus::LEAD_STATUS_DEFAULT_ID) {
$this->baseQuery['query']['bool']['must'][] = [
"bool" => [
"minimum_should_match" => 1,
"should" => [$this->term('lead_status', $leadStatusFilter->value), $this->mustNotExistField('lead_status')]
]
];
}
}
}
// Sorting: a keyword search combined with a sort on "id" is switched to _score descending.
if (!empty($data['sort_data'])) {
if (!empty($data['global_filter']) && $data['sort_data'] == 'id') {
$data['sort_data'] = '_score';
$data['sort_type'] = 'desc';
}
$this->baseQuery['sort'][] = $this->sortBy($data['sort_data'], isset($data['sort_type']) ? $data['sort_type'] : 'asc');
}
// Explicit offset, applied before the optional page based pagination.
if (Arr::has($data, 'skip')) {
$this->prepareCustomPaginate(Arr::get($data, 'skip'));
}
if ($getPaginate) {
$this->paginate(Arr::get($data, 'page', 1), Arr::get($data, 'pagesize', 15));
}
}
/**
 * Build the sort clause for a requested field and direction.
 *
 * The field is resolved through the configured filter mapping, first by
 * mapping key and then by its "elastic" name; unknown fields fall back to a
 * sort on "id". "updated_at" is treated as "updated_date" and "_score" is
 * always sorted as-is.
 *
 * @param $field
 * @param $direction
 * @return mixed result of sort()
 */
private function sortBy($field, $direction)
{
    if ($field == 'updated_at') {
        $field = 'updated_date';
    }

    $mapping = config('constant.filter.mapping');
    $byElasticName = collect($mapping);

    $map = $mapping[$field] ?? $byElasticName->where('elastic', $field)->first();
    if (!isset($mapping[$field]) && empty($map)) {
        $map = ['elastic' => 'id', 'type' => 'long'];
    }

    // Relevance sorting bypasses the mapping entirely.
    if ($field == '_score') {
        $map = ['elastic' => $field, 'type' => 'long'];
    }

    return $this->sort($this->getSortField($map['elastic'], $map['type']), $direction);
}
/**
 * Resolve the concrete field name to sort on.
 *
 * Fields of type "text" are sorted on their ".keyword" sub-field, and the
 * field "loan" is sorted on "loan.interest_rate".
 *
 * @param $field
 * @param string $type
 * @return string
 */
private function getSortField($field, $type = '')
{
    $sortField = $type == 'text' ? $field . '.keyword' : $field;

    // The check runs after the keyword suffix is applied, so only a
    // non-text "loan" field is redirected to its interest rate.
    return $sortField == "loan" ? "loan.interest_rate" : $sortField;
}
/**
 * Generate the sub segment for a keyword search.
 *
 * The keyword is searched across the name and address fields plus the
 * configured phone, email and custom keys. When the keyword contains a "+",
 * the ".keyword" sub-field of every field is queried instead of the field
 * itself.
 *
 * @param $keyword
 * @return array[]
 */
private function searchByKeyword($keyword)
{
    $phoneKeys = config('constant.phone_keys');
    $emailKeys = config('constant.email_keys');
    $customKeys = config('constant.custom_keys');

    // strpos() returns 0 when the keyword starts with "+", and 0 == false is
    // true, so the previous loose check missed a leading "+" (e.g.
    // "+15551234567"). Compare strictly against false instead.
    $containsPlus = strpos($keyword, '+') !== false;
    $suffix = $containsPlus ? '.keyword' : '';

    $fields = [
        'full_name' . $suffix,
        'property_full_address' . $suffix,
        'mailing_full_address' . $suffix,
    ];

    foreach ([$phoneKeys, $emailKeys, $customKeys] as $keys) {
        foreach ($keys as $key) {
            $fields[] = $key . $suffix;
        }
    }

    return $this->queryString($keyword, $fields);
}
/**
 * Generate the sub segment for opt_out.
 *
 * "both" matches every document that has an opt_out field; any other value
 * is cast to bool and matched exactly.
 *
 * @param $optOutStatus 1, 0 or the string 'both'
 * @return array[]
 */
private function optOut($optOutStatus)
{
    // Strict comparison: with "==", boolean true (and, before PHP 8, integer
    // 0) loosely equals the string 'both' and was turned into an exists
    // query instead of a term query.
    if ($optOutStatus === 'both') {
        return $this->exists('opt_out');
    }

    return $this->term('opt_out', !!$optOutStatus);
}
/**
 * Generate the sub segment for self_managed.
 *
 * "both" matches every document that has a self_managed field; any other
 * value is cast to bool and matched exactly.
 *
 * @param $selfManagedStatus 1, 0 or the string 'both'
 * @return array[]
 */
private function selfManaged($selfManagedStatus)
{
    // Strict comparison: with "==", boolean true (and, before PHP 8, integer
    // 0) loosely equals the string 'both' and was turned into an exists
    // query instead of a term query.
    if ($selfManagedStatus === 'both') {
        return $this->exists('self_managed');
    }

    return $this->term('self_managed', !!$selfManagedStatus);
}
/**
 * Generate the sub segment for quick lists: one term clause on
 * "quick_list.<name>" per entry.
 *
 * @param $quickList map of quick list name => expected value
 * @return array[]
 */
private function setQuickListQuery($quickList)
{
    $clauses = [];

    foreach ($quickList as $name => $expected) {
        $clauses[] = $this->term('quick_list.' . $name, $expected);
    }

    return $clauses;
}
/**
 * Generate the sub segment for included lists.
 *
 * For the "exactMatchAny" type one terms clause is produced per list id;
 * for every other type a single terms clause carries all ids.
 *
 * @param $listIds
 * @param $type
 * @return array
 */
private function includeList($listIds, $type)
{
    if ($type != 'exactMatchAny') {
        return [$this->terms('list_ids', $listIds)];
    }

    $clauses = [];
    foreach ($listIds as $singleListId) {
        $clauses[] = $this->terms('list_ids', [$singleListId]);
    }

    return $clauses;
}
/**
 * Generate the sub segment for included lead statuses: a single terms
 * clause on lead_status wrapped in a list.
 *
 * @param $listIds lead status ids
 * @return array
 */
private function includeLeadStatus($listIds)
{
    $clause = $this->terms('lead_status', $listIds);

    return [$clause];
}
/**
 * Generate the terms clause used to exclude lists; the caller places it
 * under must_not.
 *
 * @param $listIds
 * @return array[]
 */
private function excludeList($listIds)
{
    $clause = $this->terms('list_ids', $listIds);

    return $clause;
}
/**
 * Generate the sub segment for included tags.
 *
 * For the "exactMatchAny" type one terms clause is produced per tag id; for
 * every other type a single terms clause carries all ids.
 *
 * @param $tagIds
 * @param $type
 * @return array
 */
private function includeTag($tagIds, $type)
{
    if ($type != 'exactMatchAny') {
        return [$this->terms('tag_ids', $tagIds)];
    }

    $clauses = [];
    foreach ($tagIds as $singleTagId) {
        $clauses[] = $this->terms('tag_ids', [$singleTagId]);
    }

    return $clauses;
}
/**
 * Generate the terms clause used to exclude tags; the caller places it
 * under must_not.
 *
 * @param $tagIds
 * @return array[]
 */
private function excludeTag($tagIds)
{
    $clause = $this->terms('tag_ids', $tagIds);

    return $clause;
}
/**
 * Generate the range clause on lists_count.
 *
 * @param $from lower bound
 * @param $to upper bound
 * @return array
 */
private function listCount($from, $to)
{
    $clause = $this->range('lists_count', $from, $to);

    return $clause;
}
/**
 * Generate the range clause on tags_count.
 *
 * @param $from lower bound
 * @param $to upper bound
 * @return array
 */
private function tagCount($from, $to)
{
    $clause = $this->range('tags_count', $from, $to);

    return $clause;
}
/**
 * Generate the range clause on image_count (singular; see imagesCount() for
 * the images_count field).
 *
 * @param $from lower bound
 * @param $to upper bound
 * @return array
 */
private function imageCount($from, $to)
{
    $clause = $this->range('image_count', $from, $to);

    return $clause;
}
/**
 * Generate the range clause on images_count (plural; see imageCount() for
 * the image_count field).
 *
 * @param $from lower bound
 * @param $to upper bound
 * @return array
 */
private function imagesCount($from, $to)
{
    $clause = $this->range('images_count', $from, $to);

    return $clause;
}
/**
 * Generate the sub segment for the absentee status.
 *
 * - ABSENTEE_YES / ABSENTEE_NO: term on sub_address_match (true only for NO)
 * - ABSENTEE_IN_STATE: sub_address_match = false and is_in_state = true
 * - ABSENTEE_OUT_OF_STATE: is_in_state = false
 * - anything else: no clauses
 *
 * @param $absenteeStatus
 * @return array
 */
private function absentee($absenteeStatus)
{
    if ($absenteeStatus == Address::ABSENTEE_YES || $absenteeStatus == Address::ABSENTEE_NO) {
        return [
            $this->term('sub_address_match', $absenteeStatus == Address::ABSENTEE_NO),
        ];
    }

    if ($absenteeStatus == Address::ABSENTEE_IN_STATE) {
        return [
            $this->term('sub_address_match', false),
            $this->term('is_in_state', true),
        ];
    }

    if ($absenteeStatus == Address::ABSENTEE_OUT_OF_STATE) {
        return [
            $this->term('is_in_state', false),
        ];
    }

    return [];
}
/**
 * Generate the term clause on is_skip_traced: true when the given status
 * equals Address::SKIPTRACE_YES, false otherwise.
 *
 * @param $skipTraceStatus
 * @return array[]
 */
public function skipTraced($skipTraceStatus)
{
    $isSkipTraced = $skipTraceStatus == Address::SKIPTRACE_YES;

    return $this->term('is_skip_traced', $isSkipTraced);
}
/**
 * Generate the clause on property_is_vacant.
 *
 * VACANT_NEW maps to 2, VACANT_YES to 1, VACANT_BOTH to a terms clause on
 * [0, 1]; every other status maps to 0.
 *
 * @param $vacantStatus
 * @return array[]
 */
private function propertyVacant($vacantStatus)
{
    $field = 'property_is_vacant';

    if ($vacantStatus == Address::VACANT_NEW) {
        return $this->term($field, 2);
    }

    if ($vacantStatus == Address::VACANT_YES) {
        return $this->term($field, 1);
    }

    if ($vacantStatus == Address::VACANT_BOTH) {
        return $this->terms($field, [0, 1]);
    }

    return $this->term($field, 0);
}
/**
 * Generate the term clause on mailing_is_vacant: true when the given status
 * equals Address::VACANT_YES, false otherwise.
 *
 * @param $vacantStatus
 * @return array[]
 */
private function mailingVacant($vacantStatus)
{
    $isVacant = $vacantStatus == Address::VACANT_YES;

    return $this->term('mailing_is_vacant', $isVacant);
}
/**
 * Translate query-builder rules into Elasticsearch clauses, grouped by how
 * the caller has to attach them.
 *
 * Rules are skipped when they target lead_status with the default status,
 * when they carry a null value for an operator other than is_empty /
 * is_not_empty, or when getQuery() yields nothing for them.
 *
 * NOTE(review): a rule id missing from the configured mapping is not
 * guarded against here — confirm callers only send mapped ids.
 *
 * @param $filterData object exposing a "rules" list
 * @param $user forwarded to getQuery()
 * @return array[] clauses under the keys 'not', 'filter' and 'must'
 */
private function customQuery($filterData, $user=null)
{
    $grouped = ['not' => [], 'filter' => [], 'must' => []];

    $rules = $filterData->rules;
    $mapping = config('constant.filter.mapping');

    foreach ($rules as $rule) {
        if ($rule->field == 'lead_status' && $rule->value == PropertyLeadStatus::LEAD_STATUS_DEFAULT_ID) {
            continue;
        }

        if (is_null($rule->value) && !in_array($rule->operator, ['is_empty', 'is_not_empty'])) {
            continue;
        }

        $mapped = $mapping[$rule->id];
        $query = $this->getQuery($rule, $mapped['elastic'], $mapped, $user);
        if (!$query) {
            continue;
        }

        // Negated operators win over everything else; match-type mappings and
        // "contains" go to must; the rest are plain filters.
        if ($this->isNegateCondition($rule->operator)) {
            $group = 'not';
        } elseif (!empty($mapped['match']) || $rule->operator == 'contains') {
            $group = 'must';
        } else {
            $group = 'filter';
        }

        $grouped[$group][] = $query;
    }

    return $grouped;
}
/**
 * Tell whether an operator is a negated condition, i.e. one of
 * not_contains, not_between or is_empty.
 *
 * @param $operator
 * @return bool
 */
private function isNegateCondition($operator)
{
    $negatedOperators = ['not_contains', 'not_between', 'is_empty'];

    // Non-strict on purpose: mirrors a chain of "==" comparisons.
    return in_array($operator, $negatedOperators);
}
/**
 * Build the Elasticsearch clause for a single query-builder rule.
 *
 * Two kinds of rules do not return a clause: lead_score ranges and the
 * "is_dialer_pushed is falsy" rule append their clause straight to
 * $this->baseQuery['query']['bool']['must'] and return false.
 *
 * @param $rule builder rule; reads id, field, type, operator, value and (optionally) input
 * @param $field elastic field name taken from the filter mapping
 * @param $mapped mapping entry of the rule; reads 'match' and 'type'
 * @param $user optional user whose default_timezone is used for date ranges
 * @return array|false
 */
private function getQuery($rule, $field, $mapped, $user=null)
{
    $match = !empty($mapped['match']);
    $value = $rule->value;
    $operator = $rule->operator;
    // create nested query
    if ($rule->type == "nested") {
        if ($rule->field == "loan_type") {
            return $this->nestedTypeQuery($field, $rule->field, $value, $rule->operator);
        }
        if ($rule->field == "interest_rate") {
            return $this->nestedTypeRangeQuery($field, $rule->field, $value, $rule->operator);
        }
    }
    // A text input on the vacant-from fields is a number of months: convert it to a cut-off
    // date. A larger month count means an earlier date, so the comparison direction is inverted
    // (any operator other than 'less' becomes 'less').
    if (in_array($field, ['property_vacant_from_date', 'mailing_vacant_from_date']) && isset($rule->input) && $rule->input == 'text') {
        $value = now()->subMonths(intval($rule->value))->toDateString();
        if ($operator == 'less') {
            $operator = 'greater';
        } else {
            $operator = 'less';
        }
    }
    switch ($operator) {
        case 'is_empty':
        case 'is_not_empty':
            // Both share the exists clause; is_empty is negated by the caller.
            return $this->exists($field);
        case 'contains':
        case 'not_contains':
            return ($mapped['type'] ?? null) == 'phrase' ? $this->matchPhrase($field, $rule->value) : $this->match($field, $rule->value);
        case 'less':
            return $this->lessThan($field, $value);
        case 'greater':
            return $this->greaterThan($field, $value);
        case 'between':
        case 'not_between':
            $valueGTE = $rule->value[0];
            $valueLTE = $rule->value[1];
            $userTimeZone = isset($user) ? $user->default_timezone : userTimeZone();
            if ($rule->type == 'date') {
                // Plain dates cover the whole day in the user's timezone before conversion.
                $startDate = $rule->value[0] ? $rule->value[0] . ' 00:00:00' : $rule->value[0];
                $endDate = $rule->value[1] ? $rule->value[1] . ' 23:59:59' : $rule->value[1];
                $valueGTE = str_replace(' ', 'T', gmtTime($startDate, 'Y-m-d H:i:s', $userTimeZone));
                $valueLTE = str_replace(' ', 'T', gmtTime($endDate, 'Y-m-d H:i:s', $userTimeZone));
            } else if ($rule->type == 'datetime') {
                $startDate = $rule->value[0];
                $endDate = $rule->value[1];
                $valueGTE = str_replace(' ', 'T', gmtTime($startDate, 'Y-m-d H:i:s', $userTimeZone));
                $valueLTE = str_replace(' ', 'T', gmtTime($endDate, 'Y-m-d H:i:s', $userTimeZone));
            }
            // we are creating should query for lead score, because not all property contain lead_score field
            if ($rule->field == "lead_score") {
                // Remove any lead_score clause a previous rule already added, so ranges do not stack.
                // The clean-up works on the stored clauses by key and writes the result back;
                // iterating over by-value copies would leave baseQuery untouched.
                $mustClauses = $this->baseQuery['query']['bool']['must'] ?? [];
                foreach ($mustClauses as $mustKey => $mustClause) {
                    if (!is_array($mustClause) || !isset($mustClause['bool']['should']) || !is_array($mustClause['bool']['should'])) {
                        // Not an OR-group (e.g. a plain term clause): nothing to clean here.
                        continue;
                    }
                    $shouldClauses = $mustClause['bool']['should'];
                    $removedAny = false;
                    foreach ($shouldClauses as $shouldKey => $shouldClause) {
                        $isMissingLeadScore = isset($shouldClause['bool']['must_not']['exists']['field'])
                            && $shouldClause['bool']['must_not']['exists']['field'] == 'lead_score';
                        $isLeadScoreRange = isset($shouldClause['range']['lead_score']);
                        if ($isMissingLeadScore || $isLeadScoreRange) {
                            unset($shouldClauses[$shouldKey]);
                            $removedAny = true;
                        }
                    }
                    if (!$removedAny) {
                        continue;
                    }
                    if (empty($shouldClauses)) {
                        // An OR-group left without alternatives would match nothing; drop it entirely.
                        unset($mustClauses[$mustKey]);
                    } else {
                        $mustClauses[$mustKey]['bool']['should'] = array_values($shouldClauses);
                    }
                }
                $this->baseQuery['query']['bool']['must'] = array_values($mustClauses);
                // if user selected range include 0, We need to return lead_score column not created properties too
                $shouldQuery = [$this->range('lead_score', $valueGTE, $valueLTE)];
                if ($valueGTE <= 0 && $valueLTE >= 0) {
                    $shouldQuery[] = $this->notExist('lead_score');
                }
                $this->baseQuery['query']['bool']['must'][] = [
                    "bool" => [
                        "minimum_should_match" => 1,
                        "should" => $shouldQuery
                    ]
                ];
                return false;
            }
            // Missing bounds fall back to 0.
            $valueGTE = ($valueGTE == '' || $valueGTE == null) ? 0 : $valueGTE;
            $valueLTE = ($valueLTE == '' || $valueLTE == null) ? 0 : $valueLTE;
            return $this->range($field, $valueGTE, $valueLTE);
        default:
            if ($match) {
                return $this->match($field, $value);
            }
            if ($rule->type == 'string') {
                $field .= '.keyword';
            }
            if ($rule->id == 'is_dialer_pushed' && !$rule->value) {
                // field not exist or false value query
                $orQuery = [$this->notExist($field), $this->term($field, false)];
                $this->baseQuery['query']['bool']['must'][] = [
                    "bool" => [
                        "minimum_should_match" => 1,
                        "should" => $orQuery
                    ]
                ];
                return false;
            }
            if (is_array($rule->value)) {
                return $this->terms($field, $rule->value);
            }
            $value = $rule->value;
            if (($rule->type == 'boolean' && !is_bool($value)) || $field == 'property_is_usps_valid') {
                // Coerce numeric-looking flags ("1"/"0") to real booleans.
                $value = !!((int)$rule->value);
            }
            return $this->term($field, $value);
    }
}
/**
 * Restrict the returned _source to "id" plus the elastic fields of the requested visual columns.
 *
 * @param $data request data; reads the optional 'visual_columns' list
 */
private function selectFields($data)
{
    $requestedColumns = $data['visual_columns'] ?? [];
    $columnMapping = config('constant.filter.mapping');
    // "id" is always selected, so the field list is never empty.
    $sourceFields = ['id'];
    foreach ($requestedColumns as $column) {
        if (!isset($columnMapping[$column])) {
            // Columns without a mapping entry are ignored.
            continue;
        }
        $sourceFields[] = $columnMapping[$column]['elastic'];
    }
    $this->source($sourceFields);
}
/**
 * Aggregate addresses on inserted_by for the given users.
 *
 * @param $userIds values matched against inserted_by
 * @return \Illuminate\Support\Collection buckets of the inserted_by aggregation
 */
public function getAddressesByUsers($userIds): \Illuminate\Support\Collection
{
    // Only the aggregation is needed, so no hits are requested.
    $this->baseQuery['size'] = 0;
    $this->baseQuery['query']['bool']['filter'][] = $this->terms('inserted_by', $userIds);
    $this->termAggregate('inserted_by');
    $aggregations = $this->search()->aggregations;
    return collect($aggregations->inserted_by_field->buckets);
}
/**
 * Add one terms aggregation per entry configured under constant.aggregation.
 *
 * Each aggregation is registered as "<name>_field". Nothing is changed when the
 * configuration is empty.
 */
private function aggregate()
{
    $configured = config('constant.aggregation');
    if (empty($configured)) {
        return;
    }
    $aggregations = [];
    foreach ($configured as $name => $definition) {
        $aggregations[$name . '_field'] = [
            'terms' => ['field' => $definition['aggregation_field']],
        ];
    }
    $this->baseQuery['aggregations'] = $aggregations;
}
/**
 * Delete every document matched by the query prepared for the given user.
 *
 * @param $userId
 * @return mixed result of deleteByQuery()
 */
public function deleteUserAddresses($userId)
{
    $criteria = ['user_id' => $userId];
    $this->prepareQuery($criteria);
    return $this->deleteByQuery();
}
/**
 * Fetch a single address document by its ID.
 *
 * @param $addressID
 * @return mixed the document's _source, or null when the document was not found
 */
public function getAddressByID($addressID)
{
    $documentType = config('constant.elastic.property.type');
    $document = $this->getDocumentByID($documentType, $addressID);
    if ($document->found) {
        return $document->_source;
    }
    return null;
}
/**
 * Return the complete address hits from elastic for the given IDs, without pagination.
 *
 * The IDs are looked up in batches of 1000; the hits of all batches are merged into
 * one flat collection. $this->baseQuery is rebuilt from scratch for every batch.
 *
 * @param $data request data; reads 'addresses' (the IDs). 'pagesize', 'size' and
 *              'address_ids' are overwritten per batch before the query is prepared.
 * @return \Illuminate\Support\Collection
 */
public function getAllAddressByMultipleIDs($data)
{
    $batchSize = 1000;
    // The caller's page size is irrelevant here (every batch uses $batchSize), so it is not read.
    $idBatches = collect($data['addresses'])->chunk($batchSize);
    $allHits = collect();
    foreach ($idBatches as $idBatch) {
        $data['pagesize'] = $batchSize;
        $data['size'] = $batchSize;
        // chunk() preserves keys; the query expects a plain list of IDs.
        $data['address_ids'] = array_values($idBatch->toArray());
        $this->baseQuery = ['from' => 0, 'size' => $batchSize];
        $this->prepareQuery($data, false, true);
        $allHits->push($this->search()->hits->hits);
    }
    return $allHits->flatten();
}
/**
 * Get the next address IDs of a user after a given ID, ordered by ID ascending.
 *
 * @param $userId matched against inserted_by
 * @param $addressId lower bound handed to greaterThan()
 * @param int $size maximum number of IDs to return
 * @return array
 */
public function getAddressIdsGreaterThan($userId, $addressId, $size = 1000): array
{
    $ownerFilter = $this->term('inserted_by', $userId);
    $this->baseQuery['query']['bool']['filter'][] = $ownerFilter;
    $afterIdFilter = $this->greaterThan('id', $addressId, false);
    $this->baseQuery['query']['bool']['filter'][] = $afterIdFilter;
    $this->baseQuery['size'] = $size;
    // Only the ID is needed from each document.
    $this->baseQuery['_source'] = ['id'];
    $this->baseQuery['sort'][] = $this->sortBy('id', 'asc');
    $hits = collect($this->search()->hits->hits);
    return $hits->pluck('_source.id')->toArray();
}
/**
 * Return the list_ids aggregation buckets (list id with its property count).
 *
 * @param $data filter data handed to prepareQuery()
 * @return \Illuminate\Support\Collection
 */
public function getAddressCountByLists($data = []): \Illuminate\Support\Collection
{
    $this->prepareQuery($data);
    // Aggregation only: no hits are requested.
    $this->baseQuery['from'] = 0;
    $this->baseQuery['size'] = 0;
    // NOTE(review): the aggregation size is the number of entries in $data, not a number
    // of lists — confirm this is what callers intend.
    $bucketLimit = count($data);
    $this->termAggregate('list_ids', $bucketLimit);
    $aggregations = $this->search()->aggregations;
    return collect($aggregations->list_ids_field->buckets);
}
/**
 * Get the dashboard chart data of a user through aggregations.
 *
 * Aggregates on property_is_vacant and adds a lists_count sub aggregation to it.
 *
 * @param $userId matched against inserted_by
 * @param $totalLists size handed to the lists_count sub aggregation
 * @return \Illuminate\Support\Collection buckets of the property_is_vacant aggregation
 */
public function getDashBoardChartData($userId, $totalLists): \Illuminate\Support\Collection
{
    // Aggregation only: no hits are requested.
    $this->baseQuery['size'] = 0;
    $ownerFilter = $this->term('inserted_by', $userId);
    $this->baseQuery['query']['bool']['filter'][] = $ownerFilter;
    $this->termAggregate('property_is_vacant', 3);
    $this->subTermAggregate('property_is_vacant_field', 'lists_count', $totalLists);
    $aggregations = $this->search()->aggregations;
    return collect($aggregations->property_is_vacant_field->buckets);
}
/**
 * Return the buckets of a terms aggregation on the given field (value with property count).
 *
 * The query and aggregation state is reset afterwards, also when the search fails.
 *
 * @param $data filter data handed to prepareQuery()
 * @param $field field to aggregate on; the buckets are read from "<field>_field"
 * @param $size size handed to termAggregate()
 * @return \Illuminate\Support\Collection
 */
public function getAggregateAddressCounts($data, $field, $size): \Illuminate\Support\Collection
{
    // $data carries no default: a default before required parameters can never be used
    // by a caller and is deprecated since PHP 8.0.
    try {
        $this->prepareQuery($data);
        // Aggregation only: no hits are requested.
        $this->baseQuery['from'] = 0;
        $this->baseQuery['size'] = 0;
        $this->termAggregate($field, $size);
        $elasticResponse = $this->search();
        return collect($elasticResponse->aggregations->{$field . "_field"}->buckets);
    } finally {
        // Leave the service clean for the next query even if the search threw.
        $this->reset();
        $this->resetAggregate();
    }
}
/**
 * Clear driving_route_id on every document that belongs to one of the given routes.
 *
 * @param $drivingRouteIds route IDs matched against driving_route_id
 * @param $userId when given, the update is limited to documents inserted by this user
 * @return mixed false when no route IDs were given, otherwise the updateByQuery() result
 */
public function setDrivingRouteIdToNull($drivingRouteIds = [], $userId = null)
{
    if (empty($drivingRouteIds)) {
        return false;
    }
    $routeFilter = $this->terms('driving_route_id', $drivingRouteIds);
    $this->baseQuery['query']['bool']['filter'][] = $routeFilter;
    if (!empty($userId)) {
        $ownerClause = $this->term('inserted_by', $userId);
        $this->baseQuery['query']['bool']['must'][] = $ownerClause;
    }
    $this->baseQuery['script'] = [
        "source" => "ctx._source.driving_route_id = null",
        "lang" => "painless"
    ];
    return $this->updateByQuery();
}
/**
 * Get the addresses of a user that belong to a driving route.
 *
 * The query state is reset after the search.
 *
 * @param $userId matched against inserted_by
 * @param $routeId matched against driving_route_id
 * @param $perPage maximum number of hits to return
 * @return object collection of the elastic hits
 */
public function getAddressUsingRouteId($userId, $routeId, $perPage = 15): object
{
    $this->baseQuery['size'] = $perPage;
    $ownerFilter = $this->term('inserted_by', $userId);
    $this->baseQuery['query']['bool']['filter'][] = $ownerFilter;
    $routeFilter = $this->term('driving_route_id', $routeId);
    $this->baseQuery['query']['bool']['filter'][] = $routeFilter;
    // Only these fields are returned for each hit.
    $this->baseQuery['_source'] = [
        "mls_status",
        "last_sale_price",
        "property_full_address",
        "property_line_1",
        "property_line_2",
        "property_city",
        "property_county",
        "property_state",
        "property_zip",
        "property_lat_lon",
        "quick_list.absentee_owner",
        "quick_list.preforeclosure",
        "quick_list.vacant",
        "image_count",
        "images_count",
        "image_urls",
        "owner_occupied",
        "owner_2_first_name",
        "owner_2_last_name",
        "mailing_county",
        "equity_current_estimated_balance",
        "year_built",
        "bedroom_count",
        "bathroom_count",
        "total_building_area_square_feet",
        "total_open_lien_balance",
        "last_sale_date",
        "lead_status",
        "full_name",
        "first_name",
        "last_name",
        "hash",
        "mls_listing_date",
        "mls_price",
        "owner_residence_months",
        "open_lien_count",
        "id"
    ];
    $routeHits = collect($this->search()->hits->hits);
    $this->reset();
    return $routeHits;
}
/**
 * Get up to 1000 properties of a user that lie within a distance of a point.
 *
 * The query state is reset after the search and the size is set back to 15.
 *
 * @param int $userId matched against inserted_by
 * @param string $distance distance value; the unit suffix "m" is appended here
 * @param array $propertyLatLon point handed to the geo distance clause as property_lat_lon
 * @return Collection collection of the elastic hits
 */
public function getPropertiesUsingGeoDistance(int $userId, string $distance, array $propertyLatLon)
{
    $ownerClause = $this->term('inserted_by', $userId);
    $this->baseQuery['query']['bool']['must'][] = $ownerClause;
    $geoFilter = $this->geoDistance([
        'distance' => $distance . 'm',
        'property_lat_lon' => $propertyLatLon
    ]);
    $this->baseQuery['query']['bool']['filter'][] = $geoFilter;
    $this->baseQuery['size'] = 1000;
    // Only these fields are returned for each hit.
    $this->baseQuery['_source'] = [
        "mls_status",
        "property_full_address",
        "property_lat_lon",
        "lead_status",
        "image_urls",
        "hash"
    ];
    $nearbyHits = collect($this->search()->hits->hits);
    $this->reset();
    // resetting size to default value
    $this->baseQuery['size'] = 15;
    return $nearbyHits;
}
/**
 * Get the minimum and maximum of a field over the documents of a user.
 *
 * The query state is reset after the search.
 *
 * @param $field field the min and max aggregations run on
 * @param $userId matched against inserted_by; appCompanyId() is used when empty
 * @return \Illuminate\Support\Collection the aggregations of the response
 */
public function getFieldMinMaxByUser($field, $userId = null): \Illuminate\Support\Collection
{
    if (empty($userId)) {
        $userId = appCompanyId();
    }
    // Aggregation only: no hits are requested.
    $this->baseQuery['size'] = 0;
    // The whole query is replaced by the single owner term.
    $this->baseQuery['query'] = $this->term('inserted_by', $userId);
    foreach (['min', 'max'] as $aggregationType) {
        $this->customAggregate($field, $aggregationType);
    }
    $aggregations = $this->search()->aggregations;
    $this->reset();
    return collect($aggregations);
}
/**
 * Update every document matching the conditions through an update-by-query script.
 *
 * Values are passed to the Painless script as parameters instead of being concatenated
 * into the script source, so quotes or script syntax inside a value cannot break or
 * alter the script.
 *
 * @param $conditions | array field => value pairs, each added as a match clause under must
 * @param $updateFields | array field => value pairs assigned on ctx._source
 * @return array|mixed result of updateByQuery()
 */
public function updateBulkRecord($conditions, $updateFields)
{
    if (is_array($conditions)) {
        foreach ($conditions as $field => $value) {
            $this->baseQuery['query']['bool']['must'][] = [
                'match' => [
                    $field => $value
                ]
            ];
        }
    }
    $assignments = [];
    $params = [];
    if (is_array($updateFields)) {
        foreach ($updateFields as $field => $value) {
            $paramName = 'value_' . count($params);
            $assignments[] = "ctx._source." . $field . " = params." . $paramName;
            // Values were always written as string literals; keep storing strings.
            $params[$paramName] = (string) $value;
        }
    }
    $script = [
        'source' => implode(';', $assignments),
        'lang' => 'painless',
    ];
    if (!empty($params)) {
        // An empty PHP array would be encoded as a JSON list, which is not a valid params object.
        $script['params'] = $params;
    }
    $this->baseQuery['script'] = $script;
    return $this->updateByQuery();
}
/**
 * Delete every document matching the conditions through a delete-by-query.
 *
 * @param $conditions | array field => value pairs, each added as a match clause under must
 *
 * @return array|mixed result of deleteByQuery(); null when $conditions is not an array
 */
public function deleteBulkRecord($conditions)
{
    if (!is_array($conditions)) {
        return;
    }
    // NOTE(review): an empty array adds no clause, so the delete runs with whatever
    // the base query already contains — confirm callers never pass an empty array.
    foreach ($conditions as $field => $value) {
        $matchClause = ['match' => [$field => $value]];
        $this->baseQuery['query']['bool']['must'][] = $matchClause;
    }
    return $this->deleteByQuery();
}
/**
 * Add or remove custom images of a property and refresh its image counters.
 *
 * With $data['create'] the new images are put in front of the existing custom images.
 * Otherwise, with $data['delete'], every custom image whose URL path equals the path of
 * a URL to delete is removed. 'delete' may be a single URL or a list of URLs.
 *
 * @param $addressId
 * @param $data reads 'create' (array of images) or 'delete' (URL or list of URLs)
 * @return mixed false when the address ID is empty or the document does not exist,
 *               otherwise the updateByQuery() result
 */
public function setCustomImages($addressId, $data = [])
{
    if (empty($addressId)) {
        return false;
    }
    $elasticDocument = $this->getAddressByID($addressId);
    if (empty($elasticDocument)) {
        return false;
    }
    $customImages = !empty($elasticDocument->custom_images) ? $elasticDocument->custom_images : [];
    $imageUrls = !empty($elasticDocument->image_urls) ? $elasticDocument->image_urls : [];
    if (!empty($data['create'])) {
        // New images go in front of the ones already stored.
        $customImages = array_merge($data['create'], $customImages);
    } elseif (!empty($data['delete'])) {
        // Images are compared by URL path only; a URL without a path yields an empty string.
        $urlPath = static function ($url) {
            return trim((string) parse_url($url, PHP_URL_PATH), '/');
        };
        // Parse the URLs to delete once instead of once per stored image.
        $deletePaths = array_map($urlPath, (array) $data['delete']);
        $customImages = array_filter($customImages, static function ($url) use ($urlPath, $deletePaths) {
            return !in_array($urlPath($url), $deletePaths, true);
        });
    }
    // Final image count covers custom images and the regular image URLs.
    $imageCount = count($customImages) + count($imageUrls);
    $this->baseQuery['query']['bool']['filter'][] = $this->term(
        'id', $addressId
    );
    $this->baseQuery['script'] = [
        "source" => "ctx._source.custom_images = params.custom_images;ctx._source.image_count = params.image_count;ctx._source.images_count = params.images_count;",
        "lang" => "painless",
        "params" => [
            // array_filter() keeps keys; the document stores a plain list.
            "custom_images" => array_values($customImages),
            // NOTE(review): image_count is nulled from 127 upwards — presumably the field
            // is mapped as a byte-sized type; confirm against the index mapping.
            "image_count" => ($imageCount >= 127) ? null : $imageCount,
            "images_count" => $imageCount,
        ]
    ];
    return $this->updateByQuery();
}
/**
 * Fetch the data of a bulk operation from elastic and store it through BulkOperationService.
 *
 * Without 'from'/'to' the first search opens a scroll and, when more than 10,000 records
 * are requested, further pages are read through that scroll. With 'from'/'to' the range is
 * split by splitRange() and every part is read with from/size pagination instead.
 * With 'visual_columns' whole hits are stored, otherwise only the address IDs.
 *
 * @param $data request data; reads 'shard', 'limit', 'visual_columns', 'from' and 'to'
 * @param $jobId bulk operation job the stored rows belong to
 * @return array IDs returned by BulkOperationService::addDataInBulkOperationInChunk()
 */
public function getBulkOperationDataFromElasticWithScrollAndStoreIdsInTable($data, $jobId): array
{
    $bulkOperationDataIds = [];
    $elasticMaxResultWindow = 10000;
    $subBatchSize = 1000;
    $ranges = [];
    $searchQueryParams = ['scroll' => '1m'];
    if (isset($data['shard'])) {
        $searchQueryParams['preference'] = $data['shard'];
    }
    $requestLimit = !empty($data['limit']) ? $data['limit'] : $elasticMaxResultWindow;
    // A single request never asks for more than the result window.
    $data['limit'] = $requestLimit > $elasticMaxResultWindow ? $elasticMaxResultWindow : $requestLimit;
    $hasVisualColumns = isset($data['visual_columns']);
    $isIdsOnly = !$hasVisualColumns;
    $this->prepareQuery($data, false, $isIdsOnly);
    if (!empty($data['from']) && !empty($data['to'])) {
        $elasticFrom = $data['from'] - 1;
        $elasticSize = ($data['to'] - $data['from']) + 1;
        $ranges = splitRange($elasticFrom, $elasticSize, $elasticMaxResultWindow);
        $firstRange = array_shift($ranges);
        if ($firstRange) {
            $elasticFrom = $firstRange['from'];
            $elasticSize = $firstRange['to'];
        }
        // Explicit ranges use from/size pagination, so no scroll is opened.
        $searchQueryParams = [];
        $this->prepareCustomPaginate($elasticFrom);
        $this->limit($elasticSize);
    } else {
        $this->limit($data['limit']);
    }
    // First request is called here as it could be scroll or skip/limit & for scroll we need scroll id for other api calls
    $elasticHits = $this->search($searchQueryParams);
    $addressesHits = collect($elasticHits->hits->hits);
    $addressesCount = $addressesHits->count();
    // The response has no _scroll_id when the search ran without the scroll parameter.
    $scrollId = data_get($elasticHits, '_scroll_id');
    if ($hasVisualColumns) {
        $addresses = ['scroll_id' => $scrollId, 'hits' => $addressesHits->toArray()];
    } else {
        $addresses = $addressesHits->pluck('_source.id')->toArray();
    }
    /** @var BulkOperationService $bulkOperationService */
    $bulkOperationService = resolve(BulkOperationService::class);
    $bulkOperationDataIds = array_merge($bulkOperationDataIds, $bulkOperationService->addDataInBulkOperationInChunk($jobId, $addresses, $subBatchSize, $isIdsOnly));
    $scrollIds = [];
    if ($scrollId) {
        $scrollIds = [$scrollId];
    }
    // Loop the requests to get more than 10,000 records
    if ($requestLimit > $elasticMaxResultWindow && !empty($scrollId)) {
        $remaining = $requestLimit - $addressesCount;
        while ($remaining > 0) {
            $scrollResponse = $this->searchScroll($scrollId);
            //Collect different scroll ids to delete scroll session post completion
            if ($scrollResponse->_scroll_id != $scrollId) {
                $scrollId = $scrollResponse->_scroll_id;
                $scrollIds[] = $scrollId;
            }
            $pageHits = collect($scrollResponse->hits->hits);
            if (!$pageHits->count()) {
                break;
            }
            $pageItems = $hasVisualColumns ? $pageHits : $pageHits->pluck('_source.id');
            if ($remaining < $elasticMaxResultWindow) {
                // Last page: keep only as many records as are still requested.
                $pageItems = $pageItems->slice(0, $remaining)->values();
            }
            $addressesCount += $pageItems->count();
            // Hand over plain arrays in both modes, like the first request does.
            if ($hasVisualColumns) {
                $payload = ['scroll_id' => $scrollResponse->_scroll_id, 'hits' => $pageItems->toArray()];
            } else {
                $payload = $pageItems->toArray();
            }
            $bulkOperationDataIds = array_merge($bulkOperationDataIds, $bulkOperationService->addDataInBulkOperationInChunk($jobId, $payload, $subBatchSize, $isIdsOnly));
            $remaining = $requestLimit - $addressesCount;
        }
    }
    // Loop the requests to get remaining records of pending ranges
    // NOTE(review): this path always stores IDs only and relies on the default chunk
    // arguments, even when 'visual_columns' is set — confirm that is intended.
    if (!empty($ranges)) {
        foreach ($ranges as $range) {
            $this->prepareCustomPaginate($range['from']);
            $this->limit($range['to']);
            $rangeResponse = $this->search([]);
            $rangeIds = collect($rangeResponse->hits->hits)->pluck('_source.id');
            $bulkOperationDataIds = array_merge($bulkOperationDataIds, $bulkOperationService->addDataInBulkOperationInChunk($jobId, $rangeIds->toArray()));
        }
    }
    if ($scrollIds) {
        $this->clearScrolls($scrollIds);
    }
    return $bulkOperationDataIds;
}
/**
 * Fetch the properties of a user whose hash is in the given list.
 *
 * The query state is reset after the search.
 *
 * @param $userId matched against inserted_by
 * @param $hashList values matched against hash
 * @return mixed the raw search() response
 */
public function fetchPropertiesByHashList($userId,$hashList)
{
    $hashFilter = $this->terms('hash', $hashList);
    $this->baseQuery['query']['bool']['filter'][] = $hashFilter;
    $ownerFilter = $this->term('inserted_by', $userId);
    $this->baseQuery['query']['bool']['filter'][] = $ownerFilter;
    $response = $this->search();
    $this->reset();
    return $response;
}
/**
 * Get the address hits with the selected columns from elastic.
 *
 * Without 'from'/'to' the first search opens a scroll and, when more than 10,000 records
 * are requested, further pages are read through that scroll. With 'from'/'to' the range is
 * split by splitRange() and every part is read with from/size pagination instead.
 *
 * @param $data request data; reads 'limit', 'from' and 'to'
 * @return array the collected hits
 */
public function getAddressSelectedColumnsScanScroll($data): array
{
    $resultWindow = 10000;
    $searchParams = ['scroll' => '1m'];
    $pendingRanges = [];
    $requestLimit = empty($data['limit']) ? $resultWindow : $data['limit'];
    // A single request never asks for more than the result window.
    if ($requestLimit > $resultWindow) {
        $data['limit'] = $resultWindow;
    } else {
        $data['limit'] = $requestLimit;
    }
    $this->prepareQuery($data, false);
    if (empty($data['from']) || empty($data['to'])) {
        $this->limit($data['limit']);
    } else {
        $offset = $data['from'] - 1;
        $size = ($data['to'] - $data['from']) + 1;
        $pendingRanges = splitRange($offset, $size, $resultWindow);
        $firstRange = array_shift($pendingRanges);
        if ($firstRange) {
            $offset = $firstRange['from'];
            $size = $firstRange['to'];
        }
        // Explicit ranges use from/size pagination, so no scroll is opened.
        $searchParams = [];
        $this->prepareCustomPaginate($offset);
        $this->limit($size);
    }
    $firstResponse = $this->search($searchParams);
    $firstHits = collect($firstResponse->hits->hits);
    $collectedCount = $firstHits->count();
    $collected = $firstHits->toArray();
    $scrollId = data_get($firstResponse, '_scroll_id');
    $openScrollIds = $scrollId ? [$scrollId] : [];
    // Loop the requests to get more than 10,000 records
    if ($requestLimit > $resultWindow && !empty($scrollId)) {
        $remaining = $requestLimit - $collectedCount;
        while ($remaining > 0) {
            $scrollResponse = $this->searchScroll($scrollId);
            //Collect different scroll ids to delete scroll session post completion
            if ($scrollResponse->_scroll_id != $scrollId) {
                $scrollId = $scrollResponse->_scroll_id;
                $openScrollIds[] = $scrollId;
            }
            $pageHits = collect($scrollResponse->hits->hits);
            if (!$pageHits->count()) {
                break;
            }
            if ($remaining < $resultWindow) {
                // Last page: keep only as many hits as are still requested.
                $pageHits = $pageHits->slice(0, $remaining)->values();
            }
            $collectedCount += count($pageHits);
            $collected = array_merge($collected, $pageHits->toArray());
            $remaining = $requestLimit - $collectedCount;
        }
    }
    // Loop the requests to get remaining records of pending ranges
    if (!empty($pendingRanges)) {
        foreach ($pendingRanges as $range) {
            $this->prepareCustomPaginate($range['from']);
            $this->limit($range['to']);
            $rangeResponse = $this->search([]);
            $collected = array_merge($collected, collect($rangeResponse->hits->hits)->toArray());
        }
    }
    if ($openScrollIds) {
        $this->clearScrolls($openScrollIds);
    }
    return $collected;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment