-
-
Save Nightbr/ddb586394d95877dde8ed7445c51d973 to your computer and use it in GitHub Desktop.
<?php | |
namespace AppBundle\Listener\Search; | |
/**
 * Search listener dedicated to the Category entity.
 *
 * The class adds no behaviour of its own; everything is inherited from
 * SearchListener. It exists so that a separate listener service can be
 * registered for this entity / ElasticSearch type.
 */
class CategoryListener extends SearchListener
{
}
# One listener service per indexed entity. Each one receives the object
# persister of its type, the shared "indexable" service and the index/type
# names, gets the container through setContainer(), and is registered as a
# Doctrine event subscriber.
#
# Service references are quoted: a plain YAML scalar may not start with "@",
# and the setContainer call in this file already uses the quoted form.
services:
    fos_elastica.listener.app.category:
        class: AppBundle\Listener\Search\CategoryListener
        arguments:
            - '@fos_elastica.object_persister.app.category'
            - '@fos_elastica.indexable'
            - {"indexName" : "welp", "typeName": "category"}
        calls:
            - [ setContainer, [ '@service_container' ] ]
        tags:
            - { name: 'doctrine.event_subscriber' }

    fos_elastica.listener.app.user:
        class: AppBundle\Listener\Search\UserListener
        arguments:
            - '@fos_elastica.object_persister.app.user'
            - '@fos_elastica.indexable'
            - {"indexName" : "welp", "typeName": "user"}
        calls:
            - [ setContainer, [ '@service_container' ] ]
        tags:
            - { name: 'doctrine.event_subscriber' }
<?php | |
namespace AppBundle\Listener\Search; | |
use Doctrine\Common\EventSubscriber;
use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
use Doctrine\ORM\LazyCriteriaCollection;
use Doctrine\ORM\PersistentCollection;
use FOS\ElasticaBundle\Persister\ObjectPersister;
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Provider\IndexableInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\PropertyAccess\PropertyAccess;
use Symfony\Component\PropertyAccess\PropertyAccessorInterface;
/**
 * Automatically update ElasticSearch based on changes to the Doctrine source
 * data. One listener is generated for each Doctrine entity / ElasticSearch type.
 *
 * In addition to indexing the entity it is responsible for, the listener
 * re-indexes the entities associated with it (see updateRelations()), so that
 * documents embedding or counting related data stay up to date.
 */
class SearchListener implements EventSubscriber
{
    /**
     * Object persister of the type handled by this listener.
     *
     * @var ObjectPersisterInterface
     */
    protected $objectPersister;

    /**
     * Configuration for the listener: 'identifier' (defaults to 'id') plus
     * whatever was injected, e.g. 'indexName' and 'typeName'.
     *
     * @var array
     */
    private $config;

    /**
     * Objects scheduled for insertion.
     *
     * @var array
     */
    public $scheduledForInsertion = [];

    /**
     * Objects scheduled to be updated.
     *
     * @var array
     */
    public $scheduledForUpdate = [];

    /**
     * IDs of objects scheduled for removal.
     *
     * @var array
     */
    public $scheduledForDeletion = [];

    /**
     * PropertyAccessor instance, used to read the identifier of an object.
     *
     * @var PropertyAccessorInterface
     */
    protected $propertyAccessor;

    /**
     * @var IndexableInterface
     */
    private $indexable;

    /**
     * Service container, used to look up Doctrine and the object persisters
     * of related types. Null until setContainer() has been called.
     *
     * @var ContainerInterface|null
     */
    private $container;

    /**
     * Constructor.
     *
     * @param ObjectPersisterInterface $objectPersister
     * @param IndexableInterface       $indexable
     * @param array                    $config          merged over the default ['identifier' => 'id']
     * @param LoggerInterface          $logger          forwarded to the persister when it is an ObjectPersister
     */
    public function __construct(
        ObjectPersisterInterface $objectPersister,
        IndexableInterface $indexable,
        array $config = [],
        LoggerInterface $logger = null
    ) {
        $this->config = array_merge([
            'identifier' => 'id',
        ], $config);
        $this->indexable = $indexable;
        $this->objectPersister = $objectPersister;
        $this->propertyAccessor = PropertyAccess::createPropertyAccessor();

        if ($logger && $this->objectPersister instanceof ObjectPersister) {
            $this->objectPersister->setLogger($logger);
        }
    }

    /**
     * Doctrine events this subscriber listens to.
     *
     * @return string[]
     */
    public function getSubscribedEvents()
    {
        return [
            'postPersist',
            'preRemove',
            'postRemove',
            'postUpdate',
            'postFlush',
        ];
    }

    /**
     * Injects the service container.
     *
     * @param ContainerInterface $container
     */
    public function setContainer(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * Looks for new objects that should be indexed.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postPersist(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity)) {
            $this->scheduledForInsertion[] = $entity;
            // Update related documents (for example, User1 creates a new Need:
            // the User1 document must be refreshed to increment its need counter).
            $this->updateRelations($entity);
        }
    }

    /**
     * Looks for objects being updated that should be indexed or removed from the index.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postUpdate(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if (!$this->objectPersister->handlesObject($entity)) {
            return;
        }

        if ($this->isObjectIndexable($entity)) {
            $this->scheduledForUpdate[] = $entity;
            // Update related documents
            $this->updateRelations($entity);
        } else {
            // Delete if no longer indexable
            $this->scheduleForDeletion($entity);
        }
    }

    /**
     * Schedules the deletion on preRemove instead of postRemove so that the
     * identifier of the entity is still available.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function preRemove(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity)) {
            $this->scheduleForDeletion($entity);
        }
    }

    /**
     * Update relations on postRemove.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postRemove(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity)) {
            // Update related documents
            $this->updateRelations($entity);
        }
    }

    /**
     * Re-indexes every entity associated with $entity whose type has an
     * object persister registered in the container.
     *
     * For each Doctrine association of the entity, the short class name of the
     * target entity (lower-cased) is used as the ElasticSearch type name and
     * the persister "fos_elastica.object_persister.app.<type>" is looked up.
     *
     * NOTE: the related documents are written immediately through
     * replaceMany(); unlike the entity itself they are not queued until
     * postFlush.
     *
     * @param object $entity
     */
    private function updateRelations($entity)
    {
        // The container is injected through setContainer(); without it neither
        // Doctrine nor the related persisters can be resolved.
        if (null === $this->container) {
            return;
        }

        $associations = $this->container
            ->get('doctrine')
            ->getManager()
            ->getMetadataFactory()
            ->getMetadataFor(get_class($entity))
            ->getAssociationMappings();

        foreach ($associations as $association) {
            $typeName = $this->getTypeNameForClass($association['targetEntity']);
            /* ToDo: retrieve 'app' from config */
            $persisterId = 'fos_elastica.object_persister.app.'.$typeName;

            // Skip associations whose target is not an ElasticSearch type
            if (!$this->container->has($persisterId)) {
                continue;
            }

            $getter = 'get'.ucfirst($association['fieldName']);
            if (!method_exists($entity, $getter)) {
                continue;
            }

            $relatedPersister = $this->container->get($persisterId);
            $indexableObjects = $this->filterIndexable(
                $entity->$getter(),
                $relatedPersister,
                $typeName
            );

            // Never hand an empty batch to the persister
            if (count($indexableObjects) > 0) {
                $relatedPersister->replaceMany($indexableObjects);
            }
        }
    }

    /**
     * Derives the ElasticSearch type name from an entity class name: the part
     * after the last namespace separator, lower-cased.
     *
     * @param string $className fully-qualified class name
     *
     * @return string
     */
    private function getTypeNameForClass($className)
    {
        $separatorPosition = strrpos($className, '\\');
        $shortName = false === $separatorPosition
            ? $className
            : substr($className, $separatorPosition + 1);

        return strtolower($shortName);
    }

    /**
     * Keeps the related objects that the given persister handles and that are
     * indexable for the related type.
     *
     * @param mixed                    $related   value returned by the association getter: a collection
     *                                            (OneToMany, ManyToMany), a single object, or null
     * @param ObjectPersisterInterface $persister persister of the related type
     * @param string                   $typeName  ElasticSearch type name of the related objects
     *
     * @return array
     */
    private function filterIndexable($related, $persister, $typeName)
    {
        if (null === $related) {
            return [];
        }

        // Any iterable collection is accepted, not only the lazy Doctrine ones,
        // so that collections of freshly created entities are covered too.
        $candidates = ($related instanceof \Traversable || is_array($related))
            ? $related
            : [$related];

        $indexableObjects = [];
        foreach ($candidates as $candidate) {
            if (!$persister->handlesObject($candidate)) {
                continue;
            }

            // Indexability is checked against the type of the related object,
            // not against the type this listener was configured for.
            if ($this->indexable->isObjectIndexable($this->config['indexName'], $typeName, $candidate)) {
                $indexableObjects[] = $candidate;
            }
        }

        return $indexableObjects;
    }

    /**
     * Persist scheduled objects to ElasticSearch.
     * After persisting, clear the scheduled queue to prevent multiple data updates when using multiple flush calls.
     */
    protected function persistScheduled()
    {
        if (count($this->scheduledForInsertion)) {
            $this->objectPersister->insertMany($this->scheduledForInsertion);
            $this->scheduledForInsertion = [];
        }

        if (count($this->scheduledForUpdate)) {
            $this->objectPersister->replaceMany($this->scheduledForUpdate);
            $this->scheduledForUpdate = [];
        }

        if (count($this->scheduledForDeletion)) {
            $this->objectPersister->deleteManyByIdentifiers($this->scheduledForDeletion);
            $this->scheduledForDeletion = [];
        }
    }

    /**
     * Iterate through scheduled actions before flushing to emulate 2.x behavior.
     * Note that the ElasticSearch index will fall out of sync with the source
     * data in the event of a crash during flush.
     *
     * This method is only called in legacy configurations of the listener.
     *
     * @deprecated This method should only be called in applications that depend
     *             on the behaviour that entities are indexed regardless of if a
     *             flush is successful.
     */
    public function preFlush()
    {
        $this->persistScheduled();
    }

    /**
     * Iterating through scheduled actions *after* flushing ensures that the
     * ElasticSearch index will be affected only if the query is successful.
     */
    public function postFlush()
    {
        $this->persistScheduled();
    }

    /**
     * Record the identifier of the object to delete. The entire object is not
     * needed.
     *
     * @param object $object
     */
    protected function scheduleForDeletion($object)
    {
        $identifierValue = $this->propertyAccessor->getValue($object, $this->config['identifier']);

        // Compare explicitly so that a legitimate identifier such as 0 is kept
        if (null !== $identifierValue && '' !== $identifierValue) {
            $this->scheduledForDeletion[] = $identifierValue;
        }
    }

    /**
     * Checks if the object is indexable or not for the index and type this
     * listener was configured with.
     *
     * @param object $object
     *
     * @return bool
     */
    protected function isObjectIndexable($object)
    {
        return $this->indexable->isObjectIndexable(
            $this->config['indexName'],
            $this->config['typeName'],
            $object
        );
    }
}
<?php | |
namespace AppBundle\Listener\Search; | |
/**
 * Search listener dedicated to the User entity.
 *
 * The class adds no behaviour of its own; everything is inherited from
 * SearchListener. It exists so that a separate listener service can be
 * registered for this entity / ElasticSearch type.
 */
class UserListener extends SearchListener
{
}
Hey, I haven't used this code since 2016 and I have moved on to a full TypeScript stack (with NestJS), so sorry, but I'm not in this game anymore.
Perhaps FOSElasticaBundle has gained a solution for this problem since 2016; I recommend you look at the code & issues in their repo.
Good luck my friend!
Thanks, Titouan.
I have already checked the repo, but I can't find any solution for this issue with FOSElasticaBundle 6.0.0.
Thanks, Titouan. I have already checked the repo, but I can't find any solution for this issue with FOSElasticaBundle 6.0.0.
Probably a bit late now, but there's a simple solution if you still have this problem: store a timestamp field on your indexed entity and update that field whenever one of its related entities changes. Here's an example for a ManyToOne relation - let's say the indexed entity is called Offer and it has a Category:
/**
 * Category whose changes must be propagated to the indexed Offer entities.
 *
 * Whenever the category changes, the "updatedAt" timestamp of each of its
 * offers is refreshed, which marks those offers as modified.
 *
 * @ORM\HasLifecycleCallbacks
 */
class Category
{
    /**
     * Offers that belong to this category (inverse side of Offer::$category).
     *
     * @ORM\OneToMany(targetEntity=Offer::class, mappedBy="category")
     */
    private $offers;

    /**
     * Category name (column mapping omitted from this example).
     */
    private $name;

    /**
     * Renames the category and marks its offers as modified.
     *
     * @param string $name
     */
    public function setName($name)
    {
        $this->name = $name;
        $this->reindexElastica();
    }

    /**
     * Refreshes the timestamp of every related offer.
     */
    private function reindexElastica()
    {
        // The collection is never initialised in this example, so it can
        // still be null on a freshly constructed Category.
        if (null === $this->offers) {
            return;
        }

        foreach ($this->offers as $offer) {
            $offer->setUpdatedAt(new \DateTime());
        }
    }
}
Here’s a solution that abstracts it out of the entity to a listener:
use App\Entity\Offer;
use Doctrine\Bundle\DoctrineBundle\EventSubscriber\EventSubscriberInterface;
use Doctrine\ORM\Event\OnFlushEventArgs;
/**
 * Refreshes the "updatedAt" timestamp of the offers related to every entity
 * that is about to be updated, so that those offers are written in the same
 * flush.
 */
class UpdateOfferListener implements EventSubscriberInterface
{
    /**
     * Doctrine events this subscriber listens to.
     *
     * @return string[]
     */
    public function getSubscribedEvents()
    {
        return [
            'onFlush',
        ];
    }

    /**
     * Touches the offers of each entity scheduled for update.
     *
     * Change sets have already been computed when onFlush is dispatched, so
     * each modified offer is registered with the unit of work through
     * recomputeSingleEntityChangeSet() instead of triggering a nested flush.
     *
     * @param OnFlushEventArgs $args
     */
    public function onFlush(OnFlushEventArgs $args)
    {
        $em = $args->getEntityManager();
        $uow = $em->getUnitOfWork();
        $offerMetadata = $em->getClassMetadata(Offer::class);

        foreach ($uow->getScheduledEntityUpdates() as $entity) {
            if (!method_exists($entity, 'getOffers')) {
                continue;
            }

            foreach ($entity->getOffers() as $offer) {
                // Only managed offers can have their change set recomputed;
                // this also skips offers that are scheduled for removal.
                if (!$offer instanceof Offer || !$em->contains($offer)) {
                    continue;
                }

                $offer->setUpdatedAt(new \DateTime());
                $uow->recomputeSingleEntityChangeSet($offerMetadata, $offer);
            }
        }
    }
}
Note that I can't take the credit for this idea; I got it from Stack Overflow.
Hi Titouan,
I am using FOSElasticaBundle 6.0.0 Beta 3 and I am running into an issue: related records are not updated in my current index.
I have also used the code above in my project, but it does not resolve my problem. Can you please help me?
Please find my fos_elastica.yaml
`fos_elastica:
`