1: <?php declare(strict_types=1);
2:
3: namespace Salient\Sync;
4:
5: use Salient\Contract\Core\Exception\MethodNotImplementedExceptionInterface;
6: use Salient\Contract\Core\MessageLevel as Level;
7: use Salient\Contract\Sync\Exception\UnreachableBackendExceptionInterface;
8: use Salient\Contract\Sync\DeferralPolicy;
9: use Salient\Contract\Sync\DeferredEntityInterface;
10: use Salient\Contract\Sync\DeferredRelationshipInterface;
11: use Salient\Contract\Sync\ErrorType;
12: use Salient\Contract\Sync\HydrationPolicy;
13: use Salient\Contract\Sync\SyncEntityInterface;
14: use Salient\Contract\Sync\SyncErrorCollectionInterface;
15: use Salient\Contract\Sync\SyncErrorInterface;
16: use Salient\Contract\Sync\SyncNamespaceHelperInterface;
17: use Salient\Contract\Sync\SyncProviderInterface;
18: use Salient\Contract\Sync\SyncStoreInterface;
19: use Salient\Core\Facade\Console;
20: use Salient\Core\Facade\Err;
21: use Salient\Core\Facade\Event;
22: use Salient\Core\AbstractStore;
23: use Salient\Sync\Event\SyncStoreLoadedEvent;
24: use Salient\Sync\Exception\HeartbeatCheckFailedException;
25: use Salient\Sync\Exception\SyncStoreException;
26: use Salient\Sync\Support\SyncErrorCollection;
27: use Salient\Utility\Arr;
28: use Salient\Utility\Get;
29: use Salient\Utility\Json;
30: use Salient\Utility\Regex;
31: use Salient\Utility\Str;
32: use Generator;
33: use InvalidArgumentException;
34: use LogicException;
35: use ReflectionClass;
36:
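/**
 * Tracks the state of entities synced to and from third-party backends in a
 * local SQLite database
 *
 * A sync operation run is not started when a {@see SyncStore} is created; it
 * is started when one is first needed (for example, when a provider ID or
 * entity type ID is requested). Once started, the run should be terminated by
 * calling {@see SyncStore::close()}, otherwise it is closed by the destructor,
 * which records an exit status of -1 unless the error handler reports a
 * different status during shutdown.
 */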
45: final class SyncStore extends AbstractStore implements SyncStoreInterface
46: {
/**
 * ID of the current run's row in `_sync_run`, or `null` until a run is started
 */
private ?int $RunId = null;

/**
 * Binary UUID of the current run
 *
 * Not initialised until a run is started.
 */
private string $RunUuid;

/**
 * Prefix => true
 *
 * Populated as namespaces are written to the database, i.e. only after a run
 * has started.
 *
 * @var array<string,true>
 */
private array $Namespaces = [];

/**
 * Provider ID => provider
 *
 * @var array<int,SyncProviderInterface>
 */
private array $Providers = [];

/**
 * Provider hash => provider ID
 *
 * The hash is the binary value returned by `getProviderSignature()`.
 *
 * @var array<string,int>
 */
private array $ProviderMap = [];

/**
 * Entity type ID => entity type
 *
 * @var array<int,class-string<SyncEntityInterface>>
 */
private array $EntityTypes = [];

/**
 * Entity type => entity type ID
 *
 * @var array<class-string<SyncEntityInterface>,int>
 */
private array $EntityTypeMap = [];

/**
 * Prefix => PHP namespace with trailing "\"
 *
 * Loaded from the database by `reload()`, longest namespace first. Not
 * initialised until a run is started.
 *
 * @var array<string,string>
 */
private array $NamespacesByPrefix;

/**
 * Prefix => namespace URI with trailing "/"
 *
 * Loaded from the database by `reload()`. Not initialised until a run is
 * started.
 *
 * @var array<string,string>
 */
private array $NamespaceUrisByPrefix;

/**
 * Prefix => namespace helper
 *
 * Only contains prefixes registered with a helper. Not initialised until the
 * first helper is registered during a run.
 *
 * @var array<string,SyncNamespaceHelperInterface>
 */
private array $NamespaceHelpersByPrefix;

/**
 * Provider ID => entity type ID => entity ID => entity
 *
 * Not initialised until the first entity is stored by `setEntity()`.
 *
 * @var array<int,array<int,array<int|string,SyncEntityInterface>>>
 */
private array $Entities;

/**
 * SPL object ID => checkpoint
 *
 * Records the value of the deferral checkpoint when each entity was stored
 * by `setEntity()`. Not initialised until the first entity is stored.
 *
 * @var array<int,int>
 */
private array $EntityCheckpoints;

/**
 * Provider ID => entity type ID => entity ID => [ deferred entity ]
 *
 * Each list of deferred entities is indexed by the deferral checkpoint at
 * which the entity was deferred.
 *
 * @var array<int,array<int,array<int|string,DeferredEntityInterface<SyncEntityInterface>[]>>>
 */
private array $DeferredEntities = [];

/**
 * Provider ID => entity type ID => requesting entity type ID => requesting
 * entity property => requesting entity ID => [ deferred relationship ]
 *
 * Each list of deferred relationships is indexed by the deferral checkpoint
 * at which the relationship was deferred.
 *
 * @var array<int,array<int,array<int,array<string,array<int|string,DeferredRelationshipInterface<SyncEntityInterface>[]>>>>>
 */
private array $DeferredRelationships = [];

/**
 * Errors recorded via `recordError()`
 */
private SyncErrorCollection $Errors;

/**
 * Next deferral checkpoint
 *
 * Incremented whenever an entity is stored or a deferred entity or
 * relationship is queued.
 */
private int $DeferralCheckpoint = 0;

/**
 * Command given to the constructor, held until a run is started and then
 * unset
 */
private string $Command;

/**
 * Arguments given to the constructor, held until a run is started and then
 * unset
 *
 * @var string[]
 */
private array $Arguments;

/**
 * Prefix => [ namespace URI, PHP namespace, namespace helper ]
 *
 * Namespaces registered before a run has started. Unset when a run starts.
 *
 * @var array<string,array{string,string,SyncNamespaceHelperInterface|null}>
 */
private array $DeferredNamespaces = [];

/**
 * Providers registered before a run has started. Unset when a run starts.
 *
 * @var SyncProviderInterface[]
 */
private array $DeferredProviders = [];

/**
 * Entity types registered before a run has started. Unset when a run starts.
 *
 * @var class-string<SyncEntityInterface>[]
 */
private array $DeferredEntityTypes = [];
146:
/**
 * Creates a new SyncStore object
 *
 * Opens the database via `openDb()`, passing SQL that creates any `_sync_*`
 * tables that do not already exist, then dispatches a
 * {@see SyncStoreLoadedEvent}. No row is added to `_sync_run` here: the
 * command and arguments are kept until a run is started.
 *
 * @param string $filename Passed to `openDb()`. Defaults to `:memory:`.
 * @param string $command The canonical name of the command performing sync
 * operations (e.g. a qualified class and/or method name).
 * @param string[] $arguments Arguments passed to the command.
 */
public function __construct(
string $filename = ':memory:',
string $command = '',
array $arguments = []
) {
// NOTE(review): presumably fails if the SQLite library does not support the
// `ON CONFLICT ... DO UPDATE` syntax used when registering providers, entity
// types and namespaces -- confirm in AbstractStore
$this->assertCanUpsert();

$this->Errors = new SyncErrorCollection();
$this->Command = $command;
$this->Arguments = $arguments;

// Every statement uses `IF NOT EXISTS`, so an existing database is left
// as-is
$this->openDb(
$filename,
<<<SQL
CREATE TABLE IF NOT EXISTS
_sync_run (
run_id INTEGER NOT NULL PRIMARY KEY,
run_uuid BLOB NOT NULL UNIQUE,
run_command TEXT NOT NULL,
run_arguments_json TEXT NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
exit_status INTEGER,
error_count INTEGER,
warning_count INTEGER,
errors_json TEXT
);

CREATE TABLE IF NOT EXISTS
_sync_provider (
provider_id INTEGER NOT NULL PRIMARY KEY,
provider_hash BLOB NOT NULL UNIQUE,
provider_class TEXT NOT NULL,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS
_sync_entity_type (
entity_type_id INTEGER NOT NULL PRIMARY KEY,
entity_type_class TEXT NOT NULL UNIQUE,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS
_sync_entity_type_state (
provider_id INTEGER NOT NULL,
entity_type_id INTEGER NOT NULL,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
last_sync DATETIME,
PRIMARY KEY (provider_id, entity_type_id),
FOREIGN KEY (provider_id) REFERENCES _sync_provider,
FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type
);

CREATE TABLE IF NOT EXISTS
_sync_entity (
provider_id INTEGER NOT NULL,
entity_type_id INTEGER NOT NULL,
entity_id TEXT NOT NULL,
canonical_id TEXT,
is_dirty INTEGER NOT NULL DEFAULT 0,
is_deleted INTEGER NOT NULL DEFAULT 0,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
last_sync DATETIME,
entity_json TEXT NOT NULL,
PRIMARY KEY (provider_id, entity_type_id, entity_id),
FOREIGN KEY (provider_id) REFERENCES _sync_provider,
FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type
) WITHOUT ROWID;

CREATE TABLE IF NOT EXISTS
_sync_entity_namespace (
entity_namespace_id INTEGER NOT NULL PRIMARY KEY,
entity_namespace_prefix TEXT NOT NULL UNIQUE,
base_uri TEXT NOT NULL,
php_namespace TEXT NOT NULL,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
);

SQL
);

Event::dispatch(new SyncStoreLoadedEvent($this));
}
242:
/**
 * Terminate the current run and close the database
 *
 * If a run has been started and the database is open, the run's row in
 * `_sync_run` is updated with the time it finished, the given exit status,
 * the number of errors and warnings recorded, and the errors themselves as
 * JSON. Otherwise, the database is closed without recording anything.
 *
 * The statement and the database are closed even if recording the outcome of
 * the run fails, in which case the exception propagates to the caller after
 * they have been closed.
 *
 * @param int $exitStatus Exit status to record against the run.
 */
public function close(int $exitStatus = 0): void
{
    // Don't start a run now
    if (!$this->isOpen() || $this->RunId === null) {
        $this->closeDb();
        return;
    }

    $sql = <<<SQL
        UPDATE
        _sync_run
        SET
        finished_at = CURRENT_TIMESTAMP,
        exit_status = :exit_status,
        error_count = :error_count,
        warning_count = :warning_count,
        errors_json = :errors_json
        WHERE
        run_uuid = :run_uuid;
        SQL;

    try {
        $stmt = $this->prepare($sql);
        try {
            $stmt->bindValue(':exit_status', $exitStatus, \SQLITE3_INTEGER);
            $stmt->bindValue(':run_uuid', $this->RunUuid, \SQLITE3_BLOB);
            $stmt->bindValue(':error_count', $this->Errors->getErrorCount(), \SQLITE3_INTEGER);
            $stmt->bindValue(':warning_count', $this->Errors->getWarningCount(), \SQLITE3_INTEGER);
            $stmt->bindValue(':errors_json', Json::stringify($this->Errors), \SQLITE3_TEXT);
            $stmt->execute();
        } finally {
            // Release the statement whether or not the update succeeded
            $stmt->close();
        }
    } finally {
        // Never leave the database open, even if the run's outcome could not
        // be recorded
        $this->closeDb();
    }
}
278:
/**
 * Check if a run has been started, i.e. if a run ID has been assigned
 *
 * @phpstan-assert-if-true !null $this->RunId
 */
public function runHasStarted(): bool
{
    return is_int($this->RunId);
}
286:
/**
 * Get the ID of the current run
 *
 * @inheritDoc
 *
 * @throws LogicException if a run has not started.
 */
public function getRunId(): int
{
    if ($this->RunId === null) {
        throw new LogicException('Run has not started');
    }

    return $this->RunId;
}
296:
/**
 * Get the UUID of the current run after passing its binary form to
 * Get::uuid()
 *
 * @inheritDoc
 *
 * @throws LogicException if a run has not started.
 */
public function getRunUuid(): string
{
    if (!$this->runHasStarted()) {
        throw new LogicException('Run has not started');
    }

    $binaryUuid = $this->RunUuid;

    return Get::uuid($binaryUuid);
}
306:
/**
 * Get the UUID of the current run in the binary form stored in the database
 *
 * @inheritDoc
 *
 * @throws LogicException if a run has not started.
 */
public function getBinaryRunUuid(): string
{
    if (!$this->runHasStarted()) {
        throw new LogicException('Run has not started');
    }

    return $this->RunUuid;
}
316:
/**
 * Throw an exception if a run has not started
 *
 * @phpstan-assert !null $this->RunId
 *
 * @throws LogicException if no run ID has been assigned.
 */
private function assertRunHasStarted(): void
{
    if ($this->RunId !== null) {
        return;
    }

    throw new LogicException('Run has not started');
}
326:
/**
 * Register a sync provider with the store
 *
 * If a run has not started, the provider is queued and registered when one
 * does. Otherwise, a row for the provider is added to `_sync_provider` (or
 * its `last_seen` timestamp is updated) and the provider is indexed by the
 * ID of that row and by its signature.
 *
 * @inheritDoc
 *
 * @throws LogicException if a provider with the same signature is already
 * registered in the current run.
 */
public function registerProvider(SyncProviderInterface $provider)
{
    if (!$this->runHasStarted()) {
        // Don't start a run just to register a provider
        $this->DeferredProviders[] = $provider;

        return $this;
    }

    $signature = $this->getProviderSignature($provider);
    $providerClass = get_class($provider);

    if (isset($this->ProviderMap[$signature])) {
        throw new LogicException(sprintf(
            'Provider already registered: %s',
            $providerClass,
        ));
    }

    // Add the provider to the database, or update `last_seen` if it is
    // already there
    $upsert = $this->prepare(<<<SQL
        INSERT INTO
        _sync_provider (provider_hash, provider_class)
        VALUES
        (:provider_hash, :provider_class) ON CONFLICT (provider_hash) DO
        UPDATE
        SET
        last_seen = CURRENT_TIMESTAMP;
        SQL);
    $upsert->bindValue(':provider_hash', $signature, \SQLITE3_BLOB);
    $upsert->bindValue(':provider_class', $providerClass, \SQLITE3_TEXT);
    $upsert->execute();
    $upsert->close();

    // Look up the ID of the provider's row, whether it was inserted or
    // updated
    $select = $this->prepare(<<<SQL
        SELECT
        provider_id
        FROM
        _sync_provider
        WHERE
        provider_hash = :provider_hash;
        SQL);
    $select->bindValue(':provider_hash', $signature, \SQLITE3_BLOB);
    /** @var array{int}|false */
    $row = $this->execute($select)->fetchArray(\SQLITE3_NUM);
    $select->close();

    if ($row === false) {
        // @codeCoverageIgnoreStart
        throw new SyncStoreException('Error retrieving provider ID');
        // @codeCoverageIgnoreEnd
    }

    [$id] = $row;
    $this->Providers[$id] = $provider;
    $this->ProviderMap[$signature] = $id;

    return $this;
}
391:
/**
 * Check if a provider is registered with the store
 *
 * A provider instance is matched by identity; any other value is treated as
 * a provider signature. Before a run has started, providers queued for
 * registration are checked instead of registered providers.
 *
 * @inheritDoc
 */
public function hasProvider($provider): bool
{
    $started = $this->runHasStarted();

    if ($provider instanceof SyncProviderInterface) {
        return in_array(
            $provider,
            $started ? $this->Providers : $this->DeferredProviders,
            true,
        );
    }

    if ($started) {
        return isset($this->ProviderMap[$provider]);
    }

    // No signatures are indexed before a run starts, so compare with the
    // signature of each queued provider
    foreach ($this->DeferredProviders as $queued) {
        if ($this->getProviderSignature($queued) === $provider) {
            return true;
        }
    }

    return false;
}
415:
/**
 * Get the ID of a registered provider, starting a run if necessary
 *
 * @inheritDoc
 *
 * @throws LogicException if the provider is not registered.
 */
public function getProviderId($provider): int
{
    // Provider IDs are only assigned during a run
    if (!$this->runHasStarted()) {
        $this->check();
    }

    if ($provider instanceof SyncProviderInterface) {
        $signature = $this->getProviderSignature($provider);
        $detail = sprintf(': %s', get_class($provider));
    } else {
        $signature = $provider;
        $detail = '';
    }

    if (!isset($this->ProviderMap[$signature])) {
        throw new LogicException('Provider not registered' . $detail);
    }

    return $this->ProviderMap[$signature];
}
437:
/**
 * Get a registered provider by provider ID or signature
 *
 * Provider IDs can only be resolved after a run has started. Signatures are
 * resolved against providers queued for registration if a run has not
 * started.
 *
 * @inheritDoc
 *
 * @throws LogicException if the provider is not registered, or if a provider
 * ID is given before a run has started.
 */
public function getProvider($provider): SyncProviderInterface
{
    $started = $this->runHasStarted();

    if (is_int($provider)) {
        if (!$started) {
            throw new LogicException(sprintf(
                'Provider ID not issued during run: %d',
                $provider,
            ));
        }

        $instance = $this->Providers[$provider] ?? null;
        if ($instance === null) {
            throw new LogicException(sprintf(
                'Provider not registered: #%d',
                $provider,
            ));
        }

        return $instance;
    }

    if ($started) {
        if (!isset($this->ProviderMap[$provider])) {
            throw new LogicException('Provider not registered');
        }

        return $this->Providers[$this->ProviderMap[$provider]];
    }

    // Don't start a run just to get a provider
    foreach ($this->DeferredProviders as $queued) {
        if ($this->getProviderSignature($queued) === $provider) {
            return $queued;
        }
    }

    throw new LogicException('Provider not registered');
}
475:
/**
 * Get a binary hash of a provider's class name and backend identifier
 *
 * The class name and each value in the provider's backend identifier are
 * joined with null bytes before being hashed.
 *
 * @inheritDoc
 */
public function getProviderSignature(SyncProviderInterface $provider): string
{
    $components = [
        get_class($provider),
        ...$provider->getBackendIdentifier(),
    ];
    $payload = implode("\0", $components);

    return Get::binaryHash($payload);
}
487:
/**
 * Register a sync entity type with the store
 *
 * If a run has not started, the entity type is queued and registered when
 * one does. Otherwise, unless it is already registered in the current run,
 * the class is validated, a row for it is added to `_sync_entity_type` (or
 * its `last_seen` timestamp is updated), and the entity type is indexed by
 * the ID of that row.
 *
 * @inheritDoc
 *
 * @throws LogicException if `$entityType` is not the declared name of the
 * class, or if the class does not implement {@see SyncEntityInterface}.
 */
public function registerEntityType(string $entityType)
{
    if (!$this->runHasStarted()) {
        // Don't start a run just to register an entity type
        $this->DeferredEntityTypes[] = $entityType;

        return $this;
    }

    if (isset($this->EntityTypeMap[$entityType])) {
        return $this;
    }

    $reflection = new ReflectionClass($entityType);
    $declaredName = $reflection->getName();

    // Reject names that differ from the declared class name
    if ($declaredName !== $entityType) {
        throw new LogicException(sprintf(
            'Not an exact match for declared class (%s expected): %s',
            $declaredName,
            $entityType,
        ));
    }

    if (!$reflection->implementsInterface(SyncEntityInterface::class)) {
        // @codeCoverageIgnoreStart
        throw new LogicException(sprintf(
            'Does not implement %s: %s',
            SyncEntityInterface::class,
            $entityType,
        ));
        // @codeCoverageIgnoreEnd
    }

    // Add the entity type to the database, or update `last_seen` if it is
    // already there
    $upsert = $this->prepare(<<<SQL
        INSERT INTO
        _sync_entity_type (entity_type_class)
        VALUES
        (:entity_type_class) ON CONFLICT (entity_type_class) DO
        UPDATE
        SET
        last_seen = CURRENT_TIMESTAMP;
        SQL);
    $upsert->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT);
    $upsert->execute();
    $upsert->close();

    // Look up the ID of the entity type's row, whether it was inserted or
    // updated
    $select = $this->prepare(<<<SQL
        SELECT
        entity_type_id
        FROM
        _sync_entity_type
        WHERE
        entity_type_class = :entity_type_class;
        SQL);
    $select->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT);
    /** @var array{int}|false */
    $row = $this->execute($select)->fetchArray(\SQLITE3_NUM);
    $select->close();

    if ($row === false) {
        throw new SyncStoreException('Error retrieving entity type ID');
    }

    [$id] = $row;
    $this->EntityTypes[$id] = $entityType;
    $this->EntityTypeMap[$entityType] = $id;

    return $this;
}
562:
/**
 * Check if an entity type is registered with the store, or queued for
 * registration if a run has not started
 *
 * @inheritDoc
 */
public function hasEntityType(string $entityType): bool
{
    return $this->runHasStarted()
        ? isset($this->EntityTypeMap[$entityType])
        : in_array($entityType, $this->DeferredEntityTypes, true);
}
574:
/**
 * Get the ID of a registered entity type, starting a run if necessary
 *
 * @inheritDoc
 *
 * @throws LogicException if the entity type is not registered.
 */
public function getEntityTypeId(string $entityType): int
{
    // Entity type IDs are only assigned during a run
    if (!$this->runHasStarted()) {
        $this->check();
    }

    if (isset($this->EntityTypeMap[$entityType])) {
        return $this->EntityTypeMap[$entityType];
    }

    throw new LogicException(sprintf(
        'Entity not registered: %s',
        $entityType,
    ));
}
593:
/**
 * Get a registered entity type by entity type ID
 *
 * @inheritDoc
 *
 * @throws LogicException if a run has not started or the entity type ID is
 * not registered.
 */
public function getEntityType(int $entityTypeId): string
{
    // Entity type IDs are only assigned during a run, so none can be valid
    // before one starts
    if (!$this->runHasStarted()) {
        throw new LogicException(sprintf(
            'Entity type ID not issued during run: %d',
            $entityTypeId,
        ));
    }

    $entityType = $this->EntityTypes[$entityTypeId] ?? null;
    if ($entityType === null) {
        throw new LogicException(sprintf(
            'Entity type not registered: #%d',
            $entityTypeId,
        ));
    }

    return $entityType;
}
613:
/**
 * Register a namespace for sync entities and their provider interfaces
 *
 * The prefix is converted to lowercase. Unless the namespace was queued
 * before the run started (in which case this has already been done), the
 * prefix is validated, the URI is given exactly one trailing "/", and
 * leading and trailing backslashes are replaced with one trailing "\" in the
 * PHP namespace.
 *
 * If a run has not started, the namespace is then queued and registered when
 * one does. Otherwise, it is written to `_sync_entity_namespace` and, unless
 * the store is still starting the run, namespaces are reloaded from the
 * database.
 *
 * @inheritDoc
 *
 * @throws LogicException if the prefix is already registered.
 * @throws InvalidArgumentException if the prefix is invalid.
 */
public function registerNamespace(
    string $prefix,
    string $uri,
    string $namespace,
    ?SyncNamespaceHelperInterface $helper = null
) {
    $prefix = Str::lower($prefix);
    $started = $this->runHasStarted();
    // `isset()` is also false after `$DeferredNamespaces` is unset at the
    // start of a run
    $queued = isset($this->DeferredNamespaces[$prefix]);

    if (isset($this->Namespaces[$prefix]) || (!$started && $queued)) {
        throw new LogicException(sprintf(
            'Prefix already registered: %s',
            $prefix,
        ));
    }

    // Namespaces are validated and normalised before they are queued so
    // `classToNamespace()` can be used without starting a run, and queued
    // namespaces are not processed again when the run starts
    if (!$queued) {
        if (!Regex::match('/^[a-z][-a-z0-9+.]*$/iD', $prefix)) {
            throw new InvalidArgumentException(sprintf(
                'Invalid prefix: %s',
                $prefix,
            ));
        }
        $uri = rtrim($uri, '/') . '/';
        $namespace = trim($namespace, '\\') . '\\';
    }

    if (!$started) {
        // Don't start a run just to register a namespace
        $this->DeferredNamespaces[$prefix] = [$uri, $namespace, $helper];

        return $this;
    }

    // Add the namespace to the database, or update it if the prefix is
    // already there
    $upsert = $this->prepare(<<<SQL
        INSERT INTO
        _sync_entity_namespace (entity_namespace_prefix, base_uri, php_namespace)
        VALUES
        (
        :entity_namespace_prefix,
        :base_uri,
        :php_namespace
        ) ON CONFLICT (entity_namespace_prefix) DO
        UPDATE
        SET
        base_uri = excluded.base_uri,
        php_namespace = excluded.php_namespace,
        last_seen = CURRENT_TIMESTAMP;
        SQL);
    $upsert->bindValue(':entity_namespace_prefix', $prefix, \SQLITE3_TEXT);
    $upsert->bindValue(':base_uri', $uri, \SQLITE3_TEXT);
    $upsert->bindValue(':php_namespace', $namespace, \SQLITE3_TEXT);
    $upsert->execute();
    $upsert->close();

    $this->Namespaces[$prefix] = true;

    if ($helper) {
        $this->NamespaceHelpersByPrefix[$prefix] = $helper;
    }

    // Namespaces have not been loaded yet if the run is still being
    // started, in which case they are loaded once when that finishes
    return isset($this->NamespacesByPrefix)
        ? $this->reload()
        : $this;
}
693:
/**
 * Get the canonical URI of a sync entity type
 *
 * If the entity type belongs to a registered namespace, the result is the
 * namespace prefix and ":" (if `$compact` is true) or the namespace URI (if
 * not), followed by the remainder of the class name with "\" replaced by
 * "/". Otherwise, the request is passed to SyncUtil::getEntityTypeUri().
 *
 * @inheritDoc
 */
public function getEntityTypeUri(string $entityType, bool $compact = true): string
{
    $prefix = $this->classToNamespace($entityType, $uri, $namespace);

    if ($prefix === null) {
        return SyncUtil::getEntityTypeUri($entityType, $compact);
    }

    // Remove the namespace from the start of the class name
    $relativeName = substr(ltrim($entityType, '\\'), strlen($namespace));
    $path = str_replace('\\', '/', $relativeName);

    if ($compact) {
        return $prefix . ':' . $path;
    }

    return $uri . $path;
}
709:
/**
 * Get the prefix of the registered namespace a class belongs to, or null if
 * it does not belong to one
 *
 * @inheritDoc
 */
public function getNamespacePrefix(string $class): ?string
{
    $prefix = $this->classToNamespace($class);

    return $prefix;
}
717:
/**
 * Get the helper registered with the namespace a class belongs to, or null
 * if the class does not belong to a registered namespace or the namespace
 * has no helper
 *
 * @inheritDoc
 */
public function getNamespaceHelper(string $class): ?SyncNamespaceHelperInterface
{
    $prefix = $this->classToNamespace($class, $uri, $namespace, $helper);

    return $prefix === null
        ? null
        : $helper;
}
734:
/**
 * Get the prefix, URI, PHP namespace and helper of the registered namespace
 * a class belongs to
 *
 * Class and namespace names are compared after conversion to lowercase. If
 * the class belongs to more than one registered namespace, the longest
 * namespace is used, whether or not a run has started. (Namespaces loaded
 * from the database are already ordered longest-first, but namespaces
 * queued before a run starts are held in registration order, so the first
 * match is not necessarily the longest.)
 *
 * If there is no matching namespace, the variables passed by reference are
 * not changed.
 *
 * @param class-string<SyncEntityInterface|SyncProviderInterface> $class
 * @param string|null $uri Receives the URI of the namespace.
 * @param string|null $namespace Receives the PHP namespace in lowercase.
 * @param SyncNamespaceHelperInterface|null $helper Receives the namespace
 * helper, if any.
 * @param-out SyncNamespaceHelperInterface|null $helper
 * @return string|null The prefix of the namespace, or `null` if the class
 * does not belong to a registered namespace.
 */
private function classToNamespace(
    string $class,
    ?string &$uri = null,
    ?string &$namespace = null,
    ?SyncNamespaceHelperInterface &$helper = null
): ?string {
    $class = Str::lower(ltrim($class, '\\'));
    // Don't start a run just to resolve a class to a namespace
    $namespaces = $this->runHasStarted()
        ? $this->getNamespaces()
        : $this->DeferredNamespaces;

    $match = null;
    $matchLength = -1;
    foreach ($namespaces as $prefix => [$_uri, $_namespace, $_helper]) {
        $_namespace = Str::lower($_namespace);
        $length = strlen($_namespace);
        // Only a longer namespace can replace a match, so the first of any
        // equally long matches is kept
        if ($length <= $matchLength || strpos($class, $_namespace) !== 0) {
            continue;
        }
        $match = [$prefix, $_uri, $_namespace, $_helper];
        $matchLength = $length;
    }

    if ($match === null) {
        return null;
    }

    [$prefix, $uri, $namespace, $helper] = $match;

    return $prefix;
}
761:
/**
 * Iterate over namespaces loaded from the database, in the order they were
 * loaded
 *
 * Yields a list with the namespace's URI, PHP namespace and helper (or
 * `null` if it has no helper) for each prefix.
 *
 * @return Generator<string,array{string,string,SyncNamespaceHelperInterface|null}>
 */
private function getNamespaces(): Generator
{
    foreach ($this->NamespacesByPrefix as $prefix => $phpNamespace) {
        $uri = $this->NamespaceUrisByPrefix[$prefix];
        $helper = $this->NamespaceHelpersByPrefix[$prefix] ?? null;

        yield $prefix => [$uri, $phpNamespace, $helper];
    }
}
775:
/**
 * Register a sync entity with the store
 *
 * The entity is recorded with the current deferral checkpoint, which is then
 * incremented, and any deferred entities queued for the same provider,
 * entity type and entity ID are replaced with the entity and removed from
 * the queue.
 *
 * The entity type's ID is read directly from the entity type map, so the
 * entity type must already be registered.
 *
 * @inheritDoc
 *
 * @throws LogicException if an entity with the same provider, entity type
 * and entity ID is already registered.
 */
public function setEntity(
    int $providerId,
    string $entityType,
    $entityId,
    SyncEntityInterface $entity
) {
    $typeId = $this->EntityTypeMap[$entityType];

    if (isset($this->Entities[$providerId][$typeId][$entityId])) {
        throw new LogicException('Entity already registered');
    }

    $this->Entities[$providerId][$typeId][$entityId] = $entity;
    $this->EntityCheckpoints[spl_object_id($entity)] = $this->DeferralCheckpoint++;

    // Resolve the entity's entries in the deferred entity queue (if any)
    $queue = $this->DeferredEntities[$providerId][$typeId][$entityId] ?? [];
    foreach ($queue as $index => $deferredEntity) {
        $deferredEntity->replace($entity);
        unset($this->DeferredEntities[$providerId][$typeId][$entityId][$index]);
    }
    if ($queue) {
        unset($this->DeferredEntities[$providerId][$typeId][$entityId]);
    }

    return $this;
}
804:
/**
 * Get a sync entity registered with the store, or null if there is no such
 * entity
 *
 * The entity type's ID is read directly from the entity type map, so the
 * entity type must already be registered.
 *
 * `$offline` has no effect on the result: only entities registered via
 * `setEntity()` are returned.
 *
 * @inheritDoc
 */
public function getEntity(
    int $providerId,
    string $entityType,
    $entityId,
    ?bool $offline = null
): ?SyncEntityInterface {
    $typeId = $this->EntityTypeMap[$entityType];

    return $this->Entities[$providerId][$typeId][$entityId] ?? null;
}
821:
/**
 * Register a deferred entity with the store
 *
 * If the entity it refers to is already registered, the deferred entity is
 * replaced with it immediately. Otherwise, the deferred entity is added to
 * the queue at the current deferral checkpoint, which is then incremented,
 * and resolved straight away if the deferral policy of its context is
 * `RESOLVE_EARLY`.
 *
 * The entity type's ID is read directly from the entity type map, so the
 * entity type must already be registered.
 *
 * @template TEntity of SyncEntityInterface
 *
 * @param class-string<TEntity> $entityType
 * @param DeferredEntityInterface<TEntity> $entity
 */
public function deferEntity(
    int $providerId,
    string $entityType,
    $entityId,
    DeferredEntityInterface $entity
) {
    $typeId = $this->EntityTypeMap[$entityType];

    /** @var TEntity|null */
    $registered = $this->Entities[$providerId][$typeId][$entityId] ?? null;
    if ($registered) {
        $entity->replace($registered);

        return $this;
    }

    // Get the deferral policy of the context within which the entity was
    // deferred, i.e. the context of the last entity in the deferred entity's
    // context, if there is one
    $context = $entity->getContext();
    $last = $context ? $context->getLastEntity() : null;
    if ($last) {
        $context = $last->getContext();
    }
    $policy = $context ? $context->getDeferralPolicy() : null;

    // Deferred entities are queued even in `RESOLVE_EARLY` mode, because
    // `setEntity()` only calls `DeferredEntityInterface::replace()` on
    // instances in the queue
    $this->DeferredEntities[$providerId][$typeId][$entityId][
        $this->DeferralCheckpoint++
    ] = $entity;

    if ($policy === DeferralPolicy::RESOLVE_EARLY) {
        $entity->resolve();
    }

    return $this;
}
870:
/**
 * Register a deferred relationship with the store
 *
 * Depending on the hydration policy applied to `$entityType` in the context
 * of the deferral, the relationship is left unresolved and unqueued
 * (`LAZY`), resolved immediately (`EAGER`), or added to the queue at the
 * current deferral checkpoint, which is then incremented. A policy of `0`
 * is used if the relationship has no context.
 *
 * Entity type IDs are read directly from the entity type map, so both entity
 * types must already be registered.
 *
 * @inheritDoc
 *
 * @throws LogicException if a relationship is already registered for the
 * same provider, entity types, property and requesting entity.
 */
public function deferRelationship(
    int $providerId,
    string $entityType,
    string $forEntityType,
    string $forEntityProperty,
    $forEntityId,
    DeferredRelationshipInterface $relationship
) {
    $typeId = $this->EntityTypeMap[$entityType];
    $forTypeId = $this->EntityTypeMap[$forEntityType];

    /** @var DeferredRelationshipInterface<SyncEntityInterface>[]|null */
    $queue = &$this->DeferredRelationships[$providerId][$typeId][
        $forTypeId
    ][$forEntityProperty][$forEntityId];

    if (isset($queue)) {
        throw new LogicException('Relationship already registered');
    }

    // Claim the slot so the same relationship cannot be registered again,
    // even if nothing is queued below
    $queue = [];

    // Get hydration policy from the context within which the deferral was
    // created, i.e. the context of the last entity in the relationship's
    // context, if there is one
    $context = $relationship->getContext();
    $last = $context ? $context->getLastEntity() : null;
    if ($last) {
        $context = $last->getContext();
    }
    $policy = $context
        ? $context->getHydrationPolicy($entityType)
        : 0;

    if ($policy === HydrationPolicy::EAGER) {
        $relationship->resolve();
    } elseif ($policy !== HydrationPolicy::LAZY) {
        $queue[$this->DeferralCheckpoint++] = $relationship;
    }

    return $this;
}
921:
/**
 * Resolve deferred relationships and entities until none remain
 *
 * Each pass resolves deferred relationships if there are any, otherwise
 * deferred entities, and passes are repeated until one resolves nothing.
 * Entities returned via a relationship are only included in the result if
 * they were registered at or after the deferral checkpoint that applied when
 * this method was called. Each entity appears in the result once.
 *
 * @inheritDoc
 */
public function resolveDeferrals(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?int $providerId = null
): array {
    $startedAt = $this->DeferralCheckpoint;
    $resolved = [];

    do {
        // Resolve relationships first because they typically deliver
        // multiple entities per round trip, some of which may be in the
        // deferred entity queue
        $deferred = $this->resolveDeferredRelationships(
            $fromCheckpoint,
            $entityType,
            null,
            null,
            $providerId
        );

        if ($deferred) {
            foreach ($deferred as $entities) {
                foreach ($entities as $entity) {
                    $objectId = spl_object_id($entity);
                    if ($this->EntityCheckpoints[$objectId] >= $startedAt) {
                        $resolved[$objectId] = $entity;
                    }
                }
            }
        } else {
            $deferred = $this->resolveDeferredEntities(
                $fromCheckpoint,
                $entityType,
                $providerId
            );
            foreach ($deferred as $entity) {
                $resolved[spl_object_id($entity)] = $entity;
            }
        }
    } while ($deferred);

    return array_values($resolved);
}
960:
/**
 * Get the checkpoint that will be assigned to the next entity stored or
 * deferral queued
 *
 * @inheritDoc
 */
public function getDeferralCheckpoint(): int
{
    $checkpoint = $this->DeferralCheckpoint;

    return $checkpoint;
}
968:
/**
 * Resolve deferred entities in the queue
 *
 * The queue is optionally limited to one provider, one entity type, and/or
 * entities deferred at or after a given checkpoint. For each entity ID with
 * deferred entities remaining, the first is resolved and the value it
 * returns is added to the result.
 *
 * If `$entityType` is given, its ID is read directly from the entity type
 * map, so it must already be registered.
 *
 * @inheritDoc
 */
public function resolveDeferredEntities(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?int $providerId = null
): array {
    $onlyTypeId = $entityType === null
        ? null
        : $this->EntityTypeMap[$entityType];

    $resolved = [];
    foreach ($this->DeferredEntities as $provId => $queuesByTypeId) {
        if ($providerId !== null && $provId !== $providerId) {
            continue;
        }

        foreach ($queuesByTypeId as $typeId => $queuesByEntityId) {
            if ($onlyTypeId !== null && $typeId !== $onlyTypeId) {
                continue;
            }

            /** @var array<int|string,non-empty-array<DeferredEntityInterface<SyncEntityInterface>>> $queuesByEntityId */
            foreach ($queuesByEntityId as $queue) {
                if ($fromCheckpoint !== null) {
                    // Ignore entities deferred before the checkpoint
                    $recent = array_filter(
                        $queue,
                        static fn($checkpoint) => $checkpoint >= $fromCheckpoint,
                        \ARRAY_FILTER_USE_KEY
                    );
                    // Skip the entity if nothing is left (queues are not
                    // expected to be empty before filtering, so one that is
                    // falls through as it would without a checkpoint)
                    if ($queue && !$recent) {
                        continue;
                    }
                    $queue = $recent;
                }

                // Only the first deferred entity is resolved directly
                $first = reset($queue);
                $resolved[] = $first->resolve();
            }
        }
    }

    return $resolved;
}
1019:
/**
 * Resolve deferred relationships in the queue
 *
 * The queue is optionally limited to one provider, one entity type, one
 * requesting entity type, one requesting entity property, and/or
 * relationships deferred at or after a given checkpoint. Each relationship
 * is removed from the queue after it is resolved, and the value it returns
 * is added to the result.
 *
 * If `$entityType` or `$forEntityType` are given, their IDs are read
 * directly from the entity type map, so they must already be registered.
 *
 * @inheritDoc
 */
public function resolveDeferredRelationships(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?string $forEntityType = null,
    ?string $forEntityProperty = null,
    ?int $providerId = null
): array {
    $onlyTypeId = $entityType === null
        ? null
        : $this->EntityTypeMap[$entityType];
    $onlyForTypeId = $forEntityType === null
        ? null
        : $this->EntityTypeMap[$forEntityType];

    $resolved = [];
    foreach ($this->DeferredRelationships as $provId => $byTypeId) {
        if ($providerId !== null && $provId !== $providerId) {
            continue;
        }

        foreach ($byTypeId as $typeId => $byForTypeId) {
            if ($onlyTypeId !== null && $typeId !== $onlyTypeId) {
                continue;
            }

            foreach ($byForTypeId as $forTypeId => $byForProperty) {
                if ($onlyForTypeId !== null && $forTypeId !== $onlyForTypeId) {
                    continue;
                }

                foreach ($byForProperty as $forProperty => $byForEntityId) {
                    if ($forEntityProperty !== null && $forProperty !== $forEntityProperty) {
                        continue;
                    }

                    foreach ($byForEntityId as $forId => $queue) {
                        if ($fromCheckpoint !== null) {
                            // Ignore relationships deferred before the
                            // checkpoint
                            $queue = array_filter(
                                $queue,
                                static fn($checkpoint) => $checkpoint >= $fromCheckpoint,
                                \ARRAY_FILTER_USE_KEY
                            );
                        }

                        foreach ($queue as $checkpoint => $relationship) {
                            $resolved[] = $relationship->resolve();
                            unset($this->DeferredRelationships[$provId][$typeId][$forTypeId][$forProperty][$forId][$checkpoint]);
                        }
                    }
                }
            }
        }
    }

    return $resolved;
}
1080:
/**
 * Check the heartbeat of the given providers, or of every registered
 * provider if none are given, starting a run if necessary
 *
 * Providers that don't implement heartbeat checks are skipped. An error is
 * recorded for each provider with an unreachable backend, and if
 * `$failEarly` is true, no more providers are checked after the first
 * failure.
 *
 * @inheritDoc
 *
 * @throws HeartbeatCheckFailedException if one or more providers have an
 * unreachable backend.
 */
public function checkProviderHeartbeats(
    int $ttl = 300,
    bool $failEarly = true,
    SyncProviderInterface ...$providers
) {
    $this->check();

    if (!$providers) {
        if (!$this->Providers) {
            return $this;
        }
        $providers = $this->Providers;
    } else {
        $providers = Arr::unique($providers);
    }

    $failed = [];
    /** @var SyncProviderInterface $provider */
    foreach ($providers as $provider) {
        $id = $provider->getProviderId();
        $name = sprintf('%s [#%d]', $provider->getName(), $id);
        Console::logProgress('Checking', $name);

        try {
            $provider->checkHeartbeat($ttl);
            Console::log('Heartbeat OK:', $name);
            continue;
        } catch (MethodNotImplementedExceptionInterface $ex) {
            Console::log('Heartbeat check not supported:', $name);
            continue;
        } catch (UnreachableBackendExceptionInterface $ex) {
            Console::exception($ex, Level::DEBUG, null);
            Console::log('Heartbeat check failed:', $name);
            $failed[] = $provider;
            $this->recordError(
                SyncError::build()
                    ->errorType(ErrorType::BACKEND_UNREACHABLE)
                    ->message('Heartbeat check failed: %s')
                    ->values([[
                        'provider_id' => $id,
                        'provider_class' => get_class($provider),
                        'exception' => get_class($ex),
                        'message' => $ex->getMessage()
                    ]])
                    ->build()
            );
        }

        // Only reached after a failed heartbeat check
        if ($failEarly) {
            break;
        }
    }

    if ($failed) {
        throw new HeartbeatCheckFailedException(...$failed);
    }

    return $this;
}
1138:
/**
 * Add an error to the store's error collection
 *
 * If `$deduplicate` is true and the collection's `keyOf()` method returns a
 * key for the error, the error at that key is replaced with the result of
 * its `count()` method instead of adding the given error.
 *
 * @inheritDoc
 */
public function recordError(SyncErrorInterface $error, bool $deduplicate = false)
{
    if (
        $deduplicate
        && ($key = $this->Errors->keyOf($error)) !== null
    ) {
        $this->Errors[$key] = $this->Errors[$key]->count();
    } else {
        $this->Errors[] = $error;
    }

    return $this;
}
1156:
/**
 * Get a clone of the store's error collection
 *
 * @inheritDoc
 */
public function getErrors(): SyncErrorCollectionInterface
{
    $errors = clone $this->Errors;

    return $errors;
}
1164:
/**
 * Start a run if one has not already started
 *
 * A row for the run is added to `_sync_run` with a new UUID and the command
 * and arguments given to the constructor. Providers, entity types and
 * namespaces queued before the run started are then registered, in that
 * order, before namespaces are loaded from the database.
 *
 * @phpstan-assert !null $this->RunId
 */
protected function check()
{
    if ($this->runHasStarted()) {
        return $this;
    }

    // NOTE(review): safeCheck() presumably calls this method again with the
    // "check running" state set -- confirm in AbstractStore
    if (!$this->isCheckRunning()) {
        return $this->safeCheck();
    }

    $insert = $this->prepare(<<<SQL
        INSERT INTO _sync_run (run_uuid, run_command, run_arguments_json)
        VALUES (
        :run_uuid,
        :run_command,
        :run_arguments_json
        );
        SQL);
    $uuid = Get::binaryUuid();
    $insert->bindValue(':run_uuid', $uuid, \SQLITE3_BLOB);
    $insert->bindValue(':run_command', $this->Command, \SQLITE3_TEXT);
    $insert->bindValue(':run_arguments_json', Json::stringify($this->Arguments), \SQLITE3_TEXT);
    $insert->execute();
    $insert->close();

    // The run has started once its ID is assigned, so everything registered
    // from here on is written to the database instead of being queued
    $this->RunId = $this->db()->lastInsertRowID();
    $this->RunUuid = $uuid;
    unset($this->Command, $this->Arguments);

    foreach ($this->DeferredProviders as $queuedProvider) {
        $this->registerProvider($queuedProvider);
    }
    unset($this->DeferredProviders);

    foreach ($this->DeferredEntityTypes as $queuedEntityType) {
        $this->registerEntityType($queuedEntityType);
    }
    unset($this->DeferredEntityTypes);

    foreach ($this->DeferredNamespaces as $prefix => [$uri, $namespace, $helper]) {
        $this->registerNamespace($prefix, $uri, $namespace, $helper);
    }
    unset($this->DeferredNamespaces);

    return $this->reload();
}
1216:
/**
 * Load namespaces from the database, longest PHP namespace first
 *
 * Replaces the store's prefix => PHP namespace and prefix => namespace URI
 * maps with the content of `_sync_entity_namespace`.
 *
 * @return $this
 */
private function reload()
{
    // NOTE(review): the value returned is not used; the call is retained in
    // case db() does more than return the connection -- confirm in
    // AbstractStore and remove if not
    $this->db();

    $select = $this->prepare(<<<SQL
        SELECT
        entity_namespace_prefix,
        base_uri,
        php_namespace
        FROM
        _sync_entity_namespace
        ORDER BY
        LENGTH(php_namespace) DESC;
        SQL);
    $result = $this->execute($select);

    $this->NamespacesByPrefix = [];
    $this->NamespaceUrisByPrefix = [];
    for (;;) {
        $row = $result->fetchArray(\SQLITE3_NUM);
        if ($row === false) {
            break;
        }
        /** @var array{string,string,string} $row */
        [$prefix, $uri, $phpNamespace] = $row;
        $this->NamespacesByPrefix[$prefix] = $phpNamespace;
        $this->NamespaceUrisByPrefix[$prefix] = $uri;
    }

    $result->finalize();
    $select->close();

    return $this;
}
1247:
/**
 * Close the store when it is destroyed
 *
 * The exit status reported by the error handler is used if it is loaded and
 * shutting down, otherwise an exit status of -1 is used.
 *
 * @internal
 */
public function __destruct()
{
    $exitStatus = Err::isLoaded() && Err::isShuttingDown()
        ? Err::getExitStatus()
        : -1;

    $this->close($exitStatus);
}
1259: }
1260: