vendor/shopware/core/Framework/DataAbstractionLayer/Indexing/EntityIndexerRegistry.php line 138

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\DataAbstractionLayer\Indexing;
  3. use Shopware\Core\Framework\Context;
  4. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\MessageQueue\IterateEntityIndexerMessage;
  6. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  7. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  8. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  9. use Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler;
  10. use Shopware\Core\Framework\Struct\ArrayStruct;
  11. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  12. use Symfony\Component\Messenger\MessageBusInterface;
  13. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  14. /**
  15.  * @deprecated tag:v6.5.0 - reason:remove-subscriber - EntityIndexerRegistry will not implement EventSubscriberInterface anymore, it will also become final and internal in v6.5.0
  16.  */
  17. class EntityIndexerRegistry extends AbstractMessageHandler implements EventSubscriberInterface
  18. {
  19.     public const EXTENSION_INDEXER_SKIP 'indexer-skip';
  20.     /**
  21.      * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::USE_INDEXING_QUEUE, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::USE_INDEXING_QUEUE)` instead
  22.      */
  23.     public const USE_INDEXING_QUEUE 'use-queue-indexing';
  24.     /**
  25.      * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::DISABLE_INDEXING, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::DISABLE_INDEXING)` instead
  26.      */
  27.     public const DISABLE_INDEXING 'disable-indexing';
  28.     /**
  29.      * @var iterable<EntityIndexer>
  30.      */
  31.     private iterable $indexer;
  32.     private MessageBusInterface $messageBus;
  33.     private bool $working false;
  34.     private EventDispatcherInterface $dispatcher;
  35.     /**
  36.      * @internal
  37.      *
  38.      * @param iterable<EntityIndexer> $indexer
  39.      */
  40.     public function __construct(iterable $indexerMessageBusInterface $messageBusEventDispatcherInterface $dispatcher)
  41.     {
  42.         $this->indexer $indexer;
  43.         $this->messageBus $messageBus;
  44.         $this->dispatcher $dispatcher;
  45.     }
  46.     /**
  47.      * @deprecated tag:v6.5.0 - reason:remove-subscriber - will be removed in v6.5.0, event handling is done in `EntityIndexingSubscriber`
  48.      */
  49.     public static function getSubscribedEvents(): array
  50.     {
  51.         return [];
  52.     }
  53.     public static function getHandledMessages(): iterable
  54.     {
  55.         return [
  56.             EntityIndexingMessage::class,
  57.             IterateEntityIndexerMessage::class,
  58.         ];
  59.     }
  60.     /**
  61.      * @param list<string> $skip
  62.      * @param list<string> $only
  63.      */
  64.     public function index(bool $useQueue, array $skip = [], array $only = []): void
  65.     {
  66.         foreach ($this->indexer as $indexer) {
  67.             if (\in_array($indexer->getName(), $skiptrue)) {
  68.                 continue;
  69.             }
  70.             if (\count($only) > && !\in_array($indexer->getName(), $onlytrue)) {
  71.                 continue;
  72.             }
  73.             $offset null;
  74.             $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $indexer->getTotal()));
  75.             while ($message $indexer->iterate($offset)) {
  76.                 $message->setIndexer($indexer->getName());
  77.                 $message->addSkip(...$skip);
  78.                 $this->sendOrHandle($message$useQueue);
  79.                 $offset $message->getOffset();
  80.                 try {
  81.                     $count \is_array($message->getData()) ? \count($message->getData()) : 1;
  82.                     $this->dispatcher->dispatch(new ProgressAdvancedEvent($count));
  83.                 } catch (\Exception $e) {
  84.                 }
  85.             }
  86.             $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
  87.         }
  88.     }
  89.     public function refresh(EntityWrittenContainerEvent $event): void
  90.     {
  91.         $context $event->getContext();
  92.         if ($this->working) {
  93.             return;
  94.         }
  95.         $this->working true;
  96.         if ($this->disabled($context)) {
  97.             $this->working false;
  98.             return;
  99.         }
  100.         $useQueue $this->useQueue($context);
  101.         foreach ($this->indexer as $indexer) {
  102.             $message $indexer->update($event);
  103.             if (!$message) {
  104.                 continue;
  105.             }
  106.             $message->setIndexer($indexer->getName());
  107.             self::addSkips($message$context);
  108.             $this->sendOrHandle($message$useQueue);
  109.         }
  110.         $this->working false;
  111.     }
  112.     public static function addSkips(EntityIndexingMessage $messageContext $context): void
  113.     {
  114.         if (!$context->hasExtension(self::EXTENSION_INDEXER_SKIP)) {
  115.             return;
  116.         }
  117.         /** @var ArrayStruct<string, mixed> $skip */
  118.         $skip $context->getExtension(self::EXTENSION_INDEXER_SKIP);
  119.         $message->addSkip(...$skip->all());
  120.     }
  121.     /**
  122.      * @param mixed $message
  123.      */
  124.     public function handle($message): void
  125.     {
  126.         if ($message instanceof EntityIndexingMessage) {
  127.             $indexer $this->getIndexer($message->getIndexer());
  128.             if ($indexer) {
  129.                 $indexer->handle($message);
  130.             }
  131.             return;
  132.         }
  133.         if ($message instanceof IterateEntityIndexerMessage) {
  134.             $next $this->iterateIndexer($message->getIndexer(), $message->getOffset(), true$message->getSkip());
  135.             if (!$next) {
  136.                 return;
  137.             }
  138.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($message->getIndexer(), $next->getOffset(), $message->getSkip()));
  139.         }
  140.     }
  141.     /**
  142.      * @param list<string> $indexer
  143.      * @param list<string> $skip
  144.      */
  145.     public function sendIndexingMessage(array $indexer = [], array $skip = []): void
  146.     {
  147.         if (empty($indexer)) {
  148.             $indexer = [];
  149.             foreach ($this->indexer as $loop) {
  150.                 $indexer[] = $loop->getName();
  151.             }
  152.         }
  153.         if (empty($indexer)) {
  154.             return;
  155.         }
  156.         foreach ($indexer as $name) {
  157.             if (\in_array($name$skiptrue)) {
  158.                 continue;
  159.             }
  160.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($namenull$skip));
  161.         }
  162.     }
  163.     public function has(string $name): bool
  164.     {
  165.         return $this->getIndexer($name) !== null;
  166.     }
  167.     public function getIndexer(string $name): ?EntityIndexer
  168.     {
  169.         foreach ($this->indexer as $indexer) {
  170.             if ($indexer->getName() === $name) {
  171.                 return $indexer;
  172.             }
  173.         }
  174.         return null;
  175.     }
  176.     private function useQueue(Context $context): bool
  177.     {
  178.         return $context->hasExtension(self::USE_INDEXING_QUEUE) || $context->hasState(self::USE_INDEXING_QUEUE);
  179.     }
  180.     private function disabled(Context $context): bool
  181.     {
  182.         return $context->hasExtension(self::DISABLE_INDEXING) || $context->hasState(self::DISABLE_INDEXING);
  183.     }
  184.     private function sendOrHandle(EntityIndexingMessage $messagebool $useQueue): void
  185.     {
  186.         if ($useQueue || $message->forceQueue()) {
  187.             $this->messageBus->dispatch($message);
  188.             return;
  189.         }
  190.         $this->handle($message);
  191.     }
  192.     /**
  193.      * @param array<string, string>|null $offset
  194.      * @param list<string> $skip
  195.      */
  196.     private function iterateIndexer(string $name, ?array $offsetbool $useQueue, array $skip): ?EntityIndexingMessage
  197.     {
  198.         $indexer $this->getIndexer($name);
  199.         if (!$indexer instanceof EntityIndexer) {
  200.             throw new \RuntimeException(sprintf('Entity indexer with name %s not found'$name));
  201.         }
  202.         $message $indexer->iterate($offset);
  203.         if (!$message) {
  204.             return null;
  205.         }
  206.         $message->setIndexer($indexer->getName());
  207.         $message->addSkip(...$skip);
  208.         $this->sendOrHandle($message$useQueue);
  209.         return $message;
  210.     }
  211. }