Last active
October 6, 2021 17:20
-
-
Save Nightbr/ddb586394d95877dde8ed7445c51d973 to your computer and use it in GitHub Desktop.
FOSElasticaBundle: Doctrine ORM Listener with relation updates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace AppBundle\Listener\Search; | |
/**
 * ElasticSearch listener dedicated to the Category entity.
 *
 * The class adds nothing of its own: every Doctrine event handler and the
 * relation re-indexing logic are inherited from SearchListener. It exists so
 * that a separate listener service can be declared for this entity type.
 */
class CategoryListener extends SearchListener
{
    // Intentionally empty: all behaviour is inherited from SearchListener.
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# One Doctrine event subscriber per indexed entity type. Each listener receives
# the object persister of its own type, the shared "indexable" service and its
# index/type names; the service container is injected through setContainer().
#
# Service references must be quoted: "@" is a reserved indicator in YAML and an
# unquoted scalar starting with it is rejected by the Symfony YAML parser.
services:
    fos_elastica.listener.app.category:
        class: AppBundle\Listener\Search\CategoryListener
        arguments:
            - '@fos_elastica.object_persister.app.category'
            - '@fos_elastica.indexable'
            - {"indexName" : "welp", "typeName": "category"}
        calls:
            - [ setContainer, [ '@service_container' ] ]
        tags:
            - { name: 'doctrine.event_subscriber' }

    fos_elastica.listener.app.user:
        class: AppBundle\Listener\Search\UserListener
        arguments:
            - '@fos_elastica.object_persister.app.user'
            - '@fos_elastica.indexable'
            - {"indexName" : "welp", "typeName": "user"}
        calls:
            - [ setContainer, [ '@service_container' ] ]
        tags:
            - { name: 'doctrine.event_subscriber' }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace AppBundle\Listener\Search; | |
use Doctrine\Common\Persistence\Event\LifecycleEventArgs; | |
use FOS\ElasticaBundle\Persister\ObjectPersister; | |
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface; | |
use FOS\ElasticaBundle\Provider\IndexableInterface; | |
use Doctrine\ORM\PersistentCollection; | |
use Doctrine\ORM\LazyCriteriaCollection; | |
use Psr\Log\LoggerInterface; | |
use Symfony\Component\PropertyAccess\PropertyAccess; | |
use Symfony\Component\PropertyAccess\PropertyAccessorInterface; | |
use Doctrine\Common\EventSubscriber; | |
/**
 * Automatically update ElasticSearch based on changes to the Doctrine source
 * data. One listener is registered for each Doctrine entity / ElasticSearch type.
 *
 * Besides indexing the entity handled by its own object persister, the listener
 * re-indexes the objects reachable through that entity's Doctrine associations,
 * provided an object persister service exists for the associated class.
 */
class SearchListener implements EventSubscriber
{
    /**
     * Object persister.
     *
     * @var ObjectPersisterInterface
     */
    protected $objectPersister;

    /**
     * Configuration for the listener.
     *
     * @var array
     */
    private $config;

    /**
     * Objects scheduled for insertion.
     *
     * @var array
     */
    public $scheduledForInsertion = array();

    /**
     * Objects scheduled to be updated.
     *
     * @var array
     */
    public $scheduledForUpdate = array();

    /**
     * IDs of objects scheduled for removal.
     *
     * @var array
     */
    public $scheduledForDeletion = array();

    /**
     * PropertyAccessor instance.
     *
     * @var PropertyAccessorInterface
     */
    protected $propertyAccessor;

    /**
     * @var IndexableInterface
     */
    private $indexable;

    /**
     * Service container used to look up the object persisters of related types.
     * Stays null until setContainer() is called.
     *
     * @var \Symfony\Component\DependencyInjection\ContainerInterface|null
     */
    private $container;

    /**
     * Doctrine events this subscriber listens to.
     *
     * @return array
     */
    public function getSubscribedEvents()
    {
        return array(
            'postPersist',
            'preRemove',
            'postRemove',
            'postUpdate',
            'postFlush',
        );
    }

    /**
     * Constructor.
     *
     * @param ObjectPersisterInterface $objectPersister
     * @param IndexableInterface       $indexable
     * @param array                    $config          Merged over the default array('identifier' => 'id')
     * @param LoggerInterface|null     $logger          Forwarded to the persister when it is an ObjectPersister
     */
    public function __construct(
        ObjectPersisterInterface $objectPersister,
        IndexableInterface $indexable,
        array $config = array(),
        LoggerInterface $logger = null
    ) {
        $this->config = array_merge(array(
            'identifier' => 'id',
        ), $config);
        $this->indexable = $indexable;
        $this->objectPersister = $objectPersister;
        $this->propertyAccessor = PropertyAccess::createPropertyAccessor();

        if ($logger && $this->objectPersister instanceof ObjectPersister) {
            $this->objectPersister->setLogger($logger);
        }
    }

    /**
     * Injects the service container needed to resolve related object persisters.
     *
     * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
     */
    public function setContainer(\Symfony\Component\DependencyInjection\ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * Looks for new objects that should be indexed.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postPersist(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity)) {
            $this->scheduledForInsertion[] = $entity;
            // Update related documents (for example, User1 creates a new Need: the
            // User1 document has to be refreshed to increment its need counter).
            $this->updateRelations($entity);
        }
    }

    /**
     * Looks for objects being updated that should be indexed or removed from the index.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postUpdate(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if (!$this->objectPersister->handlesObject($entity)) {
            return;
        }

        if ($this->isObjectIndexable($entity)) {
            $this->scheduledForUpdate[] = $entity;
            // Update related documents
            $this->updateRelations($entity);
        } else {
            // Delete if no longer indexable
            $this->scheduleForDeletion($entity);
        }
    }

    /**
     * Schedules the deletion on preRemove instead of postRemove so that the
     * identifier of the entity can still be read.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function preRemove(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity)) {
            $this->scheduleForDeletion($entity);
        }
    }

    /**
     * Update relations on postRemove.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postRemove(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity)) {
            // Update related documents
            $this->updateRelations($entity);
        }
    }

    /**
     * Re-indexes every object associated with the entity through a Doctrine
     * association, as long as an object persister service exists for the
     * associated class. The related documents are replaced immediately, not
     * queued until postFlush.
     *
     * @param object $entity
     */
    private function updateRelations($entity)
    {
        // The container is injected through setContainer(); without it the related
        // persisters cannot be resolved, so relation updates are skipped.
        if (null === $this->container) {
            return;
        }

        // Get all associations of the current entity
        $entityAssociations = $this->container
            ->get('doctrine')
            ->getManager()
            ->getMetadataFactory()
            ->getMetadataFor(get_class($entity))
            ->getAssociationMappings();

        foreach ($entityAssociations as $asso) {
            $objectPersisterName = $this->getRelationPersisterName($asso['targetEntity']);

            // Only associations whose target has a type in ES are of interest
            if (!$this->container->has($objectPersisterName)) {
                continue;
            }

            // Skip associations that expose no getter instead of failing fatally
            $getAssoObject = 'get'.ucfirst($asso['fieldName']);
            if (!is_callable(array($entity, $getAssoObject))) {
                continue;
            }

            $objectPersisterRelation = $this->container->get($objectPersisterName);
            $scheduledForUpdate = $this->collectIndexableRelations(
                $objectPersisterRelation,
                $entity->$getAssoObject()
            );

            // replaceMany() must not be called with an empty list: there is nothing
            // to send and the underlying bulk request rejects empty input.
            if (count($scheduledForUpdate) > 0) {
                $objectPersisterRelation->replaceMany($scheduledForUpdate);
            }
        }
    }

    /**
     * Builds the object persister service id for an associated entity class,
     * using the lower-cased short class name as the type name.
     *
     * @param string $targetEntity Fully qualified class name of the association target
     *
     * @return string
     */
    private function getRelationPersisterName($targetEntity)
    {
        // end() needs a real variable; passing a function result by reference
        // triggers "Only variables should be passed by reference".
        $nameParts = explode('\\', $targetEntity);
        $className = strtolower(end($nameParts));

        /* ToDo: retrieve 'app' from config */
        return 'fos_elastica.object_persister.app.'.$className;
    }

    /**
     * Extracts, from the value returned by an association getter, the objects
     * that the related persister handles and that are indexable.
     *
     * NOTE(review): indexability is evaluated with this listener's own
     * indexName/typeName, not with the related type's - confirm this is intended.
     *
     * @param ObjectPersisterInterface $objectPersisterRelation Persister of the related type
     * @param mixed                    $relationObjects         Collection, single object or null
     *
     * @return array
     */
    private function collectIndexableRelations($objectPersisterRelation, $relationObjects)
    {
        $isCollection = $relationObjects instanceof PersistentCollection
            || $relationObjects instanceof LazyCriteriaCollection;

        if ($isCollection) {
            // Collection of objects (OneToMany, ManyToMany)
            if (0 === $relationObjects->count()) {
                return array();
            }
            $candidates = $relationObjects;
        } elseif (is_object($relationObjects)) {
            // One object (ManyToOne, OneToOne)
            $candidates = array($relationObjects);
        } else {
            // Unset association (null): nothing to re-index
            return array();
        }

        $indexable = array();
        foreach ($candidates as $object) {
            if ($objectPersisterRelation->handlesObject($object) && $this->isObjectIndexable($object)) {
                $indexable[] = $object;
            }
        }

        return $indexable;
    }

    /**
     * Persist scheduled objects to ElasticSearch.
     * After persisting, clear the scheduled queues to prevent multiple data
     * updates when using multiple flush calls.
     */
    protected function persistScheduled()
    {
        if (count($this->scheduledForInsertion)) {
            $this->objectPersister->insertMany($this->scheduledForInsertion);
            $this->scheduledForInsertion = array();
        }

        if (count($this->scheduledForUpdate)) {
            $this->objectPersister->replaceMany($this->scheduledForUpdate);
            $this->scheduledForUpdate = array();
        }

        if (count($this->scheduledForDeletion)) {
            $this->objectPersister->deleteManyByIdentifiers($this->scheduledForDeletion);
            $this->scheduledForDeletion = array();
        }
    }

    /**
     * Iterate through scheduled actions before flushing to emulate 2.x behavior.
     * Note that the ElasticSearch index will fall out of sync with the source
     * data in the event of a crash during flush.
     *
     * This method is only called in legacy configurations of the listener; it is
     * not part of getSubscribedEvents().
     *
     * @deprecated This method should only be called in applications that depend
     *             on the behaviour that entities are indexed regardless of if a
     *             flush is successful.
     */
    public function preFlush()
    {
        $this->persistScheduled();
    }

    /**
     * Iterating through scheduled actions *after* flushing ensures that the
     * ElasticSearch index will be affected only if the query is successful.
     */
    public function postFlush()
    {
        $this->persistScheduled();
    }

    /**
     * Record the identifier of the object to delete. Only the identifier is
     * kept; the entire object is not needed.
     *
     * @param object $object
     */
    protected function scheduleForDeletion($object)
    {
        // A falsy identifier value is ignored and nothing is scheduled.
        if ($identifierValue = $this->propertyAccessor->getValue($object, $this->config['identifier'])) {
            $this->scheduledForDeletion[] = $identifierValue;
        }
    }

    /**
     * Checks if the object is indexable or not, using the index and type names
     * from this listener's configuration.
     *
     * @param object $object
     *
     * @return bool
     */
    protected function isObjectIndexable($object)
    {
        return $this->indexable->isObjectIndexable(
            $this->config['indexName'],
            $this->config['typeName'],
            $object
        );
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace AppBundle\Listener\Search; | |
/**
 * ElasticSearch listener dedicated to the User entity.
 *
 * The class adds nothing of its own: every Doctrine event handler and the
 * relation re-indexing logic are inherited from SearchListener. It exists so
 * that a separate listener service can be declared for this entity type.
 */
class UserListener extends SearchListener
{
    // Intentionally empty: all behaviour is inherited from SearchListener.
}
Thanks Titouan. I have already checked the repo, but I can't find any solution for this issue with FOSElasticaBundle 6.0.0.
Probably a bit late now, but there's a simple solution if you still have this problem - store a timestamp field on your indexed entity and then whenever changes to your relational entities happen, update that field. Here's an example for a ManyToOne relation - let's say the thing that is indexed is called Offers and it has a Category:
/**
 * Category whose changes must trigger a re-index of the offers attached to it.
 * Only the parts relevant to the re-indexing trick are shown.
 *
 * @ORM\HasLifecycleCallbacks
 */
class Category
{
    /**
     * Offers belonging to this category (inverse side of Offer::$category).
     *
     * @ORM\OneToMany(targetEntity=Offer::class, mappedBy="category")
     */
    private $offers;

    /**
     * Changes the name and marks every attached offer as modified.
     *
     * @param string $name
     */
    public function setName($name)
    {
        $this->name = $name;
        $this->reindexElastica();
    }

    /**
     * Touches the timestamp of every offer so that Doctrine sees the offers as
     * changed and they are updated (and therefore re-indexed) on the next flush.
     */
    private function reindexElastica()
    {
        // The collection is not initialised in this snippet; nothing to touch then.
        if (null === $this->offers) {
            return;
        }

        // Plain iteration: map() would build a new collection that is thrown away.
        foreach ($this->offers as $offer) {
            $offer->setUpdatedAt(new \DateTime());
        }
    }
}
Here’s a solution that abstracts it out of the entity to a listener:
use App\Entity\Offer;
use Doctrine\Bundle\DoctrineBundle\EventSubscriber\EventSubscriberInterface;
use Doctrine\ORM\Event\OnFlushEventArgs;
/**
 * Touches the timestamp of the offers attached to any entity that is about to
 * be updated, so that those offers are written - and therefore re-indexed -
 * in the same flush.
 */
class UpdateOfferListener implements EventSubscriberInterface
{
    /**
     * Doctrine events this subscriber listens to.
     *
     * @return array
     */
    public function getSubscribedEvents()
    {
        return array(
            'onFlush',
        );
    }

    /**
     * For every entity scheduled for update that exposes getOffers(), updates
     * the timestamp of each of its offers and registers that change with the
     * unit of work.
     *
     * Change sets are already computed when onFlush is dispatched, so changes
     * made here have to be registered explicitly with
     * recomputeSingleEntityChangeSet(). This replaces a nested
     * EntityManager::flush() call, which is not safe from inside a flush
     * listener.
     *
     * @param OnFlushEventArgs $args
     */
    public function onFlush(OnFlushEventArgs $args)
    {
        $em = $args->getEntityManager();
        $uow = $em->getUnitOfWork();

        foreach ($uow->getScheduledEntityUpdates() as $entity) {
            if (!method_exists($entity, 'getOffers')) {
                continue;
            }

            foreach ($entity->getOffers() as $offer) {
                if (!$offer instanceof Offer) {
                    continue;
                }

                $offer->setUpdatedAt(new \DateTime());
                // Schedules the offer for update within the flush in progress.
                $uow->recomputeSingleEntityChangeSet(
                    $em->getClassMetadata(get_class($offer)),
                    $offer
                );
            }
        }
    }
}
Note I can't take the credit for this idea, I got it off Stackoverflow.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks Titouan.
I have already checked the repo, but I can't find any solution for this issue with FOSElasticaBundle 6.0.0.