vendor/shopware/core/Framework/Api/Sync/SyncService.php line 65

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\Api\Sync;
  3. use Doctrine\DBAL\Connection;
  4. use Doctrine\DBAL\ConnectionException;
  5. use Shopware\Core\Framework\Adapter\Database\ReplicaConnection;
  6. use Shopware\Core\Framework\Api\Converter\ApiVersionConverter;
  7. use Shopware\Core\Framework\Api\Converter\Exceptions\ApiConversionException;
  8. use Shopware\Core\Framework\Api\Exception\InvalidSyncOperationException;
  9. use Shopware\Core\Framework\Context;
  10. use Shopware\Core\Framework\DataAbstractionLayer\DefinitionInstanceRegistry;
  11. use Shopware\Core\Framework\DataAbstractionLayer\EntityDefinition;
  12. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  13. use Shopware\Core\Framework\DataAbstractionLayer\EntityWriteResult;
  14. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  15. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenEvent;
  16. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\EntityIndexerRegistry;
  17. use Shopware\Core\Framework\DataAbstractionLayer\Write\EntityWriterInterface;
  18. use Shopware\Core\Framework\DataAbstractionLayer\Write\WriteContext;
  19. use Shopware\Core\Framework\DataAbstractionLayer\Write\WriteException;
  20. use Shopware\Core\Framework\Feature;
  21. use Shopware\Core\Framework\Struct\ArrayEntity;
  22. use Shopware\Core\Framework\Validation\WriteConstraintViolationException;
  23. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  24. /**
  25.  * @package core
  26.  */
  27. class SyncService implements SyncServiceInterface
  28. {
  29.     private DefinitionInstanceRegistry $definitionRegistry;
  30.     private Connection $connection;
  31.     private ApiVersionConverter $apiVersionConverter;
  32.     private EntityWriterInterface $writer;
  33.     private EventDispatcherInterface $eventDispatcher;
  34.     /**
  35.      * @internal
  36.      */
  37.     public function __construct(
  38.         DefinitionInstanceRegistry $definitionRegistry,
  39.         Connection $connection,
  40.         ApiVersionConverter $apiVersionConverter,
  41.         EntityWriterInterface $writer,
  42.         EventDispatcherInterface $eventDispatcher
  43.     ) {
  44.         $this->definitionRegistry $definitionRegistry;
  45.         $this->connection $connection;
  46.         $this->apiVersionConverter $apiVersionConverter;
  47.         $this->writer $writer;
  48.         $this->eventDispatcher $eventDispatcher;
  49.     }
  50.     /**
  51.      * @param SyncOperation[] $operations
  52.      *
  53.      * @throws ConnectionException
  54.      * @throws InvalidSyncOperationException
  55.      */
  56.     public function sync(array $operationsContext $contextSyncBehavior $behavior): SyncResult
  57.     {
  58.         ReplicaConnection::ensurePrimary();
  59.         $context = clone $context;
  60.         if (\count($behavior->getSkipIndexers())) {
  61.             $context->addExtension(EntityIndexerRegistry::EXTENSION_INDEXER_SKIP, new ArrayEntity($behavior->getSkipIndexers()));
  62.         }
  63.         if (
  64.             $behavior->getIndexingBehavior() !== null
  65.             && \in_array($behavior->getIndexingBehavior(), [EntityIndexerRegistry::DISABLE_INDEXINGEntityIndexerRegistry::USE_INDEXING_QUEUE], true)
  66.         ) {
  67.             // @deprecated tag:v6.5.0 - complete if statement will be removed, context.state should be used instead
  68.             if (!Feature::isActive('v6.5.0.0')) {
  69.                 $context->addExtension($behavior->getIndexingBehavior(), new ArrayEntity());
  70.             }
  71.             $context->addState($behavior->getIndexingBehavior());
  72.         }
  73.         // allows to execute all writes inside a single transaction and a single entity write event
  74.         // @internal (flag:FEATURE_NEXT_15815) tag:v6.5.0 - Remove "IF" condition - useSingleOperation is always true
  75.         if ($behavior->useSingleOperation()) {
  76.             $result $this->writer->sync($operationsWriteContext::createFromContext($context));
  77.             $writes EntityWrittenContainerEvent::createWithWrittenEvents($result->getWritten(), $context, []);
  78.             $deletes EntityWrittenContainerEvent::createWithWrittenEvents($result->getDeleted(), $context, []);
  79.             if ($deletes->getEvents() !== null) {
  80.                 $writes->addEvent(...$deletes->getEvents()->getElements());
  81.             }
  82.             $this->eventDispatcher->dispatch($writes);
  83.             $ids $this->getWrittenEntities($result->getWritten());
  84.             $deleted $this->getWrittenEntitiesByEvent($deletes);
  85.             $notFound $this->getWrittenEntities($result->getNotFound());
  86.             //@internal (flag:FEATURE_NEXT_15815) - second construct parameter removed - simply remove if condition and all other code below
  87.             if (Feature::isActive('FEATURE_NEXT_15815')) {
  88.                 return new SyncResult($ids$notFound$deleted);
  89.             }
  90.             return new SyncResult($idstrue$notFound$deleted);
  91.         }
  92.         //@deprecated tag:v6.5.0 (flag:FEATURE_NEXT_15815) - remove all code below and all functions which will are no longer used
  93.         Feature::triggerDeprecationOrThrow(
  94.             'v6.5.0.0',
  95.             'Sync api can only be used in single operation mode in v6.5.0.0'
  96.         );
  97.         if ($behavior->failOnError()) {
  98.             $this->connection->beginTransaction();
  99.         }
  100.         $hasError false;
  101.         $results = [];
  102.         foreach ($operations as $operation) {
  103.             $this->validateSyncOperationInput($operation);
  104.             if (!$behavior->failOnError()) {
  105.                 //begin a new transaction for every operation to provide chunk-safe operations
  106.                 $this->connection->beginTransaction();
  107.             }
  108.             $result $this->execute($operation$context);
  109.             $results[$operation->getKey()] = $result;
  110.             if ($result->hasError()) {
  111.                 $hasError true;
  112.                 if ($behavior->failOnError()) {
  113.                     foreach ($results as $result) {
  114.                         $result->resetEntities();
  115.                     }
  116.                     continue;
  117.                 }
  118.                 $this->connection->rollBack();
  119.             } elseif (!$behavior->failOnError()) {
  120.                 // Only commit if transaction not already marked as rollback
  121.                 if (!$this->connection->isRollbackOnly()) {
  122.                     $this->connection->commit();
  123.                 } else {
  124.                     $this->connection->rollBack();
  125.                 }
  126.             }
  127.         }
  128.         if ($behavior->failOnError()) {
  129.             // Only commit if transaction not already marked as rollback
  130.             if ($hasError === false && !$this->connection->isRollbackOnly()) {
  131.                 $this->connection->commit();
  132.             } else {
  133.                 $this->connection->rollBack();
  134.             }
  135.         }
  136.         return new SyncResult($results$hasError === false);
  137.     }
  138.     private function execute(SyncOperation $operationContext $context): SyncOperationResult
  139.     {
  140.         $repository $this->definitionRegistry->getRepository($operation->getEntity());
  141.         switch (mb_strtolower($operation->getAction())) {
  142.             case SyncOperation::ACTION_UPSERT:
  143.                 return $this->upsertRecords($operation$context$repository);
  144.             case SyncOperation::ACTION_DELETE:
  145.                 return $this->deleteRecords($operation$context$repository);
  146.             default:
  147.                 throw new \RuntimeException(
  148.                     sprintf(
  149.                         'provided action "%s" is not supported. Following actions are supported: %s',
  150.                         $operation->getAction(),
  151.                         implode(', '$operation->getSupportedActions())
  152.                     )
  153.                 );
  154.         }
  155.     }
  156.     private function upsertRecords(
  157.         SyncOperation $operation,
  158.         Context $context,
  159.         EntityRepositoryInterface $repository
  160.     ): SyncOperationResult {
  161.         $results = [];
  162.         $records array_values($operation->getPayload());
  163.         $definition $repository->getDefinition();
  164.         foreach ($records as $index => $record) {
  165.             try {
  166.                 $record $this->convertToApiVersion($record$definition$index);
  167.                 $result $repository->upsert([$record], $context);
  168.                 $results[$index] = [
  169.                     'entities' => $this->getWrittenEntitiesByEvent($result),
  170.                     'errors' => [],
  171.                 ];
  172.             } catch (\Throwable $exception) {
  173.                 $writeException $this->getWriteError($exception$index);
  174.                 $errors = [];
  175.                 foreach ($writeException->getErrors() as $error) {
  176.                     $errors[] = $error;
  177.                 }
  178.                 $results[$index] = [
  179.                     'entities' => [],
  180.                     'errors' => $errors,
  181.                 ];
  182.             }
  183.         }
  184.         return new SyncOperationResult($results);
  185.     }
  186.     private function deleteRecords(
  187.         SyncOperation $operation,
  188.         Context $context,
  189.         EntityRepositoryInterface $repository
  190.     ): SyncOperationResult {
  191.         $results = [];
  192.         $records array_values($operation->getPayload());
  193.         $definition $repository->getDefinition();
  194.         foreach ($records as $index => $record) {
  195.             try {
  196.                 $record $this->convertToApiVersion($record$definition$index);
  197.                 $result $repository->delete([$record], $context);
  198.                 $results[$index] = [
  199.                     'entities' => $this->getWrittenEntitiesByEvent($result),
  200.                     'errors' => [],
  201.                 ];
  202.             } catch (\Throwable $exception) {
  203.                 $writeException $this->getWriteError($exception$index);
  204.                 $errors = [];
  205.                 foreach ($writeException->getErrors() as $error) {
  206.                     $errors[] = $error;
  207.                 }
  208.                 $results[$index] = [
  209.                     'entities' => [],
  210.                     'errors' => $errors,
  211.                 ];
  212.             }
  213.         }
  214.         return new SyncOperationResult($results);
  215.     }
  216.     /**
  217.      * @param array<string, mixed|null> $record
  218.      *
  219.      * @return array<string, mixed|null>
  220.      */
  221.     private function convertToApiVersion(array $recordEntityDefinition $definitionint $writeIndex): array
  222.     {
  223.         $exception = new ApiConversionException();
  224.         $converted $this->apiVersionConverter->convertPayload($definition$record$exception"/{$writeIndex}");
  225.         $exception->tryToThrow();
  226.         return $converted;
  227.     }
  228.     private function getWriteError(\Throwable $exceptionint $writeIndex): WriteException
  229.     {
  230.         if ($exception instanceof WriteException) {
  231.             foreach ($exception->getExceptions() as $innerException) {
  232.                 if ($innerException instanceof WriteConstraintViolationException) {
  233.                     $path preg_replace('/^\/0/'"/{$writeIndex}"$innerException->getPath());
  234.                     if ($path !== null) {
  235.                         $innerException->setPath($path);
  236.                     }
  237.                 }
  238.             }
  239.             return $exception;
  240.         }
  241.         return (new WriteException())->add($exception);
  242.     }
  243.     /**
  244.      * @param array<string, EntityWriteResult[]> $grouped
  245.      *
  246.      * @return array<string, array<int, mixed>>
  247.      */
  248.     private function getWrittenEntities(array $grouped): array
  249.     {
  250.         $mapped = [];
  251.         foreach ($grouped as $entity => $results) {
  252.             foreach ($results as $result) {
  253.                 $mapped[$entity][] = $result->getPrimaryKey();
  254.             }
  255.         }
  256.         ksort($mapped);
  257.         return $mapped;
  258.     }
  259.     /**
  260.      * @return array<string, array<int, mixed>>
  261.      */
  262.     private function getWrittenEntitiesByEvent(EntityWrittenContainerEvent $result): array
  263.     {
  264.         $entities = [];
  265.         /** @var EntityWrittenEvent $event */
  266.         foreach ($result->getEvents() ?? [] as $event) {
  267.             $entity $event->getEntityName();
  268.             if (!isset($entities[$entity])) {
  269.                 $entities[$entity] = [];
  270.             }
  271.             $entities[$entity] = array_merge($entities[$entity], $event->getIds());
  272.         }
  273.         ksort($entities);
  274.         return $entities;
  275.     }
  276.     /**
  277.      * @deprecated tag:v6.5.0 - Sync Operation will be validated inside EntityWriter instead.
  278.      *
  279.      * @throws InvalidSyncOperationException
  280.      */
  281.     private function validateSyncOperationInput(SyncOperation $operation): void
  282.     {
  283.         $errors $operation->validate();
  284.         if (\count($errors)) {
  285.             throw new InvalidSyncOperationException(sprintf('Invalid sync operation. %s'implode(' '$errors)));
  286.         }
  287.     }
  288. }