Created
November 2, 2022 11:42
-
-
Save ahsanmster/1f3670a4eab00d95f33ecac954967824 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Services\Property; | |
use App\Models\BulkOperationData; | |
use App\Models\Property\Address; | |
use App\Models\PropertyLeadStatus; | |
use App\Services\BulkOperationService; | |
use Elasticsearch\ClientBuilder; | |
use Exception; | |
use Illuminate\Container\Container; | |
use Illuminate\Pagination\LengthAwarePaginator; | |
use Illuminate\Pagination\Paginator; | |
use Illuminate\Support\Arr; | |
use Illuminate\Support\Collection; | |
use Illuminate\Support\Str; | |
use ReflectionClass; | |
use Illuminate\Support\Facades\DB; | |
class PropertyElasticService extends ElasticsearchService | |
{ | |
protected $clientBuilder; | |
public function __construct() | |
{ | |
parent::__construct(config('constant.elastic.property.index')); | |
$this->baseQuery['query'] = [ | |
'bool' => [ | |
'filter' => [], | |
] | |
]; | |
$this->clientBuilder = (ClientBuilder::create()->setHosts([config('services.elasticsearch.host')]) | |
->setBasicAuthentication(config('services.elasticsearch.username'), config('services.elasticsearch.password')) | |
->setSelector('\Elasticsearch\ConnectionPool\Selectors\StickyRoundRobinSelector') | |
->setConnectionPool('\Elasticsearch\ConnectionPool\SimpleConnectionPool', []) | |
->build()); | |
} | |
public static function generateLatLon($storageArray, $latitude, $longitude, $type) | |
{ | |
if (is_numeric($longitude) && is_numeric($latitude) && in_array($type, ['property', 'mailing'])) { | |
$storageArray[$type . '_lat_lon'] = [(float)$longitude, (float)$latitude]; | |
} | |
return $storageArray; | |
} | |
/** | |
* Update elastic documents | |
* @param $properties | |
* @param array $fields | |
* @param bool $isIdsOnly | |
* @param bool $queryLess | |
* @param bool $doc_as_upsert | |
* @return bool | |
* @throws Exception | |
*/ | |
public function update($properties, $fields = [], $isIdsOnly = true, $queryLess = false, $doc_as_upsert=true, $refresh="wait_for") { | |
if (empty($properties)) { | |
return true; | |
} | |
if ($queryLess) { | |
$params = $this->buildUpdateParamsQueryLess($properties, $doc_as_upsert, $refresh); | |
} else { | |
$params = $this->buildUpdateParams($properties, $fields, $isIdsOnly); | |
} | |
if (empty($params['body'])) { | |
return true; | |
} | |
try { | |
$response = $this->clientBuilder->bulk($params); | |
if ($response['errors']) { | |
$responseCollection = collect($response['items'])->whereNotNull('update.error'); | |
bugsnag(new \Exception('Bulk upsert response: ' . $responseCollection->toJson())); | |
return false; | |
} | |
return true; | |
} catch (\Exception $e) { | |
$data = [ | |
'Data' => [ | |
'Query' => $params, | |
'Environment' => config('app.env'), | |
'Host' => request()->getHttpHost(), | |
] | |
]; | |
bugsnag($e, 'error', $data); | |
throw $e; | |
} | |
} | |
public function index($properties) | |
{ | |
if (empty($properties)) { | |
return true; | |
} | |
$params = $this->buildIndexQuery($properties); | |
if (empty($params['body'])) { | |
return true; | |
} | |
$response = (ClientBuilder::create()->setHosts([\config('services.elasticsearch.host')]) | |
->setBasicAuthentication(\config('services.elasticsearch.username'), \config('services.elasticsearch.password'))->build())->bulk($params); | |
return $this->getErrorLog($response, 'index'); | |
} | |
/** | |
* Build update query for elasticsearch without db queries | |
* | |
* @param $addresses | |
* @param bool $doc_as_upsert | |
* @return mixed | |
*/ | |
public function buildUpdateParamsQueryLess($addresses, $doc_as_upsert=true, $refresh) { | |
$params['body'] = []; | |
$params['refresh'] = $refresh; | |
foreach ($addresses as $address) { | |
if (empty($address['id'])) { | |
continue; | |
} | |
$params['body'][] = [ | |
'update' => [ | |
'_id' => $address['id'], | |
'_index' => config('constant.elastic.property.index'), | |
'_type' => config('constant.elastic.property.type'), | |
'retry_on_conflict' => 3, | |
], | |
]; | |
$address['index_updated_at'] = str_replace(' ', 'T', now()->toDateTimeString()); | |
$params['body'][] = [ | |
'doc' => $address, | |
'doc_as_upsert' => $doc_as_upsert, | |
]; | |
} | |
return $params; | |
} | |
public function buildIndexQuery($addresses) | |
{ | |
$params['body'] = []; | |
foreach ($addresses as $address) { | |
$params['body'][] = [ | |
'index' => [ | |
'_id' => $address['id'], | |
'_index' => config('constant.elastic.property.index'), | |
'_type' => config('constant.elastic.property.type'), | |
], | |
]; | |
$address['index_updated_at'] = str_replace(' ', 'T', now()->toDateTimeString()); | |
$params['body'][] = $address; | |
} | |
return $params; | |
} | |
/** | |
* prepare query to update elastic documents | |
* @param $properties | |
* @param array $fields | |
* @param bool $isIdsOnly | |
* @return mixed | |
*/ | |
public function buildUpdateParams($properties, $fields = [], $isIdsOnly = true) | |
{ | |
$addresses = $properties; | |
if (is_array($properties)) { | |
$addresses = collect($properties); | |
} | |
// Get addresses model based on the ids | |
if ($isIdsOnly) { | |
$addresses = Address::query()->findMany($properties); | |
} | |
$params['body'] = []; | |
$addresses->each(function ($address) use (&$params, $fields) { | |
$params['body'][] = [ | |
'update' => [ | |
'_id' => $address->getKey(), | |
'_index' => config('constant.elastic.property.index'), | |
'_type' => config('constant.elastic.property.type'), | |
'retry_on_conflict' => 3 | |
], | |
]; | |
$addressArray = $address->toSearchableArrayByFields($fields); | |
$addressArray['index_updated_at'] = str_replace(' ', 'T', now()->toDateTimeString()); | |
$params['body'][] = [ | |
'doc' => $addressArray | |
]; | |
}); | |
return $params; | |
} | |
/** | |
* delete elastic documents | |
* | |
* @param $properties | |
* @param bool $isIdsOnly | |
* @return bool | |
* @throws \ReflectionException | |
*/ | |
public function delete($properties, $isIdsOnly = true) | |
{ | |
if (empty($properties)) { | |
return true; | |
} | |
$params = $this->buildDeleteParams($properties, $isIdsOnly); | |
$response = (ClientBuilder::create()->setHosts([config('services.elasticsearch.host')]) | |
->setBasicAuthentication(config('services.elasticsearch.username'), config('services.elasticsearch.password')) | |
->build())->bulk($params); | |
return $this->getErrorLog($response, 'delete'); | |
} | |
/** | |
* @param $response | |
* @param $type | |
* @return bool | |
*/ | |
private function getErrorLog($response, $type): bool | |
{ | |
if ($response['errors']) { | |
$responseCollection = collect($response['items'])->whereNotNull($type . '.error'); | |
bugsnag(new \Exception('Bulk operation response of type: ' . $type . ' : ' . $responseCollection->toJson())); | |
return false; | |
} | |
return true; | |
} | |
/** | |
* prepare query to delete elastic documents | |
* | |
* @param $properties | |
* @param bool $isIdsOnly | |
* @return mixed | |
* @throws \ReflectionException | |
*/ | |
public function buildDeleteParams($properties, $isIdsOnly = true) | |
{ | |
$params['body'] = []; | |
foreach ($properties as $property) { | |
$params['body'][] = [ | |
'delete' => [ | |
'_id' => $isIdsOnly ? $property : $property->getKey(), | |
'_index' => $isIdsOnly ? config('constant.elastic.property.index') : $property->searchableAs(), | |
'_type' => $isIdsOnly ? config('constant.elastic.property.type') : Str::lower((new ReflectionClass($property))->getShortName()), | |
], | |
]; | |
} | |
return $params; | |
} | |
/** | |
* Prepare search query and get addresses from elasticsearch | |
* | |
* @param $data | |
* @return mixed|object | |
*/ | |
public function getAddresses($data) | |
{ | |
$this->prepareQuery($data); | |
$this->aggregate(); | |
$elasticHits = $this->search(); | |
if (isset($data['preference']) && !empty($data['preference'])) { | |
$this->preference = $data['preference']; | |
} | |
return Container::getInstance()->makeWith(LengthAwarePaginator::class, [ | |
'items' => $elasticHits->hits->hits, | |
'total' => $elasticHits->hits->total->value, | |
'perPage' => $data['pagesize'] ?: 15, | |
'currentPage' => $data['page'] ?: 1, | |
'options' => [ | |
'path' => Paginator::resolveCurrentPath(), | |
'pageName' => 'page', | |
'aggregations' => $elasticHits->aggregations, | |
] | |
]); | |
} | |
public function getAddressCount($data = [], $shard=NULL) | |
{ | |
$this->prepareQuery($data); | |
$elasticHits = $this->count($shard); | |
$this->reset(); | |
return $elasticHits->count; | |
} | |
public function getPropertiesCountByReset($data = []) | |
{ | |
$this->prepareQuery($data); | |
unset($this->baseQuery['sort']); // unset sort due to count doesnt support sort | |
$elasticHits = $this->count(); | |
$this->reset(); | |
return $elasticHits->count; | |
} | |
/** | |
* Get all Address Ids from the elasticsearch based on filters with custom source | |
* @param $data | |
* @param $source | |
* @return Collection | |
*/ | |
public function getRawProperties($data, $source, $user=null): Collection | |
{ | |
$this->prepareQuery($data, true, false, $user); | |
$this->source($source); | |
if (!empty($data['limit'])) { | |
$this->limit($data['limit']); | |
} | |
if (!empty($data['offset'])) { | |
$this->prepareCustomPaginate($data['offset']); | |
} | |
$elasticHits = $this->search(); | |
$addressesHits = collect($elasticHits->hits->hits); | |
$this->reset(); | |
$this->resetSort(); | |
return $addressesHits; | |
} | |
/** | |
* Get all Address Ids from the elasticsearch based on filters | |
* @param $data | |
* @return array | |
*/ | |
public function getAddressIds($data) | |
{ | |
$requestLimit = !empty($data['limit']) ? $data['limit'] : 10000; | |
$data['limit'] = $requestLimit > 10000 ? 10000 : $requestLimit; | |
$this->prepareQuery($data, false); | |
$this->limit($data['limit']); | |
$elasticHits = $this->search(); | |
$addressesHits = collect($elasticHits->hits->hits); | |
$addressesCount = $addressesHits->count(); | |
$addresses = $addressesHits->pluck('_source.id')->toArray(); | |
$lastAddress = $addressesHits->last(); | |
// Loop the requests to get more than 10,000 records | |
if ($requestLimit > 10000) { | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
while ($searchAfterLimit > 0 && $lastAddress) { | |
$searchRequestLimit = $searchAfterLimit > 10000 ? 10000 : $searchAfterLimit; | |
$this->limit($searchRequestLimit); | |
$this->searchAfter($lastAddress->sort); | |
$elasticSearchAfterHits = $this->search(); | |
$addressesSearchAfterHits = collect($elasticSearchAfterHits->hits->hits); | |
$addressesCount += $addressesSearchAfterHits->count(); | |
$searchAfterAddresses = $addressesSearchAfterHits->pluck('_source.id')->toArray(); | |
$addresses = array_merge($addresses, $searchAfterAddresses); | |
$lastAddress = $addressesSearchAfterHits->last(); | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
} | |
} | |
return $addresses; | |
} | |
/** | |
* @param $data | |
* @param bool $checkAmount | |
* @return array | |
*/ | |
public function getAddressIdsScanScroll($data, $checkAmount = false): array | |
{ | |
$elasticMaxResultWindow = 10000; | |
$searchQueryParams = ['scroll' => '1m']; | |
$requestLimit = !empty($data['limit']) ? $data['limit'] : $elasticMaxResultWindow; | |
$data['limit'] = $requestLimit > $elasticMaxResultWindow ? $elasticMaxResultWindow : $requestLimit; | |
$this->prepareQuery($data, false); | |
if (!empty($data['from']) && !empty($data['to'])) { | |
$elasticFrom = $data['from'] - 1; | |
$elasticSize = ($data['to'] - $data['from']) + 1; | |
$ranges = splitRange($elasticFrom, $elasticSize, $elasticMaxResultWindow); | |
$firstRange = array_shift($ranges); | |
if ($firstRange) { | |
$elasticFrom = $firstRange['from']; | |
$elasticSize = $firstRange['to']; | |
} | |
$searchQueryParams = []; | |
$this->prepareCustomPaginate($elasticFrom); | |
$this->limit($elasticSize); | |
} else { | |
$this->limit($data['limit']); | |
} | |
if ($checkAmount) { | |
$this->source(['id', 'is_skip_traced']); | |
} | |
$elasticHits = $this->search($searchQueryParams); | |
$addressesHits = collect($elasticHits->hits->hits); | |
$addressesCount = $addressesHits->count(); | |
if ($checkAmount) { | |
$addresses = $addressesHits->map(function ($item) { | |
return [ | |
'id' => $item->_source->id, | |
'is_skip_traced' => $item->_source->is_skip_traced | |
]; | |
})->toArray(); | |
} else { | |
$addresses = $addressesHits->pluck('_source.id')->toArray(); | |
} | |
$scrollIds = []; | |
$scrollId = data_get($elasticHits, '_scroll_id'); | |
if ($scrollId) { | |
$scrollIds = [$scrollId]; | |
} | |
// Loop the requests to get more than 10,000 records | |
if ($requestLimit > $elasticMaxResultWindow && !empty($scrollId)) { | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
while ($searchAfterLimit > 0) { | |
$elasticSearchAfterHits = $this->searchScroll($scrollId); | |
//Collect different scroll ids to delete scroll session post completion | |
if ($elasticSearchAfterHits->_scroll_id != $scrollId) { | |
$scrollId = $elasticSearchAfterHits->_scroll_id; | |
$scrollIds[] = $scrollId; | |
} | |
$addressesSearchAfterHits = collect($elasticSearchAfterHits->hits->hits); | |
if ($checkAmount) { | |
$searchAfterAddresses = $addressesSearchAfterHits->map(function ($item) { | |
return [ | |
'id' => $item->_source->id, | |
'is_skip_traced' => $item->_source->is_skip_traced | |
]; | |
}); | |
} else { | |
$searchAfterAddresses = $addressesSearchAfterHits->pluck('_source.id'); | |
} | |
if (!$addressesSearchAfterHits->count()) { | |
break; | |
} | |
if ($searchAfterLimit < $elasticMaxResultWindow) { | |
$searchAfterAddresses->splice($searchAfterLimit); | |
$searchAfterAddresses = collect($searchAfterAddresses->all()); | |
} | |
$addressesCount += count($searchAfterAddresses); | |
$addresses = array_merge($addresses, $searchAfterAddresses->toArray()); | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
} | |
} | |
// Loop the requests to get remaining records of pending ranges | |
if (!empty($ranges)) { | |
foreach ($ranges as $range) { | |
$searchQueryParams = []; | |
$this->prepareCustomPaginate($range['from']); | |
$this->limit($range['to']); | |
$elasticHits = $this->search($searchQueryParams); | |
$addressesSearchAfterHits = collect($elasticHits->hits->hits); | |
if ($checkAmount) { | |
$searchAfterAddresses = $addressesSearchAfterHits->map(function ($item) { | |
return [ | |
'id' => $item->_source->id, | |
'is_skip_traced' => $item->_source->is_skip_traced | |
]; | |
}); | |
} else { | |
$searchAfterAddresses = $addressesSearchAfterHits->pluck('_source.id'); | |
} | |
$addresses = array_merge($addresses, $searchAfterAddresses->toArray()); | |
} | |
} | |
if ($scrollIds) { | |
$this->clearScrolls($scrollIds); | |
} | |
return $addresses; | |
} | |
/** | |
* Prepare the main | |
* | |
* @param $data | |
* @param bool $getPaginate | |
* @param bool $allFields | |
*/ | |
public function prepareQuery($data, $getPaginate = true, $allFields = false, $user=null) | |
{ | |
if (isset($data['bound_box_lat_lon'])) { | |
$this->baseQuery['query']['bool']['must'][] = $this->term('inserted_by', empty($data['user_id']) ? appCompanyId() : $data['user_id']); | |
} else { | |
$this->baseQuery['query']['bool']['filter'][] = $this->term('inserted_by', empty($data['user_id']) ? appCompanyId() : $data['user_id']); | |
} | |
if ($allFields === false) { | |
$this->selectFields($data); | |
} else { | |
unset($this->baseQuery['_source']); | |
} | |
if (!empty($data['aggregate_filter'])) { | |
$aggregateFields = config('constant.aggregation'); | |
if (isset($aggregateFields[$data['aggregate_filter']])) { | |
$value = $aggregateFields[$data['aggregate_filter']]['aggregation_field'] == 'property_is_vacant' | |
? $aggregateFields[$data['aggregate_filter']]['aggregation_value'] | |
: !!$aggregateFields[$data['aggregate_filter']]['aggregation_value']; | |
$this->baseQuery['post_filter'] = $this->term($aggregateFields[$data['aggregate_filter']]['aggregation_field'], $value); | |
} | |
} | |
if (!empty($data['global_filter'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->searchByKeyword($data['global_filter']); | |
} | |
if (!empty($data['address_ids'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->terms('id', $data['address_ids']); | |
} | |
if (!empty($data['uncheckedids'])) { | |
$this->baseQuery['query']['bool']['must_not'][] = $this->terms('id', $data['uncheckedids']); | |
} | |
if (isset($data['opt_out']) && in_array($data['opt_out'], [1, 0, 'both'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->optOut($data['opt_out']); | |
} | |
if (isset($data['self_managed']) && in_array($data['self_managed'], [1, 0, 'both'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->selfManaged($data['self_managed']); | |
} | |
if (isset($data['is_dialer_pushed']) && in_array($data['is_dialer_pushed'], [true, false])) { | |
$value = $data['is_dialer_pushed']; | |
$field = 'is_dialer_pushed'; | |
$term = $this->term($field, $value); | |
if ($value) { | |
$this->baseQuery['query']['bool']['filter'][] = $term; | |
} else { | |
// field not exist or false value query | |
$orQuery = [$this->notExist($field), $term]; | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => $orQuery | |
] | |
]; | |
} | |
} | |
if (isset($data['quick_list']) && is_array($data['quick_list']) && !empty($data['quick_list'])) { | |
$segment = $this->setQuickListQuery($data['quick_list']); | |
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment); | |
} | |
if (!empty($data['list_id']) && !empty($data['camp_cond_filter'])) { | |
$segment = $this->includeList($data['list_id'], $data['camp_cond_filter']); | |
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment); | |
} | |
if (!empty($data['lead_status'])) { | |
if (in_array(PropertyLeadStatus::LEAD_STATUS_DEFAULT_ID, $data['lead_status'])) { | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => [$this->terms('lead_status', $data['lead_status']), $this->mustNotExistField('lead_status')] | |
] | |
]; | |
} else { | |
$segment = $this->includeLeadStatus($data['lead_status']); | |
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment); | |
} | |
} | |
if (!empty($data['list_id2'])) { | |
$this->baseQuery['query']['bool']['must_not'][] = $this->excludeList($data['list_id2']); | |
} | |
if (!empty($data['mailer1']) && !empty($data['mailer_cond_filter'])) { | |
$segment = $this->includeTag($data['mailer1'], $data['mailer_cond_filter']); | |
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment); | |
} | |
if (!empty($data['mailer2'])) { | |
$this->baseQuery['query']['bool']['must_not'][] = $this->excludeTag($data['mailer2']); | |
} | |
if (isset($data['camp_from']) && !is_null($data['camp_from']) && isset($data['camp_to']) && !is_null($data['camp_to'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->listCount($data['camp_from'], $data['camp_to']); | |
} | |
if (isset($data['lead_score_from']) && !is_null($data['lead_score_from']) && isset($data['lead_score_to']) && !is_null($data['lead_score_to'])) { | |
$leadScoreFrom = $data['lead_score_from']; | |
$leadScoreTo = $data['lead_score_to']; | |
if($leadScoreFrom <= 0 && $leadScoreTo >= 0) { | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => [$this->range('lead_score', $leadScoreFrom, $leadScoreTo), $this->mustNotExistField('lead_score')] | |
] | |
]; | |
} | |
else { | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => [$this->range('lead_score', $leadScoreFrom, $leadScoreTo)] | |
] | |
]; | |
} | |
} | |
if (isset($data['mailer_from']) && !is_null($data['mailer_from']) && isset($data['mailer_to']) && !is_null($data['mailer_to'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->tagCount($data['mailer_from'], $data['mailer_to']); | |
} | |
if (isset($data['image_count_from']) && !is_null($data['image_count_from']) && isset($data['image_count_to']) && !is_null($data['image_count_to'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->imageCount($data['image_count_from'], $data['image_count_to']); | |
} | |
if (isset($data['images_count_from']) && !is_null($data['images_count_from']) && isset($data['images_count_to']) && !is_null($data['images_count_to'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->imagesCount($data['images_count_from'], $data['images_count_to']); | |
} | |
if (!empty($data['absentee']) && in_array($data['absentee'], Address::getAbsenteeStatuses())) { | |
$segment = $this->absentee($data['absentee']); | |
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segment); | |
} | |
if (!empty($data['skiptraced'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->skipTraced($data['skiptraced']); | |
} | |
if (!empty($data['is_vacant'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->propertyVacant($data['is_vacant']); | |
} | |
if (!empty($data['is_mailing_vacant'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->mailingVacant($data['is_mailing_vacant']); | |
} | |
if (isset($data['has_phone_numbers']) && $data['has_phone_numbers'] != '') { | |
if ($data['has_phone_numbers'] == '1') { | |
$this->baseQuery['query']['bool']['filter'][] = $this->mustExistsField('phone_numbers'); | |
} else { | |
$this->baseQuery['query']['bool']['filter'][] = $this->notExist('phone_numbers'); | |
} | |
} | |
if (!empty($data['bound_box_lat_lon'])) { | |
$this->baseQuery['query']['bool']['filter']['geo_bounding_box']['property_lat_lon'] = [ | |
'top_left' => [ | |
'lat' => $data['bound_box_lat_lon']['nw_lat'], | |
'lon' => $data['bound_box_lat_lon']['nw_lon'], | |
], | |
'bottom_right' => [ | |
'lat' => $data['bound_box_lat_lon']['se_lat'], | |
'lon' => $data['bound_box_lat_lon']['se_lon'], | |
], | |
]; | |
} | |
if (!empty($data['greater_than_id'])) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->greaterThan('id',$data['greater_than_id'], false); | |
} | |
if (!empty($data['filter_data']->rules)) { | |
$condition = strtolower($data['filter_data']->condition ?? 'and'); | |
$segments = $this->customQuery($data['filter_data'], $user); | |
// Append conditions to base query for must and must not operations | |
if ($condition == 'and') { | |
if (!empty($segments['not'])) { | |
if (!isset($this->baseQuery['query']['bool']['must_not'])) { | |
$this->baseQuery['query']['bool']['must_not'] = []; | |
} | |
$this->baseQuery['query']['bool']['must_not'] = array_merge($this->baseQuery['query']['bool']['must_not'], $segments['not']); | |
} | |
if (!empty($segments['filter'])) { | |
if (!isset($this->baseQuery['query']['bool']['filter'])) { | |
$this->baseQuery['query']['bool']['filter'] = []; | |
} | |
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $segments['filter']); | |
} | |
if (!empty($segments['must'])) { | |
if (!isset($this->baseQuery['query']['bool']['must'])) { | |
$this->baseQuery['query']['bool']['must'] = []; | |
} | |
$this->baseQuery['query']['bool']['must'] = array_merge($this->baseQuery['query']['bool']['must'], $segments['must']); | |
} | |
} // Append conditions to base query for should and should not operations | |
else { | |
if (!empty($segments['filter']) || !empty($segments['not']) || !empty($segments['must'])) { | |
if (!isset($this->baseQuery['query']['bool']['should'])) { | |
$this->baseQuery['query']['bool']['should'] = []; | |
} | |
if (!empty($segments['not'])) { | |
$query = []; | |
$query['bool']['must_not'] = $segments['not']; | |
if (!isset($this->baseQuery['query']['bool']['should'])) { | |
$this->baseQuery['query']['bool']['should'] = []; | |
} | |
$this->baseQuery['query']['bool']['should'] = array_merge($this->baseQuery['query']['bool']['should'], [$query]); | |
} | |
if (!empty($segments['filter'])) { | |
if (!isset($this->baseQuery['query']['bool']['should'])) { | |
$this->baseQuery['query']['bool']['should'] = []; | |
} | |
$this->baseQuery['query']['bool']['should'] = array_merge($this->baseQuery['query']['bool']['should'], $segments['filter']); | |
} | |
if (!empty($segments['must'])) { | |
if (!isset($this->baseQuery['query']['bool']['should'])) { | |
$this->baseQuery['query']['bool']['should'] = []; | |
} | |
$this->baseQuery['query']['bool']['should'] = array_merge($this->baseQuery['query']['bool']['should'], $segments['must']); | |
} | |
} | |
} | |
} | |
if (!empty($data['column_filter']->rules)) { | |
$filterSegment = $this->customQuery($data['column_filter'], $user); | |
if (!empty($filterSegment['not'])) { | |
if (!isset($this->baseQuery['query']['bool']['must_not'])) { | |
$this->baseQuery['query']['bool']['must_not'] = []; | |
} | |
$this->baseQuery['query']['bool']['must_not'] = array_merge($this->baseQuery['query']['bool']['must_not'], $filterSegment['not']); | |
} | |
if (!empty($filterSegment['filter'])) { | |
if (!isset($this->baseQuery['query']['bool']['filter'])) { | |
$this->baseQuery['query']['bool']['filter'] = []; | |
} | |
$this->baseQuery['query']['bool']['filter'] = array_merge($this->baseQuery['query']['bool']['filter'], $filterSegment['filter']); | |
} | |
if (!empty($filterSegment['must'])) { | |
if (!isset($this->baseQuery['query']['bool']['must'])) { | |
$this->baseQuery['query']['bool']['must'] = []; | |
} | |
$this->baseQuery['query']['bool']['must'] = array_merge($this->baseQuery['query']['bool']['must'], $filterSegment['must']); | |
} | |
/** | |
* Special case for lead status to manage lead_status field , if no key lead_status exist in the document | |
* */ | |
if (collect($data['column_filter']->rules)->pluck('field')->count() > 0) { | |
$leadStatusFilter = collect($data['column_filter']->rules)->filter(function ($obj) { | |
if ($obj->field == 'lead_status') { | |
return true; | |
} | |
}); | |
$leadStatusFilter = $leadStatusFilter->first(); | |
if (!empty($leadStatusFilter) && $leadStatusFilter->value == PropertyLeadStatus::LEAD_STATUS_DEFAULT_ID) { | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => [$this->term('lead_status', $leadStatusFilter->value), $this->mustNotExistField('lead_status')] | |
] | |
]; | |
} | |
} | |
} | |
if (!empty($data['sort_data'])) { | |
if (!empty($data['global_filter']) && $data['sort_data'] == 'id') { | |
$data['sort_data'] = '_score'; | |
$data['sort_type'] = 'desc'; | |
} | |
$this->baseQuery['sort'][] = $this->sortBy($data['sort_data'], isset($data['sort_type']) ? $data['sort_type'] : 'asc'); | |
} | |
if (Arr::has($data, 'skip')) { | |
$this->prepareCustomPaginate(Arr::get($data, 'skip')); | |
} | |
if ($getPaginate) { | |
$this->paginate(Arr::get($data, 'page', 1), Arr::get($data, 'pagesize', 15)); | |
} | |
} | |
private function sortBy($field, $direction) | |
{ | |
$field = $field == 'updated_at' ? 'updated_date' : $field; | |
$mapping = config('constant.filter.mapping'); | |
$mappingCollection = collect($mapping); | |
if (isset($mapping[$field])) { | |
$map = $mapping[$field]; | |
} else { | |
$map = $mappingCollection->where('elastic', $field)->first(); | |
if (empty($map)) { | |
$map = [ | |
'elastic' => 'id', | |
'type' => 'long' | |
]; | |
} | |
} | |
if ($field == '_score') { | |
$map = [ | |
'elastic' => $field, | |
'type' => 'long' | |
]; | |
} | |
$sortBy = $this->getSortField($map['elastic'], $map['type']); | |
return $this->sort($sortBy, $direction); | |
} | |
private function getSortField($field, $type = '') | |
{ | |
if ($type == 'text') { | |
$field .= '.keyword'; | |
} | |
// create sorting query for nested data type loan | |
if ($field == "loan") { | |
$field = "loan.interest_rate"; | |
} | |
return $field; | |
} | |
/** | |
* Generate sub segment for search by keyword | |
* | |
* @param $keyword | |
* @return array[] | |
*/ | |
private function searchByKeyword($keyword) | |
{ | |
$phoneKeys = config('constant.phone_keys'); | |
$emailKeys = config('constant.email_keys'); | |
$customKeys = config('constant.custom_keys'); | |
$specialCar = strpos($keyword, '+') == false; | |
if ($specialCar) { | |
$fields = [ | |
'full_name', | |
'property_full_address', | |
'mailing_full_address', | |
]; | |
} else { | |
$fields = [ | |
'full_name.keyword', | |
'property_full_address.keyword', | |
'mailing_full_address.keyword', | |
]; | |
} | |
foreach ($phoneKeys as $phoneKey) { | |
$fields[] = $phoneKey . ($specialCar ? '' : '.keyword'); | |
} | |
foreach ($emailKeys as $emailKey) { | |
$fields[] = $emailKey . ($specialCar ? '' : '.keyword'); | |
} | |
foreach ($customKeys as $customKey) { | |
$fields[] = $customKey . ($specialCar ? '' : '.keyword'); | |
} | |
return $this->queryString($keyword, $fields); | |
} | |
/** | |
* generate sub segment for opt_out | |
* | |
* @param $optOutStatus | |
* @return array[] | |
*/ | |
private function optOut($optOutStatus) | |
{ | |
if ($optOutStatus == 'both') { | |
return $this->exists('opt_out'); | |
} | |
return $this->term('opt_out', !!$optOutStatus); | |
} | |
/** | |
* generate sub segment for self_managed | |
* | |
* @param $selfManagedStatus | |
* @return array[] | |
*/ | |
private function selfManaged($selfManagedStatus) | |
{ | |
if ($selfManagedStatus == 'both') { | |
return $this->exists('self_managed'); | |
} | |
return $this->term('self_managed', !!$selfManagedStatus); | |
} | |
/** | |
* generate sub segment for quick lists | |
* | |
* @param $quickList | |
* @return array[] | |
*/ | |
private function setQuickListQuery($quickList) | |
{ | |
$segment = []; | |
foreach ($quickList as $listName => $value) { | |
$segment[] = $this->term("quick_list.{$listName}", $value); | |
} | |
return $segment; | |
} | |
/** | |
* generate sub segment for include lists | |
* | |
* @param $listIds | |
* @param $type | |
* @return array | |
*/ | |
private function includeList($listIds, $type) | |
{ | |
if ($type == 'exactMatchAny') { | |
$segment = []; | |
foreach ($listIds as $listId) { | |
$segment[] = $this->terms('list_ids', [$listId]); | |
} | |
return $segment; | |
} | |
return [$this->terms('list_ids', $listIds)]; | |
} | |
/** | |
* generate sub segment for include lists | |
* | |
* @param $listIds | |
* @param $type | |
* @return array | |
*/ | |
private function includeLeadStatus($listIds) | |
{ | |
return [$this->terms('lead_status', $listIds)]; | |
} | |
/** | |
* generate sub segment for exclude lists | |
* | |
* @param $listIds | |
* @return array[] | |
*/ | |
private function excludeList($listIds) | |
{ | |
return $this->terms('list_ids', $listIds); | |
} | |
/** | |
* generate sub segment for include tags | |
* | |
* @param $tagIds | |
* @param $type | |
* @return array | |
*/ | |
private function includeTag($tagIds, $type) | |
{ | |
if ($type == 'exactMatchAny') { | |
$segment = []; | |
foreach ($tagIds as $tagId) { | |
$segment[] = $this->terms('tag_ids', [$tagId]); | |
} | |
return $segment; | |
} | |
return [$this->terms('tag_ids', $tagIds)]; | |
} | |
/** | |
* generate sub segment for exclude tags | |
* | |
* @param $tagIds | |
* @return array[] | |
*/ | |
private function excludeTag($tagIds) | |
{ | |
return $this->terms('tag_ids', $tagIds); | |
} | |
/** | |
* generate sub segment for list count | |
* | |
* @param $from | |
* @param $to | |
* @return array | |
*/ | |
private function listCount($from, $to) | |
{ | |
return $this->range('lists_count', $from, $to); | |
} | |
/** | |
* generate sub segment for tag count | |
* | |
* @param $from | |
* @param $to | |
* @return array | |
*/ | |
private function tagCount($from, $to) | |
{ | |
return $this->range('tags_count', $from, $to); | |
} | |
/** | |
* generate sub segment for image count | |
* | |
* @param $from | |
* @param $to | |
* @return array | |
*/ | |
private function imageCount($from, $to) | |
{ | |
return $this->range('image_count', $from, $to); | |
} | |
/** | |
* generate sub segment for images count | |
* | |
* @param $from | |
* @param $to | |
* @return array | |
*/ | |
private function imagesCount($from, $to) | |
{ | |
return $this->range('images_count', $from, $to); | |
} | |
/** | |
* generate sub segment for absentee | |
* | |
* @param $absenteeStatus | |
* @return array | |
*/ | |
private function absentee($absenteeStatus) | |
{ | |
$segment = []; | |
switch ($absenteeStatus) { | |
case Address::ABSENTEE_YES: | |
case Address::ABSENTEE_NO: | |
$segment[] = $this->term('sub_address_match', $absenteeStatus == Address::ABSENTEE_NO); | |
break; | |
case Address::ABSENTEE_IN_STATE: | |
$segment[] = $this->term('sub_address_match', false); | |
$segment[] = $this->term('is_in_state', true); | |
break; | |
case Address::ABSENTEE_OUT_OF_STATE: | |
$segment[] = $this->term('is_in_state', false); | |
break; | |
} | |
return $segment; | |
} | |
/** | |
* generate sub segment for skip trace | |
* | |
* @param $skipTraceStatus | |
* @return array[] | |
*/ | |
public function skipTraced($skipTraceStatus) | |
{ | |
return $this->term('is_skip_traced', $skipTraceStatus == Address::SKIPTRACE_YES); | |
} | |
/** | |
* generate sub segment for property vacant status | |
* | |
* @param $vacantStatus | |
* @return array[] | |
*/ | |
private function propertyVacant($vacantStatus) | |
{ | |
$status = 0; | |
$field = 'property_is_vacant'; | |
if ($vacantStatus == Address::VACANT_NEW) { | |
$status = 2; | |
} elseif ($vacantStatus == Address::VACANT_YES) { | |
$status = 1; | |
} else if ($vacantStatus == Address::VACANT_BOTH) { | |
$status = [0, 1]; | |
return $this->terms($field, $status); | |
} | |
return $this->term($field, $status); | |
} | |
/** | |
* generate sub segment for mailing vacant status | |
* | |
* @param $vacantStatus | |
* @return array[] | |
*/ | |
private function mailingVacant($vacantStatus) | |
{ | |
return $this->term('mailing_is_vacant', $vacantStatus == Address::VACANT_YES ? true : false); | |
} | |
/** | |
* Prepare custom queries based on query builder rules | |
* | |
* @param $filterData | |
* @return array[] | |
*/ | |
private function customQuery($filterData, $user=null) | |
{ | |
$notConditions = []; | |
$filterConditions = []; | |
$mustConditions = []; | |
$rules = $filterData->rules; | |
$mapping = config('constant.filter.mapping'); | |
foreach ($rules as $rule) { | |
if ($rule->field == 'lead_status' && $rule->value == PropertyLeadStatus::LEAD_STATUS_DEFAULT_ID) { | |
continue; | |
} | |
if (is_null($rule->value) && !in_array($rule->operator, ['is_empty', 'is_not_empty'])) { | |
continue; | |
} | |
$mapped = $mapping[$rule->id]; | |
$field = $mapped['elastic']; | |
$query = $this->getQuery($rule, $field, $mapped, $user); | |
if (!$query) { | |
continue; | |
} | |
if ($this->isNegateCondition($rule->operator)) { | |
$notConditions[] = $query; | |
continue; | |
} | |
if (!empty($mapped['match']) || $rule->operator == 'contains') { | |
$mustConditions[] = $query; | |
continue; | |
} | |
$filterConditions[] = $query; | |
} | |
return ['not' => $notConditions, 'filter' => $filterConditions, 'must' => $mustConditions]; | |
} | |
/** | |
* check if the operator is negative condition | |
* | |
* @param $operator | |
* @return bool | |
*/ | |
private function isNegateCondition($operator) | |
{ | |
if ($operator == 'not_contains' || $operator == 'not_between' || $operator == 'is_empty') { | |
return true; | |
} | |
return false; | |
} | |
/** | |
* get query based on builder operator | |
* | |
* @param $rule | |
* @param $field | |
* @param $mapped | |
* @return array | |
*/ | |
private function getQuery($rule, $field, $mapped, $user=null) | |
{ | |
$match = !empty($mapped['match']); | |
$value = $rule->value; | |
$operator = $rule->operator; | |
// create nested query | |
if ($rule->type == "nested") { | |
if ($rule->field == "loan_type") { | |
return $this->nestedTypeQuery($field, $rule->field, $value, $rule->operator); | |
} | |
if ($rule->field == "interest_rate") { | |
return $this->nestedTypeRangeQuery($field, $rule->field, $value, $rule->operator); | |
} | |
} | |
if (in_array($field, ['property_vacant_from_date', 'mailing_vacant_from_date']) && isset($rule->input) && $rule->input == 'text') { | |
$value = now()->subMonths(intval($rule->value))->toDateString(); | |
if ($operator == 'less') { | |
$operator = 'greater'; | |
} else { | |
$operator = 'less'; | |
} | |
} | |
switch ($operator) { | |
case 'is_empty': | |
case 'is_not_empty': | |
return $this->exists($field); | |
case 'contains': | |
case 'not_contains': | |
return $mapped['type'] == 'phrase' ? $this->matchPhrase($field, $rule->value) : $this->match($field, $rule->value); | |
case 'less': | |
return $this->lessThan($field, $value); | |
case 'greater': | |
return $this->greaterThan($field, $value); | |
case 'between': | |
case 'not_between': | |
$valueGTE = $rule->value[0]; | |
$valueLTE = $rule->value[1]; | |
$userTimeZone = isset($user) ? $user->default_timezone : userTimeZone(); | |
if ($rule->type == 'date') { | |
$startDate = $rule->value[0] ? $rule->value[0] . ' 00:00:00' : $rule->value[0]; | |
$endDate = $rule->value[1] ? $rule->value[1] . ' 23:59:59' : $rule->value[1]; | |
$valueGTE = str_replace(' ', 'T', gmtTime($startDate, 'Y-m-d H:i:s', $userTimeZone)); | |
$valueLTE = str_replace(' ', 'T', gmtTime($endDate, 'Y-m-d H:i:s', $userTimeZone)); | |
} else if ($rule->type == 'datetime') { | |
$startDate = $rule->value[0]; | |
$endDate = $rule->value[1]; | |
$valueGTE = str_replace(' ', 'T', gmtTime($startDate, 'Y-m-d H:i:s', $userTimeZone)); | |
$valueLTE = str_replace(' ', 'T', gmtTime($endDate, 'Y-m-d H:i:s', $userTimeZone)); | |
} | |
// we are creating should query for lead score, because not all property contain lead_score field | |
if($rule->field == "lead_score"){ | |
// we need to check filter range condition already applied in lead_score, Then we need to remove that query | |
$mustQuery = $this->baseQuery['query']['bool']['must']; | |
if(!empty($mustQuery)){ | |
foreach($mustQuery as $key => $must) { | |
$shouldQuery = $must['bool']['should']; | |
foreach ($shouldQuery as $key => $should) { | |
// check and remove for exist lead_score query | |
if(isset($should['bool']['must_not']['exists']['field']) && $should['bool']['must_not']['exists']['field'] == 'lead_score') { | |
unset($shouldQuery[$key]); | |
} | |
// check filter lead_score range query | |
if(isset($should['range']['lead_score'])) { | |
unset($shouldQuery[$key]); | |
} | |
} | |
$must['bool']['should'] = array_values($shouldQuery); | |
} | |
} | |
// if user selected range include 0, We need to return lead_score column not created properties too | |
if($valueGTE <= 0 && $valueLTE >= 0){ | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => [$this->range('lead_score', $valueGTE, $valueLTE), $this->notExist('lead_score')] | |
] | |
]; | |
} | |
else { | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => [$this->range('lead_score', $valueGTE, $valueLTE)] | |
] | |
]; | |
} | |
return false; | |
} | |
$valueGTE = ($valueGTE == '' || $valueGTE == null) ? 0 : $valueGTE; | |
$valueLTE = ($valueLTE == '' || $valueLTE == null) ? 0 : $valueLTE; | |
return $this->range($field, $valueGTE, $valueLTE); | |
default: | |
if ($match) { | |
return $this->match($field, $value); | |
} | |
if ($rule->type == 'string') { | |
$field .= '.keyword'; | |
} | |
if ($rule->id == 'is_dialer_pushed' && !$rule->value) { | |
// field not exist or false value query | |
$orQuery = [$this->notExist($field), $this->term($field, false)]; | |
$this->baseQuery['query']['bool']['must'][] = [ | |
"bool" => [ | |
"minimum_should_match" => 1, | |
"should" => $orQuery | |
] | |
]; | |
return false; | |
} | |
if (is_array($rule->value)) { | |
return $this->terms($field, $rule->value); | |
} | |
$value = $rule->value; | |
if (($rule->type == 'boolean' && !is_bool($value)) || $field == 'property_is_usps_valid') { | |
$value = !!((int)$rule->value); | |
} | |
return $this->term($field, $value); | |
} | |
} | |
/** | |
* Get fields from elasticsearch object | |
* | |
* @param $data | |
*/ | |
private function selectFields($data) | |
{ | |
$fields = ['id']; | |
$visualColumns = isset($data['visual_columns']) ? $data['visual_columns'] : []; | |
$mapping = config('constant.filter.mapping'); | |
foreach ($visualColumns as $visualColumn) { | |
if (isset($mapping[$visualColumn])) { | |
$fields[] = $mapping[$visualColumn]['elastic']; | |
} | |
} | |
if (count($fields)) { | |
$this->source($fields); | |
} | |
} | |
/** | |
* Get address by user | |
* | |
* @param $userIds | |
* @return \Illuminate\Support\Collection | |
*/ | |
public function getAddressesByUsers($userIds): \Illuminate\Support\Collection | |
{ | |
$this->baseQuery['size'] = 0; | |
$this->baseQuery['query']['bool']['filter'][] = $this->terms('inserted_by', $userIds); | |
$this->termAggregate('inserted_by'); | |
$elasticResponse = $this->search(); | |
return collect($elasticResponse->aggregations->inserted_by_field->buckets); | |
} | |
private function aggregate() | |
{ | |
$aggregationFields = config('constant.aggregation'); | |
if (empty($aggregationFields)) { | |
return; | |
} | |
$this->baseQuery['aggregations'] = []; | |
foreach ($aggregationFields as $aggregationFieldName => $aggregationField) { | |
$this->baseQuery['aggregations'][$aggregationFieldName . '_field'] = [ | |
'terms' => [ | |
'field' => $aggregationField['aggregation_field'] | |
] | |
]; | |
} | |
} | |
/** | |
* @param $userId | |
* @return mixed | |
*/ | |
public function deleteUserAddresses($userId) | |
{ | |
$this->prepareQuery(['user_id' => $userId]); | |
return $this->deleteByQuery(); | |
} | |
public function getAddressByID($addressID) | |
{ | |
$elasticDocument = $this->getDocumentByID(config('constant.elastic.property.type'), $addressID); | |
return $elasticDocument->found ? $elasticDocument->_source : null; | |
} | |
/** | |
* returns the complete address object from elastic for given IDs without pagination. | |
* | |
* @param $data | |
* @return \Illuminate\Support\Collection | |
*/ | |
public function getAllAddressByMultipleIDs($data) | |
{ | |
$totalRecordsPerLoop = 1000; | |
$perPageSize = $data['pagesize']; | |
$totalRecords = count($data['addresses']); | |
$totalPages = (int)ceil($totalRecords / $perPageSize); | |
$addressIds = collect($data['addresses'])->chunk($totalRecordsPerLoop)->values(); | |
$addressIds = ($addressIds->values()->toArray()); | |
$allAddresses = collect(); | |
foreach ($addressIds as $addresses) { | |
$data['pagesize'] = $totalRecordsPerLoop; | |
$data['size'] = $totalRecordsPerLoop; | |
$data['address_ids'] = []; | |
$data['address_ids'] = array_values($addresses); | |
$this->baseQuery = []; | |
$this->baseQuery['from'] = 0; | |
$this->baseQuery['size'] = $totalRecordsPerLoop; | |
$this->prepareQuery($data, false, true); | |
$elasticHits = $this->search(); | |
$allAddresses->push($elasticHits->hits->hits); | |
} | |
return collect($allAddresses)->flatten(); | |
} | |
/** | |
* Get address IDs greater than last id | |
* @param $userId | |
* @param $addressId | |
* @param int $size | |
* @return array | |
*/ | |
public function getAddressIdsGreaterThan($userId, $addressId, $size = 1000): array | |
{ | |
$this->baseQuery['query']['bool']['filter'][] = $this->term('inserted_by', $userId); | |
$this->baseQuery['query']['bool']['filter'][] = $this->greaterThan('id', $addressId, false); | |
$this->baseQuery['size'] = $size; | |
$this->baseQuery['_source'] = ['id']; | |
$this->baseQuery['sort'][] = $this->sortBy('id', 'asc'); | |
$elasticHits = $this->search(); | |
return collect($elasticHits->hits->hits)->pluck('_source.id')->toArray(); | |
} | |
/** | |
* returns list ids with property count | |
* | |
* @param $data | |
* @return \Illuminate\Support\Collection | |
*/ | |
public function getAddressCountByLists($data = []): \Illuminate\Support\Collection | |
{ | |
$this->prepareQuery($data); | |
$this->baseQuery['from'] = 0; | |
$this->baseQuery['size'] = 0; | |
$this->termAggregate('list_ids', count($data)); | |
$elasticResponse = $this->search(); | |
return collect($elasticResponse->aggregations->list_ids_field->buckets); | |
} | |
/** | |
* Get address chart data using aggregation | |
* | |
* @param $userId | |
* @param $totalLists | |
* @return \Illuminate\Support\Collection | |
*/ | |
public function getDashBoardChartData($userId, $totalLists): \Illuminate\Support\Collection | |
{ | |
$this->baseQuery['size'] = 0; | |
$this->baseQuery['query']['bool']['filter'][] = $this->term('inserted_by', $userId); | |
$this->termAggregate('property_is_vacant', 3); | |
$this->subTermAggregate('property_is_vacant_field', 'lists_count', $totalLists); | |
$elasticResponse = $this->search(); | |
return collect($elasticResponse->aggregations->property_is_vacant_field->buckets); | |
} | |
/** | |
* returns aggregate data ids with property count | |
* | |
* @param $data | |
* @param $field | |
* @param $size | |
* @return \Illuminate\Support\Collection | |
*/ | |
public function getAggregateAddressCounts($data = [], $field, $size): \Illuminate\Support\Collection | |
{ | |
$this->prepareQuery($data); | |
$this->baseQuery['from'] = 0; | |
$this->baseQuery['size'] = 0; | |
$this->termAggregate($field, $size); | |
$elasticResponse = $this->search(); | |
$response = collect($elasticResponse->aggregations->{$field . "_field"}->buckets); | |
$this->reset(); | |
$this->resetAggregate(); | |
return $response; | |
} | |
public function setDrivingRouteIdToNull($drivingRouteIds = [], $userId = null) | |
{ | |
if (empty($drivingRouteIds)) { | |
return false; | |
} | |
if (!empty($drivingRouteIds)) { | |
$this->baseQuery['query']['bool']['filter'][] = $this->terms( | |
'driving_route_id', | |
$drivingRouteIds | |
); | |
} | |
if (!empty($userId)) { | |
$this->baseQuery['query']['bool']['must'][] = $this->term( | |
'inserted_by', | |
$userId | |
); | |
} | |
$this->baseQuery['script'] = [ | |
"source" => "ctx._source.driving_route_id = null", | |
"lang" => "painless" | |
]; | |
return $this->updateByQuery(); | |
} | |
/** | |
* Get address using route id | |
* @param $userId | |
* @param $routeId | |
* @return array | |
*/ | |
public function getAddressUsingRouteId($userId, $routeId, $perPage = 15): object | |
{ | |
$this->baseQuery['size'] = $perPage; | |
$this->baseQuery['query']['bool']['filter'][] = $this->term('inserted_by', $userId); | |
$this->baseQuery['query']['bool']['filter'][] = $this->term('driving_route_id', $routeId); | |
$this->baseQuery['_source'] = ["mls_status","last_sale_price","property_full_address","property_line_1","property_line_2","property_city","property_county","property_state","property_zip","property_lat_lon","quick_list.absentee_owner","quick_list.preforeclosure","quick_list.vacant","image_count","images_count","image_urls","owner_occupied","owner_2_first_name","owner_2_last_name","mailing_county","equity_current_estimated_balance","year_built","bedroom_count","bathroom_count","total_building_area_square_feet","total_open_lien_balance","last_sale_date","lead_status","full_name","first_name","last_name","hash","mls_listing_date","mls_price","owner_residence_months","open_lien_count","id"]; | |
$elasticHits = $this->search(); | |
$addressesHits = collect($elasticHits->hits->hits); | |
$this->reset(); | |
return $addressesHits; | |
} | |
/** | |
* @param int $userId | |
* @param string $distance | |
* @param array $propertyLatLon | |
* @return Collection | |
*/ | |
public function getPropertiesUsingGeoDistance(int $userId, string $distance, array $propertyLatLon) | |
{ | |
$this->baseQuery['query']['bool']['must'][] = $this->term('inserted_by', $userId); | |
$this->baseQuery['query']['bool']['filter'][] = $this->geoDistance([ | |
'distance' => $distance . 'm', | |
'property_lat_lon' => $propertyLatLon | |
]); | |
$this->baseQuery['size'] = 1000; | |
$this->baseQuery['_source'] = [ | |
"mls_status", | |
"property_full_address", | |
"property_lat_lon", | |
"lead_status", | |
"image_urls", | |
"hash" | |
]; | |
$elasticHits = $this->search(); | |
$addressesHits = collect($elasticHits->hits->hits); | |
$this->reset(); | |
$this->baseQuery['size'] = 15; // resetting size to default value | |
return $addressesHits; | |
} | |
/** | |
* Get minmum and maximum of a field by user | |
* | |
* @param $field | |
* @param $userId | |
* @return \Illuminate\Support\Collection | |
*/ | |
public function getFieldMinMaxByUser($field, $userId = null): \Illuminate\Support\Collection | |
{ | |
$userId = empty($userId) ? appCompanyId() : $userId; | |
$this->baseQuery['size'] = 0; | |
$this->baseQuery['query'] = $this->term('inserted_by', $userId); | |
$this->customAggregate($field, 'min'); | |
$this->customAggregate($field, 'max'); | |
$elasticResponse = $this->search(); | |
$this->reset(); | |
return collect($elasticResponse->aggregations); | |
} | |
/** | |
* Update in ES uising guzzle curl | |
* @param $conditions | array | |
* @param $updateFields | array | |
* @return array|mixed | |
*/ | |
public function updateBulkRecord($conditions, $updateFields){ | |
if(is_array($conditions)){ | |
foreach($conditions as $field => $value){ | |
$this->baseQuery['query']['bool']['must'][] = [ | |
'match' => [ | |
$field => $value | |
] | |
]; | |
} | |
} | |
$updateStr = ''; | |
if(is_array($updateFields)){ | |
foreach($updateFields as $field => $value){ | |
$updateStr .= !empty($updateStr) ? ";" : ''; | |
$updateStr .= "ctx._source.".$field." = '".$value."'"; | |
} | |
} | |
$this->baseQuery['script']['source'] = $updateStr; | |
$this->baseQuery['script']['lang'] = 'painless'; | |
return $this->updateByQuery(); | |
} | |
/** | |
* Delete in ES using guzzle curl | |
* @param $conditions | array | |
* | |
* @return array|mixed | |
*/ | |
public function deleteBulkRecord($conditions) | |
{ | |
if(is_array($conditions)){ | |
foreach($conditions as $field => $value){ | |
$this->baseQuery['query']['bool']['must'][] = [ | |
'match' => [ | |
$field => $value | |
] | |
]; | |
} | |
return $this->deleteByQuery(); | |
} | |
} | |
/** | |
* Set porperty custom_images array based on create or delete | |
* | |
* @param $addressId | |
* @param $data | |
* @return true|false | |
*/ | |
public function setCustomImages($addressId, $data = []) | |
{ | |
if ( empty($addressId) ) { | |
return false; | |
} | |
$elasticDocument = $this->getAddressByID($addressId); | |
if( empty($elasticDocument) ) { | |
return false; | |
} | |
$customImages = !empty($elasticDocument->custom_images) ? $elasticDocument->custom_images : []; | |
$imageUrls = !empty($elasticDocument->image_urls) ? $elasticDocument->image_urls : []; | |
//set create for insert images | |
if( !empty($data['create']) ) { | |
//merge two array and create one array customImages | |
$customImages = array_merge($data['create'], $customImages); | |
} elseif( !empty($data['delete']) ) { //set delete for remove images | |
array_filter($customImages, function( $value, $key ) use( &$customImages, $data ) { | |
//check for image url exists if yes then unlink else reuturn false. | |
if ( trim(parse_url($data['delete'])['path'],"/") == trim(parse_url($value)['path'],"/") ) { | |
unset($customImages[$key]); | |
} | |
}, ARRAY_FILTER_USE_BOTH); | |
} | |
//calculate count for final image count | |
$imageCount = count(array_merge($customImages, $imageUrls)); | |
//prepare query for elastic serach and update | |
$this->baseQuery['query']['bool']['filter'][] = $this->term( | |
'id', $addressId | |
); | |
$this->baseQuery['script'] = [ | |
"source" => "ctx._source.custom_images = params.custom_images;ctx._source.image_count = params.image_count;ctx._source.images_count = params.images_count;", | |
"lang" => "painless", | |
"params" => [ | |
"custom_images" => array_values($customImages), | |
"image_count" => ($imageCount >= 127) ? null : $imageCount, | |
"images_count" => $imageCount, | |
] | |
]; | |
return $this->updateByQuery(); | |
} | |
/** | |
* @description Get BUlk Operation Data From Elastic and Return Postgres Table BulkOperationData's array of ids | |
* @param $data | |
* @param $jobId | |
* @return array | |
*/ | |
public function getBulkOperationDataFromElasticWithScrollAndStoreIdsInTable($data, $jobId): array | |
{ | |
$bulkOperationDataIds = []; | |
$elasticMaxResultWindow = 10000; | |
$subBatchSize = 1000; | |
$searchQueryParams = ['scroll' => '1m']; | |
if (isset($data['shard'])){ | |
$searchQueryParams['preference'] = $data['shard']; | |
} | |
$requestLimit = !empty($data['limit']) ? $data['limit'] : $elasticMaxResultWindow; | |
$data['limit'] = $requestLimit > $elasticMaxResultWindow ? $elasticMaxResultWindow : $requestLimit; | |
$allFields = (isset($data['visual_columns'])) ? false : true; | |
$this->prepareQuery($data, false, $allFields); | |
if (!empty($data['from']) && !empty($data['to'])) { | |
$elasticFrom = $data['from'] - 1; | |
$elasticSize = ($data['to'] - $data['from']) + 1; | |
$ranges = splitRange($elasticFrom,$elasticSize,$elasticMaxResultWindow); | |
$firstRange = array_shift($ranges); | |
if ($firstRange) { | |
$elasticFrom = $firstRange['from']; | |
$elasticSize = $firstRange['to']; | |
} | |
$searchQueryParams = []; | |
$this->prepareCustomPaginate($elasticFrom); | |
$this->limit($elasticSize); | |
} else { | |
$this->limit($data['limit']); | |
} | |
// First request is called here as it could be scroll or skip/limit & for scroll we need scroll id for other api calls | |
$elasticHits = $this->search($searchQueryParams); | |
$addressesHits = collect($elasticHits->hits->hits); | |
$addressesCount = $addressesHits->count(); | |
if (isset($data['visual_columns'])){ | |
$addresses = ['scroll_id'=>$elasticHits->_scroll_id, 'hits'=> $addressesHits->toArray()]; | |
$isIdsOnly = false; | |
}else{ | |
$addresses = $addressesHits->pluck('_source.id')->toArray(); | |
$isIdsOnly = true; | |
} | |
/** @var BulkOperationService $bulkOperationService */ | |
$bulkOperationService = resolve(BulkOperationService::class); | |
$bulkOperationDataIds = array_merge($bulkOperationDataIds,$bulkOperationService->addDataInBulkOperationInChunk($jobId,$addresses,$subBatchSize,$isIdsOnly)); | |
$scrollIds = []; | |
$scrollId = data_get($elasticHits,'_scroll_id'); | |
if ($scrollId) { | |
$scrollIds = [$scrollId]; | |
} | |
// Loop the requests to get more than 10,000 records | |
if ($requestLimit > $elasticMaxResultWindow && !empty($scrollId)) { | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
while ($searchAfterLimit > 0) { | |
$elasticSearchAfterHits = $this->searchScroll($scrollId); | |
//Collect different scroll ids to delete scroll session post completion | |
if ($elasticSearchAfterHits->_scroll_id != $scrollId) { | |
$scrollId = $elasticSearchAfterHits->_scroll_id; | |
$scrollIds[] = $scrollId; | |
} | |
$addressesSearchAfterHits = collect($elasticSearchAfterHits->hits->hits); | |
if (isset($data['visual_columns'])){ | |
$searchAfterAddresses =$addressesSearchAfterHits; | |
$isIdsOnly = false; | |
$subBatchSize = 1000; | |
}else{ | |
$searchAfterAddresses = $addressesSearchAfterHits->pluck('_source.id'); | |
$isIdsOnly = true; | |
} | |
if (!$addressesSearchAfterHits->count()) { | |
break; | |
} | |
if ($searchAfterLimit < $elasticMaxResultWindow) { | |
$searchAfterAddresses->splice($searchAfterLimit); | |
$searchAfterAddresses = collect($searchAfterAddresses->all()); | |
} | |
$addressesCount += count($searchAfterAddresses); | |
if (isset($data['visual_columns'])){ | |
$searchAfterAddresses = ['scroll_id'=>$elasticSearchAfterHits->_scroll_id, 'hits'=> $searchAfterAddresses->toArray()]; | |
} | |
$bulkOperationDataIds = array_merge($bulkOperationDataIds,$bulkOperationService->addDataInBulkOperationInChunk($jobId,$searchAfterAddresses,$subBatchSize,$isIdsOnly)); | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
} | |
} | |
// Loop the requests to get remaining records of pending ranges | |
if (!empty($ranges)) { | |
foreach ($ranges as $range) { | |
$searchQueryParams = []; | |
$this->prepareCustomPaginate($range['from']); | |
$this->limit($range['to']); | |
$elasticHits = $this->search($searchQueryParams); | |
$addressesSearchAfterHits = collect($elasticHits->hits->hits); | |
$searchAfterAddresses = $addressesSearchAfterHits->pluck('_source.id'); | |
$bulkOperationDataIds = array_merge($bulkOperationDataIds,$bulkOperationService->addDataInBulkOperationInChunk($jobId,$searchAfterAddresses->toArray())); | |
} | |
} | |
if ($scrollIds) { | |
$this->clearScrolls($scrollIds); | |
} | |
return $bulkOperationDataIds; | |
} | |
/** | |
* Fetching properties based on hash | |
* | |
* @param $userId | |
* @param $hashList | |
*/ | |
public function fetchPropertiesByHashList($userId,$hashList) | |
{ | |
$this->baseQuery['query']['bool']['filter'][] = $this->terms('hash',$hashList); | |
$this->baseQuery['query']['bool']['filter'][] = $this->term('inserted_by',$userId); | |
$elasticHits = $this->search(); | |
$this->reset(); | |
return $elasticHits; | |
} | |
/** | |
* Get Address with selected columns from ES with ScanScorll | |
* @param $data | |
* @return array | |
*/ | |
public function getAddressSelectedColumnsScanScroll($data): array | |
{ | |
$elasticMaxResultWindow = 10000; | |
$searchQueryParams = ['scroll' => '1m']; | |
$requestLimit = !empty($data['limit']) ? $data['limit'] : $elasticMaxResultWindow; | |
$data['limit'] = $requestLimit > $elasticMaxResultWindow ? $elasticMaxResultWindow : $requestLimit; | |
$this->prepareQuery($data, false); | |
if (!empty($data['from']) && !empty($data['to'])) { | |
$elasticFrom = $data['from'] - 1; | |
$elasticSize = ($data['to'] - $data['from']) + 1; | |
$ranges = splitRange($elasticFrom, $elasticSize, $elasticMaxResultWindow); | |
$firstRange = array_shift($ranges); | |
if ($firstRange) { | |
$elasticFrom = $firstRange['from']; | |
$elasticSize = $firstRange['to']; | |
} | |
$searchQueryParams = []; | |
$this->prepareCustomPaginate($elasticFrom); | |
$this->limit($elasticSize); | |
} else { | |
$this->limit($data['limit']); | |
} | |
$elasticHits = $this->search($searchQueryParams); | |
$addressesHits = collect($elasticHits->hits->hits); | |
$addressesCount = $addressesHits->count(); | |
$addresses = $addressesHits->toArray(); | |
$scrollIds = []; | |
$scrollId = data_get($elasticHits, '_scroll_id'); | |
if ($scrollId) { | |
$scrollIds = [$scrollId]; | |
} | |
// Loop the requests to get more than 10,000 records | |
if ($requestLimit > $elasticMaxResultWindow && !empty($scrollId)) { | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
while ($searchAfterLimit > 0) { | |
$elasticSearchAfterHits = $this->searchScroll($scrollId); | |
//Collect different scroll ids to delete scroll session post completion | |
if ($elasticSearchAfterHits->_scroll_id != $scrollId) { | |
$scrollId = $elasticSearchAfterHits->_scroll_id; | |
$scrollIds[] = $scrollId; | |
} | |
$addressesSearchAfterHits = collect($elasticSearchAfterHits->hits->hits); | |
$searchAfterAddresses = $addressesSearchAfterHits; | |
if (!$addressesSearchAfterHits->count()) { | |
break; | |
} | |
if ($searchAfterLimit < $elasticMaxResultWindow) { | |
$searchAfterAddresses->splice($searchAfterLimit); | |
$searchAfterAddresses = collect($searchAfterAddresses->all()); | |
} | |
$addressesCount += count($searchAfterAddresses); | |
$addresses = array_merge($addresses, $searchAfterAddresses->toArray()); | |
$searchAfterLimit = $requestLimit - $addressesCount; | |
} | |
} | |
// Loop the requests to get remaining records of pending ranges | |
if (!empty($ranges)) { | |
foreach ($ranges as $range) { | |
$searchQueryParams = []; | |
$this->prepareCustomPaginate($range['from']); | |
$this->limit($range['to']); | |
$elasticHits = $this->search($searchQueryParams); | |
$addressesSearchAfterHits = collect($elasticHits->hits->hits); | |
$addresses = array_merge($addresses, $addressesSearchAfterHits->toArray()); | |
} | |
} | |
if ($scrollIds) { | |
$this->clearScrolls($scrollIds); | |
} | |
return $addresses; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment