Skip to content

Instantly share code, notes, and snippets.

@Nightbr
Last active October 6, 2021 17:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Nightbr/ddb586394d95877dde8ed7445c51d973 to your computer and use it in GitHub Desktop.
Save Nightbr/ddb586394d95877dde8ed7445c51d973 to your computer and use it in GitHub Desktop.
FOSElasticaBundle: Doctrine ORM Listener with relation updates
<?php
namespace AppBundle\Listener\Search;
/**
 * Search listener for the "category" ElasticSearch type.
 *
 * Adds no behaviour of its own: everything is inherited from SearchListener.
 * The dedicated class exists so the listener can be registered as its own
 * service (fos_elastica.listener.app.category), wired with the
 * fos_elastica.object_persister.app.category persister and typeName "category".
 */
class CategoryListener extends SearchListener
{
}
# One listener service per indexed entity / ElasticSearch type.
# Arguments map to SearchListener::__construct(): object persister, indexable
# service, listener config. The container is injected through setContainer()
# so the listener can look up the persisters of related types at runtime.
#
# Service references are quoted: a plain YAML scalar must not start with "@".
services:
    fos_elastica.listener.app.category:
        class: AppBundle\Listener\Search\CategoryListener
        arguments:
            - '@fos_elastica.object_persister.app.category'
            - '@fos_elastica.indexable'
            - {"indexName" : "welp", "typeName": "category"}
        calls:
            - [ setContainer, [ '@service_container' ] ]
        tags:
            - { name: 'doctrine.event_subscriber' }

    fos_elastica.listener.app.user:
        class: AppBundle\Listener\Search\UserListener
        arguments:
            - '@fos_elastica.object_persister.app.user'
            - '@fos_elastica.indexable'
            - {"indexName" : "welp", "typeName": "user"}
        calls:
            - [ setContainer, [ '@service_container' ] ]
        tags:
            - { name: 'doctrine.event_subscriber' }
<?php
namespace AppBundle\Listener\Search;
use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
use FOS\ElasticaBundle\Persister\ObjectPersister;
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Provider\IndexableInterface;
use Doctrine\ORM\PersistentCollection;
use Doctrine\ORM\LazyCriteriaCollection;
use Psr\Log\LoggerInterface;
use Symfony\Component\PropertyAccess\PropertyAccess;
use Symfony\Component\PropertyAccess\PropertyAccessorInterface;
use Doctrine\Common\EventSubscriber;
/**
 * Keeps an ElasticSearch type in sync with changes to its Doctrine entity and
 * additionally re-indexes the documents of related entities.
 *
 * Changes to the handled entity itself are queued during the Doctrine
 * lifecycle events and sent to ElasticSearch in postFlush(). Related entities
 * (reached through the entity's Doctrine association mappings) are re-indexed
 * immediately from within postPersist/postUpdate/postRemove.
 *
 * One listener is registered for each Doctrine entity / ElasticSearch type.
 */
class SearchListener implements EventSubscriber
{
    /**
     * Object persister of the type this listener is responsible for.
     *
     * @var ObjectPersisterInterface
     */
    protected $objectPersister;

    /**
     * Configuration for the listener. Keys read by this class: "identifier"
     * (defaults to "id"), "indexName" and "typeName".
     *
     * @var array
     */
    private $config;

    /**
     * Objects scheduled for insertion.
     *
     * @var array
     */
    public $scheduledForInsertion = array();

    /**
     * Objects scheduled to be updated.
     *
     * @var array
     */
    public $scheduledForUpdate = array();

    /**
     * IDs of objects scheduled for removal.
     *
     * @var array
     */
    public $scheduledForDeletion = array();

    /**
     * PropertyAccessor instance, used to read the identifier of an object.
     *
     * @var PropertyAccessorInterface
     */
    protected $propertyAccessor;

    /**
     * @var IndexableInterface
     */
    private $indexable;

    /**
     * Service container, used to reach Doctrine metadata and the object
     * persisters of related types. Null until setContainer() is called.
     *
     * @var \Symfony\Component\DependencyInjection\ContainerInterface|null
     */
    private $container;

    /**
     * Doctrine events this subscriber listens to.
     *
     * @return array
     */
    public function getSubscribedEvents()
    {
        return array(
            'postPersist',
            'preRemove',
            'postRemove',
            'postUpdate',
            'postFlush',
        );
    }

    /**
     * Constructor.
     *
     * @param ObjectPersisterInterface $objectPersister
     * @param IndexableInterface       $indexable
     * @param array                    $config          merged over the default array('identifier' => 'id')
     * @param LoggerInterface          $logger          only forwarded when the persister is an ObjectPersister
     */
    public function __construct(
        ObjectPersisterInterface $objectPersister,
        IndexableInterface $indexable,
        array $config = array(),
        LoggerInterface $logger = null
    ) {
        $this->config = array_merge(array(
            'identifier' => 'id',
        ), $config);
        $this->indexable = $indexable;
        $this->objectPersister = $objectPersister;
        $this->propertyAccessor = PropertyAccess::createPropertyAccessor();

        if ($logger && $this->objectPersister instanceof ObjectPersister) {
            $this->objectPersister->setLogger($logger);
        }
    }

    /**
     * Injects the service container (see updateRelations()).
     *
     * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
     */
    public function setContainer(\Symfony\Component\DependencyInjection\ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * Looks for new objects that should be indexed.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postPersist(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity)) {
            $this->scheduledForInsertion[] = $entity;
            // Update related documents (for example: User1 creates a new Need,
            // so the User1 document must be refreshed to bump its need counter).
            $this->updateRelations($entity);
        }
    }

    /**
     * Looks for objects being updated that should be indexed or removed from the index.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postUpdate(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if (!$this->objectPersister->handlesObject($entity)) {
            return;
        }

        if ($this->isObjectIndexable($entity)) {
            $this->scheduledForUpdate[] = $entity;
            $this->updateRelations($entity);
        } else {
            // Delete if no longer indexable.
            $this->scheduleForDeletion($entity);
        }
    }

    /**
     * Schedules the deletion in preRemove instead of postRemove so that the
     * identifier of the entity can still be read.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function preRemove(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity)) {
            $this->scheduleForDeletion($entity);
        }
    }

    /**
     * Updates the related documents once the entity has been removed.
     *
     * @param LifecycleEventArgs $eventArgs
     */
    public function postRemove(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        if ($this->objectPersister->handlesObject($entity)) {
            $this->updateRelations($entity);
        }
    }

    /**
     * Re-indexes every related object of $entity whose type has an object
     * persister service.
     *
     * For each Doctrine association of the entity, the short class name of the
     * target entity (lower-cased) is used to build the persister service id
     * "fos_elastica.object_persister.app.<name>". Associations without such a
     * service, without a callable "get<FieldName>" getter, or without a value
     * are skipped.
     *
     * NOTE(review): unlike the entity's own changes, these updates are sent
     * right away rather than queued for postFlush().
     *
     * @param object $entity
     */
    private function updateRelations($entity)
    {
        // Nothing can be resolved if setContainer() was never called.
        if (null === $this->container) {
            return;
        }

        $associations = $this->container
            ->get('doctrine')
            ->getManager()
            ->getMetadataFactory()
            ->getMetadataFor(get_class($entity))
            ->getAssociationMappings();

        foreach ($associations as $association) {
            // end() needs a real variable: it takes its argument by reference.
            $segments = explode('\\', $association['targetEntity']);
            $typeName = strtolower(end($segments));

            /* ToDo: retrieve 'app' from config */
            $persisterId = 'fos_elastica.object_persister.app.'.$typeName;

            // Only associations whose target type exists in ES are relevant.
            if (!$this->container->has($persisterId)) {
                continue;
            }

            $getter = 'get'.ucfirst($association['fieldName']);
            if (!is_callable(array($entity, $getter))) {
                continue;
            }

            $toReindex = $this->filterIndexableRelations(
                $this->container->get($persisterId),
                $entity->$getter()
            );

            // Never issue a bulk request without documents.
            if (count($toReindex) > 0) {
                $this->container->get($persisterId)->replaceMany($toReindex);
            }
        }
    }

    /**
     * Normalises the value of an association (collection, single object or
     * null) to the list of objects that should be re-indexed.
     *
     * NOTE(review): indexability is checked with this listener's own
     * indexName/typeName (via isObjectIndexable()), not with the related
     * type's - kept as in the original implementation; confirm it is intended.
     *
     * @param ObjectPersisterInterface $persister persister of the related type
     * @param mixed                    $related   value returned by the association getter
     *
     * @return array
     */
    private function filterIndexableRelations($persister, $related)
    {
        if ($related instanceof \Doctrine\Common\Collections\Collection || is_array($related)) {
            // To-many association.
            $candidates = $related;
        } elseif (is_object($related)) {
            // To-one association.
            $candidates = array($related);
        } else {
            // Unset (null) association or unexpected scalar: nothing to do.
            return array();
        }

        $indexable = array();
        foreach ($candidates as $candidate) {
            if (is_object($candidate)
                && $persister->handlesObject($candidate)
                && $this->isObjectIndexable($candidate)
            ) {
                $indexable[] = $candidate;
            }
        }

        return $indexable;
    }

    /**
     * Persist scheduled objects to ElasticSearch.
     * After persisting, clear the scheduled queues to prevent multiple data
     * updates when using multiple flush calls.
     */
    protected function persistScheduled()
    {
        if (count($this->scheduledForInsertion)) {
            $this->objectPersister->insertMany($this->scheduledForInsertion);
            $this->scheduledForInsertion = array();
        }

        if (count($this->scheduledForUpdate)) {
            $this->objectPersister->replaceMany($this->scheduledForUpdate);
            $this->scheduledForUpdate = array();
        }

        if (count($this->scheduledForDeletion)) {
            $this->objectPersister->deleteManyByIdentifiers($this->scheduledForDeletion);
            $this->scheduledForDeletion = array();
        }
    }

    /**
     * Iterate through scheduled actions before flushing to emulate 2.x behavior.
     * Note that the ElasticSearch index will fall out of sync with the source
     * data in the event of a crash during flush.
     *
     * This method is only called in legacy configurations of the listener
     * (it is not part of getSubscribedEvents()).
     *
     * @deprecated This method should only be called in applications that depend
     *             on the behaviour that entities are indexed regardless of if a
     *             flush is successful.
     */
    public function preFlush()
    {
        $this->persistScheduled();
    }

    /**
     * Iterating through scheduled actions *after* flushing ensures that the
     * ElasticSearch index will be affected only if the query is successful.
     */
    public function postFlush()
    {
        $this->persistScheduled();
    }

    /**
     * Record the identifier of the object to delete; the entire object is not
     * needed.
     *
     * @param object $object
     */
    protected function scheduleForDeletion($object)
    {
        $identifierValue = $this->propertyAccessor->getValue($object, $this->config['identifier']);

        // Compare explicitly so that a valid identifier of 0 / '0' is not
        // mistaken for "no identifier".
        if (null !== $identifierValue && '' !== $identifierValue && false !== $identifierValue) {
            $this->scheduledForDeletion[] = $identifierValue;
        }
    }

    /**
     * Checks if the object is indexable or not, using this listener's
     * configured indexName and typeName.
     *
     * @param object $object
     *
     * @return bool
     */
    protected function isObjectIndexable($object)
    {
        return $this->indexable->isObjectIndexable(
            $this->config['indexName'],
            $this->config['typeName'],
            $object
        );
    }
}
<?php
namespace AppBundle\Listener\Search;
/**
 * Search listener for the "user" ElasticSearch type.
 *
 * Adds no behaviour of its own: everything is inherited from SearchListener.
 * The dedicated class exists so the listener can be registered as its own
 * service (fos_elastica.listener.app.user), wired with the
 * fos_elastica.object_persister.app.user persister and typeName "user".
 */
class UserListener extends SearchListener
{
}
@arshad111
Copy link

arshad111 commented May 12, 2021

Hi Titouan,
I am using FOSElasticaBundle 6.0.0 Beta 3 and I have an issue: related (relationship) records are not updated in my current index.
I have also used the code above in my project, but it does not resolve my problem. Can you please help me?
Please find my fos_elastica.yaml below:

`fos_elastica:

clients:
    default: { url: '%env(ELASTICSEARCH_URL)%' }

indexes:

    drawings:
            settings:
                index:
                    analysis:
                        analyzer:
                            name_analyzer:
                                type: standard


            properties:
                name_suggest:
                        type: completion
                        analyzer: name_analyzer
                        search_analyzer: name_analyzer
                id:
                    fielddata: true
                name:
                    # type: ~
                    fielddata: true
                is_deleted:
                    # type: ~
                create_by:
                    # type: ~
                create_time:

                keywords:
                    type: object
                    properties:
                        keyword_name:
                            type: ~
                            fielddata: true


            persistence:
                # the driver can be orm, mongodb or phpcr
                driver: orm
                model: App\Entity\Drawing
                provider: ~
                finder: ~
                listener: ~ 

`

@Nightbr
Copy link
Author

Nightbr commented May 12, 2021

Hey, I have not used this code since 2016 and I have moved on to a full TypeScript stack (with NestJS), so sorry, but I'm not in this game anymore.
Perhaps FOSElasticaBundle has gained a solution for this problem since 2016; I recommend you look at the code & issues in their repo.
Good luck my friend!

@arshad111
Copy link

arshad111 commented May 12, 2021

Thanks Titouan.
I have already checked the repo but i cant find any solution for this issue with FOSElasticaBundle 6.0.0.

@LewisW
Copy link

LewisW commented Oct 6, 2021

Thanks Titouan. I have already checked the repo but i cant find any solution for this issue with FOSElasticaBundle 6.0.0.

Probably a bit late now, but there's a simple solution if you still have this problem - store a timestamp field on your indexed entity and then whenever changes to your relational entities happen, update that field. Here's an example for a ManyToOne relation - let's say the thing that is indexed is called Offers and it has a Category:

/**
 * Example entity: Offers are the indexed documents, and every Offer belongs to
 * one Category. Whenever a Category changes, the "updatedAt" timestamp of each
 * of its Offers is touched so that Doctrine sees those Offers as modified.
 *
 * Excerpt only: the $name property assigned in setName() is assumed to be
 * declared and mapped elsewhere in the entity.
 *
 * @ORM\HasLifecycleCallbacks
 */
class Category {
    /**
     * @ORM\OneToMany(targetEntity=Offer::class, mappedBy="category")
     */
    private $offers;

    public function setName($name)
    {
        $this->name = $name;
        $this->reindexElastica();
    }

    /**
     * Marks every Offer of this category as updated "now".
     */
    private function reindexElastica()
    {
        // A Category that has not been loaded/initialised yet has no offers.
        if (null === $this->offers) {
            return;
        }

        foreach ($this->offers as $offer) {
            $offer->setUpdatedAt(new \DateTime());
        }
    }
}

Here’s a solution that abstracts it out of the entity to a listener:

use App\Entity\Offer;
use Doctrine\Bundle\DoctrineBundle\EventSubscriber\EventSubscriberInterface;
use Doctrine\ORM\Event\OnFlushEventArgs;

/**
 * Touches the "updatedAt" timestamp of every Offer that belongs to an entity
 * scheduled for update, so that those Offers are written (and can be picked up
 * by their own listeners) in the same flush.
 */
class UpdateOfferListener implements EventSubscriberInterface
{
    /**
     * @return array
     */
    public function getSubscribedEvents()
    {
        return array(
            'onFlush',
        );
    }

    /**
     * For each entity scheduled for update that exposes getOffers(), updates
     * the timestamp of its managed Offers and registers that change with the
     * unit of work.
     *
     * The change sets have already been computed when onFlush fires, so the
     * modification is announced with recomputeSingleEntityChangeSet() instead
     * of calling flush() again from inside the running flush.
     *
     * @param OnFlushEventArgs $args
     */
    public function onFlush(OnFlushEventArgs $args)
    {
        $em = $args->getEntityManager();
        $uow = $em->getUnitOfWork();

        foreach ($uow->getScheduledEntityUpdates() as $entity) {
            if (!method_exists($entity, 'getOffers')) {
                continue;
            }

            foreach ($entity->getOffers() as $offer) {
                // Only managed Offers can have their change set recomputed
                // (e.g. not ones that are scheduled for removal).
                if (!$offer instanceof Offer
                    || \Doctrine\ORM\UnitOfWork::STATE_MANAGED !== $uow->getEntityState($offer)
                ) {
                    continue;
                }

                $offer->setUpdatedAt(new \DateTime());
                $uow->recomputeSingleEntityChangeSet(
                    $em->getClassMetadata(get_class($offer)),
                    $offer
                );
            }
        }
    }
}

Note I can't take the credit for this idea, I got it off Stackoverflow.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment