Last active
May 29, 2018 09:36
-
-
Save kricha/f98b21365a46238a69bdeedf16745694 to your computer and use it in GitHub Desktop.
Fastest custom pager provider for https://github.com/php-enqueue/enqueue-bundle. It works great with more than 2m rows and also supports multiple (parallel) consumers (workers).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace AppBundle\ElasticSearch\SearchProvider; | |
use Doctrine\ORM\EntityManagerInterface; | |
use FOS\ElasticaBundle\Provider\PagerfantaPager; | |
use FOS\ElasticaBundle\Provider\PagerInterface; | |
use FOS\ElasticaBundle\Provider\PagerProviderInterface; | |
use Pagerfanta\Adapter\CallbackAdapter; | |
use Pagerfanta\Pagerfanta; | |
use Symfony\Component\Cache\Adapter\AdapterInterface; | |
/**
 * Pager provider that pages through an entity table by primary key ("id > lastId")
 * instead of by OFFSET.
 *
 * A map "page number => id preceding the first row of that page" is built once and
 * stored in the injected cache pool, so that every consumer working on a different
 * page can look up where its page starts without scanning the preceding rows.
 *
 * The entity repository is expected to expose getPopulatingResults($lastId, $length);
 * that method is not part of this file.
 */
class FastPagerProvider implements PagerProviderInterface
{
    private $em;
    private $repository;
    private $cache;
    /** @var array|null page number => last id of the previous page (0 for the first page) */
    private $pageToLastIdMap;
    /** @var string entity class name with every run of non-word characters replaced by "_" (used in cache keys) */
    private $populatedClass;
    private $maxPerPage = 0;

    /**
     * @param EntityManagerInterface $entityManager entity manager used to resolve the repository
     * @param AdapterInterface       $adapter       cache pool holding the page map and page results
     * @param string                 $entityClass   class name of the entity being populated
     */
    public function __construct(EntityManagerInterface $entityManager, AdapterInterface $adapter, string $entityClass)
    {
        $this->em = $entityManager;
        $this->populatedClass = preg_replace("#\W+#", '_', $entityClass);
        $this->repository = $this->em->getRepository($entityClass);
        $this->cache = $adapter;
    }

    /**
     * Build a pager over the configured entity.
     *
     * Reads $options['max_per_page'] (page size used for the id map and cache keys) and,
     * as a fallback only, $options['first_page'].
     *
     * @param array $options
     *
     * @throws \Psr\Cache\InvalidArgumentException
     *
     * @return PagerInterface
     */
    public function provide(array $options = [])
    {
        $this->maxPerPage = $options['max_per_page'];
        // Disable SQL logging for the duration of the populate run.
        $this->em->getConfiguration()->setSQLLogger(null);
        $this->preparePageToLastIdMap();

        $nbCallback = function () {
            return $this->getNbResultsCallback();
        };
        $sliceCallback = function ($offset, $length) use ($options) {
            // Derive the 1-based page from the requested slice so that a pager iterated
            // over several pages serves each of them. When the pager is positioned on
            // first_page this yields exactly first_page. Fall back to the option when
            // the length is not usable as a divisor.
            $page = $length > 0
                ? intdiv((int) $offset, (int) $length) + 1
                : $options['first_page'];

            return $this->getSliceCallback($length, $page);
        };

        return new PagerfantaPager(new Pagerfanta(new CallbackAdapter($nbCallback, $sliceCallback)));
    }

    /**
     * Prepare pageToLastIdMap so that parallel consumers can find the start of their page.
     *
     * The map is read from the cache when present; otherwise it is built by walking the
     * ids in ascending order, maxPerPage at a time, and cached for 300 seconds.
     *
     * Resulting shape for N non-empty pages: [1 => 0, 2 => lastIdOfPage1, ..., N + 1 => lastIdOfPageN].
     * For an empty table the map is [1 => 0].
     *
     * @throws \Psr\Cache\InvalidArgumentException
     */
    private function preparePageToLastIdMap(): void
    {
        $mapCache = $this->cache->getItem("_fo_es_populate_map_{$this->populatedClass}_mpp_{$this->maxPerPage}");
        if ($mapCache->isHit()) {
            $this->pageToLastIdMap = $mapCache->get();

            return;
        }

        $alias = uniqid('qb_');
        // The id boundary is a bound parameter, so the DQL text is identical for every
        // page and one Query object can be reused for the whole walk.
        $query = $this->repository->createQueryBuilder($alias)
            ->select("{$alias}.id")
            ->where("{$alias}.id > :lastId")
            ->orderBy("{$alias}.id", 'asc')
            ->setMaxResults($this->maxPerPage)
            ->getQuery();

        $pages = [];
        $page = 1;
        $lastId = 0;
        while (true) {
            // Record where this page starts before fetching it; the entry written just
            // before the final (empty) fetch is the trailing N + 1 entry.
            $pages[$page] = $lastId;

            $rows = $query->setParameter('lastId', $lastId)->getResult();
            if (!$rows) {
                break;
            }

            $lastId = end($rows)['id'];
            ++$page;
        }

        $mapCache->set($pages)->expiresAfter(300);
        $this->cache->save($mapCache);
        $this->pageToLastIdMap = $pages;
    }

    /**
     * Get number of total results.
     *
     * @return int row count of the populated entity
     */
    private function getNbResultsCallback(): int
    {
        // The scalar result may come back as a numeric string depending on the driver.
        return (int) $this->repository->createQueryBuilder(uniqid('qb_'))
            ->select('count(1)')
            ->getQuery()
            ->getSingleScalarResult();
    }

    /**
     * Get results for the given page.
     *
     * Results are cached for 10 seconds under a key made of the entity class, page size
     * and page number. The original author notes that the same page is requested a
     * second time; the short-lived cache serves that second request.
     *
     * @param $length      number of rows requested for the page
     * @param $currentPage 1-based page number used to look up the starting id
     *
     * @throws \Psr\Cache\InvalidArgumentException
     *
     * @return mixed whatever the repository's getPopulatingResults() returns
     */
    private function getSliceCallback($length, $currentPage)
    {
        $lastId = $this->pageToLastIdMap[$currentPage];
        $populatePageKeyFormat = "_fos_es_pop_{$this->populatedClass}_mpp_{$this->maxPerPage}_page_%s_results";
        $resultCacheKey = sprintf($populatePageKeyFormat, $currentPage);

        $resultCache = $this->cache->getItem($resultCacheKey);
        if ($resultCache->isHit()) {
            return $resultCache->get();
        }

        // NOTE(review): getPopulatingResults() is defined on the entity repository, outside
        // this file; presumably it returns up to $length entities with id > $lastId.
        $resultObjects = $this->repository->getPopulatingResults($lastId, $length);
        $resultCache->set($resultObjects)->expiresAfter(10);
        $this->cache->save($resultCache);

        return $resultObjects;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment