1: <?php declare(strict_types=1);
2:
3: namespace Salient\Sync;
4:
5: use Salient\Contract\Core\Exception\MethodNotImplementedExceptionInterface;
6: use Salient\Contract\Core\MessageLevel as Level;
7: use Salient\Contract\Sync\Exception\UnreachableBackendExceptionInterface;
8: use Salient\Contract\Sync\DeferralPolicy;
9: use Salient\Contract\Sync\DeferredEntityInterface;
10: use Salient\Contract\Sync\DeferredRelationshipInterface;
11: use Salient\Contract\Sync\ErrorType;
12: use Salient\Contract\Sync\HydrationPolicy;
13: use Salient\Contract\Sync\SyncEntityInterface;
14: use Salient\Contract\Sync\SyncErrorCollectionInterface;
15: use Salient\Contract\Sync\SyncErrorInterface;
16: use Salient\Contract\Sync\SyncNamespaceHelperInterface;
17: use Salient\Contract\Sync\SyncProviderInterface;
18: use Salient\Contract\Sync\SyncStoreInterface;
19: use Salient\Core\Facade\Console;
20: use Salient\Core\Facade\Err;
21: use Salient\Core\Facade\Event;
22: use Salient\Core\AbstractStore;
23: use Salient\Sync\Event\SyncStoreLoadedEvent;
24: use Salient\Sync\Exception\HeartbeatCheckFailedException;
25: use Salient\Sync\Exception\SyncStoreException;
26: use Salient\Sync\Support\SyncErrorCollection;
27: use Salient\Utility\Arr;
28: use Salient\Utility\Get;
29: use Salient\Utility\Json;
30: use Salient\Utility\Regex;
31: use Salient\Utility\Str;
32: use Generator;
33: use InvalidArgumentException;
34: use LogicException;
35: use ReflectionClass;
36:
37: /**
38: * Tracks the state of entities synced to and from third-party backends in a
39: * local SQLite database
40: *
41: * Creating a {@see SyncStore} instance starts a sync operation run that must be
42: * terminated by calling {@see SyncStore::close()}, otherwise a failed run is
43: * recorded.
44: */
45: final class SyncStore extends AbstractStore implements SyncStoreInterface
46: {
47: private ?int $RunId = null;
48: private string $RunUuid;
49:
50: /**
51: * Prefix => true
52: *
53: * @var array<string,true>
54: */
55: private array $Namespaces = [];
56:
57: /**
58: * Provider ID => provider
59: *
60: * @var array<int,SyncProviderInterface>
61: */
62: private array $Providers = [];
63:
64: /**
65: * Provider hash => provider ID
66: *
67: * @var array<string,int>
68: */
69: private array $ProviderMap = [];
70:
71: /**
72: * Entity type ID => entity type
73: *
74: * @var array<int,class-string<SyncEntityInterface>>
75: */
76: private array $EntityTypes = [];
77:
78: /**
79: * Entity type => entity type ID
80: *
81: * @var array<class-string<SyncEntityInterface>,int>
82: */
83: private array $EntityTypeMap = [];
84:
85: /**
86: * Prefix => PHP namespace with trailing "\"
87: *
88: * @var array<string,string>
89: */
90: private array $NamespacesByPrefix;
91:
92: /**
93: * Prefix => namespace URI with trailing "/"
94: *
95: * @var array<string,string>
96: */
97: private array $NamespaceUrisByPrefix;
98:
99: /**
100: * Prefix => namespace helper
101: *
102: * @var array<string,SyncNamespaceHelperInterface>
103: */
104: private array $NamespaceHelpersByPrefix;
105:
106: /**
107: * Provider ID => entity type ID => entity ID => entity
108: *
109: * @var array<int,array<int,array<int|string,SyncEntityInterface>>>
110: */
111: private array $Entities;
112:
113: /**
114: * SPL object ID => checkpoint
115: *
116: * @var array<int,int>
117: */
118: private array $EntityCheckpoints;
119:
120: /**
121: * Provider ID => entity type ID => entity ID => [ deferred entity ]
122: *
123: * @var array<int,array<int,array<int|string,DeferredEntityInterface<SyncEntityInterface>[]>>>
124: */
125: private array $DeferredEntities = [];
126:
127: /**
128: * Provider ID => entity type ID => requesting entity type ID => requesting
129: * entity property => requesting entity ID => [ deferred relationship ]
130: *
131: * @var array<int,array<int,array<int,array<string,array<int|string,DeferredRelationshipInterface<SyncEntityInterface>[]>>>>>
132: */
133: private array $DeferredRelationships = [];
134:
135: private SyncErrorCollection $Errors;
136: private int $DeferralCheckpoint = 0;
137: private string $Command;
138: /** @var string[] */
139: private array $Arguments;
140: /** @var array<string,array{string,string,SyncNamespaceHelperInterface|null}> */
141: private array $DeferredNamespaces = [];
142: /** @var SyncProviderInterface[] */
143: private array $DeferredProviders = [];
144: /** @var class-string<SyncEntityInterface>[] */
145: private array $DeferredEntityTypes = [];
146:
/**
 * Creates a new SyncStore object
 *
 * Keeps the given command and arguments for later use, opens the database
 * with the schema below (each table is only created if it does not already
 * exist) and dispatches a {@see SyncStoreLoadedEvent}. This method does not
 * write to `_sync_run`.
 *
 * @param string $filename Passed to `openDb()`.
 * @param string $command The canonical name of the command performing sync
 * operations (e.g. a qualified class and/or method name).
 * @param string[] $arguments Arguments passed to the command.
 */
public function __construct(
    string $filename = ':memory:',
    string $command = '',
    array $arguments = []
) {
    $this->assertCanUpsert();

    $this->Errors = new SyncErrorCollection();
    $this->Command = $command;
    $this->Arguments = $arguments;

    $schema = <<<SQL
    CREATE TABLE IF NOT EXISTS
    _sync_run (
    run_id INTEGER NOT NULL PRIMARY KEY,
    run_uuid BLOB NOT NULL UNIQUE,
    run_command TEXT NOT NULL,
    run_arguments_json TEXT NOT NULL,
    started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    finished_at DATETIME,
    exit_status INTEGER,
    error_count INTEGER,
    warning_count INTEGER,
    errors_json TEXT
    );

    CREATE TABLE IF NOT EXISTS
    _sync_provider (
    provider_id INTEGER NOT NULL PRIMARY KEY,
    provider_hash BLOB NOT NULL UNIQUE,
    provider_class TEXT NOT NULL,
    added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE IF NOT EXISTS
    _sync_entity_type (
    entity_type_id INTEGER NOT NULL PRIMARY KEY,
    entity_type_class TEXT NOT NULL UNIQUE,
    added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE IF NOT EXISTS
    _sync_entity_type_state (
    provider_id INTEGER NOT NULL,
    entity_type_id INTEGER NOT NULL,
    last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
    last_sync DATETIME,
    PRIMARY KEY (provider_id, entity_type_id),
    FOREIGN KEY (provider_id) REFERENCES _sync_provider,
    FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type
    );

    CREATE TABLE IF NOT EXISTS
    _sync_entity (
    provider_id INTEGER NOT NULL,
    entity_type_id INTEGER NOT NULL,
    entity_id TEXT NOT NULL,
    canonical_id TEXT,
    is_dirty INTEGER NOT NULL DEFAULT 0,
    is_deleted INTEGER NOT NULL DEFAULT 0,
    added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
    last_sync DATETIME,
    entity_json TEXT NOT NULL,
    PRIMARY KEY (provider_id, entity_type_id, entity_id),
    FOREIGN KEY (provider_id) REFERENCES _sync_provider,
    FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type
    ) WITHOUT ROWID;

    CREATE TABLE IF NOT EXISTS
    _sync_entity_namespace (
    entity_namespace_id INTEGER NOT NULL PRIMARY KEY,
    entity_namespace_prefix TEXT NOT NULL UNIQUE,
    base_uri TEXT NOT NULL,
    php_namespace TEXT NOT NULL,
    added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
    );

    SQL;

    $this->openDb($filename, $schema);

    Event::dispatch(new SyncStoreLoadedEvent($this));
}
242:
/**
 * Terminate the current run and close the database
 *
 * If the database is open and a run has started, the run's `_sync_run` row
 * is updated with a finish time, the given exit status, error and warning
 * counts, and the JSON-encoded errors. The database is then closed in every
 * case.
 *
 * @param int $exitStatus Recorded in the `exit_status` column of the run.
 */
public function close(int $exitStatus = 0): void
{
    // A run is never started here: only a run that already exists on an
    // open database is finalised
    if ($this->isOpen() && $this->RunId !== null) {
        $update = <<<SQL
        UPDATE
        _sync_run
        SET
        finished_at = CURRENT_TIMESTAMP,
        exit_status = :exit_status,
        error_count = :error_count,
        warning_count = :warning_count,
        errors_json = :errors_json
        WHERE
        run_uuid = :run_uuid;
        SQL;

        $statement = $this->prepare($update);
        $statement->bindValue(':exit_status', $exitStatus, \SQLITE3_INTEGER);
        $statement->bindValue(':run_uuid', $this->RunUuid, \SQLITE3_BLOB);
        $statement->bindValue(':error_count', $this->Errors->getErrorCount(), \SQLITE3_INTEGER);
        $statement->bindValue(':warning_count', $this->Errors->getWarningCount(), \SQLITE3_INTEGER);
        $statement->bindValue(':errors_json', Json::stringify($this->Errors), \SQLITE3_TEXT);
        $statement->execute();
        $statement->close();
    }

    $this->closeDb();
}
278:
/**
 * Check if a run ID has been assigned to the store
 *
 * @phpstan-assert-if-true !null $this->RunId
 */
public function runHasStarted(): bool
{
    // `$RunId` is `?int`, so "is an integer" and "is not null" are the same
    // test
    return is_int($this->RunId);
}
286:
/**
 * Get the ID of the current run
 *
 * Throws a `LogicException` (via `assertRunHasStarted()`) if no run has
 * started.
 *
 * @inheritDoc
 */
public function getRunId(): int
{
    $this->assertRunHasStarted();

    $runId = $this->RunId;

    return $runId;
}
296:
/**
 * Get the UUID of the current run after passing its binary form to
 * `Get::uuid()`
 *
 * Throws a `LogicException` if no run has started.
 *
 * @inheritDoc
 */
public function getRunUuid(): string
{
    // getBinaryRunUuid() performs the "run has started" assertion
    $binary = $this->getBinaryRunUuid();

    return Get::uuid($binary);
}
306:
/**
 * Get the UUID of the current run exactly as it is stored by the instance
 *
 * Throws a `LogicException` (via `assertRunHasStarted()`) if no run has
 * started.
 *
 * @inheritDoc
 */
public function getBinaryRunUuid(): string
{
    $this->assertRunHasStarted();

    $uuid = $this->RunUuid;

    return $uuid;
}
316:
/**
 * Throw an exception if no run ID has been assigned
 *
 * @throws LogicException if `$RunId` is `null`.
 *
 * @phpstan-assert !null $this->RunId
 */
private function assertRunHasStarted(): void
{
    if ($this->RunId !== null) {
        return;
    }

    throw new LogicException('Run has not started');
}
326:
/**
 * Register a provider with the store
 *
 * Before a run starts, the provider is only queued. Once a run has started,
 * the provider's signature is upserted into `_sync_provider` and the
 * resulting provider ID is mapped to the instance.
 *
 * @throws LogicException if a provider with the same signature has already
 * been registered during the run.
 * @throws SyncStoreException if the provider ID cannot be read back.
 *
 * @inheritDoc
 */
public function registerProvider(SyncProviderInterface $provider)
{
    if (!$this->runHasStarted()) {
        // Don't start a run just to register a provider
        $this->DeferredProviders[] = $provider;
        return $this;
    }

    $providerClass = get_class($provider);
    $signature = $this->getProviderSignature($provider);

    if (isset($this->ProviderMap[$signature])) {
        throw new LogicException(sprintf(
            'Provider already registered: %s',
            $providerClass,
        ));
    }

    // Insert the provider, or refresh `last_seen` if its hash is already in
    // the database
    $upsert = <<<SQL
    INSERT INTO
    _sync_provider (provider_hash, provider_class)
    VALUES
    (:provider_hash, :provider_class) ON CONFLICT (provider_hash) DO
    UPDATE
    SET
    last_seen = CURRENT_TIMESTAMP;
    SQL;
    $statement = $this->prepare($upsert);
    $statement->bindValue(':provider_hash', $signature, \SQLITE3_BLOB);
    $statement->bindValue(':provider_class', $providerClass, \SQLITE3_TEXT);
    $statement->execute();
    $statement->close();

    // Read back the ID assigned to the hash
    $select = <<<SQL
    SELECT
    provider_id
    FROM
    _sync_provider
    WHERE
    provider_hash = :provider_hash;
    SQL;
    $statement = $this->prepare($select);
    $statement->bindValue(':provider_hash', $signature, \SQLITE3_BLOB);
    $rows = $this->execute($statement);
    /** @var array{int}|false */
    $found = $rows->fetchArray(\SQLITE3_NUM);
    $statement->close();

    if ($found === false) {
        // @codeCoverageIgnoreStart
        throw new SyncStoreException('Error retrieving provider ID');
        // @codeCoverageIgnoreEnd
    }

    [$providerId] = $found;
    $this->Providers[$providerId] = $provider;
    $this->ProviderMap[$signature] = $providerId;

    return $this;
}
391:
/**
 * Check if a provider is registered with, or queued for registration with,
 * the store
 *
 * A provider instance is matched by identity; any other value is treated as
 * a provider signature.
 *
 * @inheritDoc
 */
public function hasProvider($provider): bool
{
    $started = $this->runHasStarted();

    if ($provider instanceof SyncProviderInterface) {
        return in_array(
            $provider,
            $started ? $this->Providers : $this->DeferredProviders,
            true
        );
    }

    if ($started) {
        return isset($this->ProviderMap[$provider]);
    }

    // No run yet: compare the signature of each queued provider
    foreach ($this->DeferredProviders as $queued) {
        if ($this->getProviderSignature($queued) === $provider) {
            return true;
        }
    }

    return false;
}
415:
/**
 * Get the provider ID of a registered provider, starting a run if necessary
 *
 * Accepts a provider instance or a provider signature.
 *
 * @throws LogicException if the provider is not registered.
 *
 * @inheritDoc
 */
public function getProviderId($provider): int
{
    if (!$this->runHasStarted()) {
        $this->check();
    }

    $isInstance = $provider instanceof SyncProviderInterface;
    $signature = $isInstance
        ? $this->getProviderSignature($provider)
        : $provider;

    if (isset($this->ProviderMap[$signature])) {
        return $this->ProviderMap[$signature];
    }

    // Only an instance can be named in the message
    $message = 'Provider not registered';
    if ($isInstance) {
        $message .= sprintf(': %s', get_class($provider));
    }

    throw new LogicException($message);
}
437:
/**
 * Get a registered provider by provider ID or signature
 *
 * An integer is treated as a provider ID, which is only valid once a run
 * has started. Any other value is treated as a signature and, before a run
 * starts, is compared with the signature of each queued provider.
 *
 * @throws LogicException if the provider cannot be found, or if a provider
 * ID is given before a run has started.
 *
 * @inheritDoc
 */
public function getProvider($provider): SyncProviderInterface
{
    $started = $this->runHasStarted();

    if (is_int($provider)) {
        if (!$started) {
            throw new LogicException(sprintf(
                'Provider ID not issued during run: %d',
                $provider,
            ));
        }

        $registered = $this->Providers[$provider] ?? null;
        if ($registered === null) {
            throw new LogicException(sprintf(
                'Provider not registered: #%d',
                $provider,
            ));
        }

        return $registered;
    }

    if ($started) {
        $providerId = $this->ProviderMap[$provider] ?? null;
        if ($providerId === null) {
            throw new LogicException('Provider not registered');
        }

        return $this->Providers[$providerId];
    }

    // Don't start a run just to get a provider
    foreach ($this->DeferredProviders as $queued) {
        if ($this->getProviderSignature($queued) === $provider) {
            return $queued;
        }
    }

    throw new LogicException('Provider not registered');
}
475:
/**
 * Get the signature of a provider
 *
 * The signature is `Get::binaryHash()` of the provider's class name and
 * the values returned by its `getBackendIdentifier()` method, joined with
 * NUL bytes.
 *
 * @inheritDoc
 */
public function getProviderSignature(SyncProviderInterface $provider): string
{
    $components = [
        get_class($provider),
        ...$provider->getBackendIdentifier(),
    ];
    $payload = implode("\0", $components);

    return Get::binaryHash($payload);
}
487:
/**
 * Register an entity type with the store
 *
 * Before a run starts, the entity type is only queued. Once a run has
 * started, an entity type that is already mapped is ignored; otherwise the
 * class is validated, upserted into `_sync_entity_type` and mapped to the
 * resulting entity type ID.
 *
 * @throws LogicException if `$entityType` is not spelled exactly as the
 * class is declared, or does not implement {@see SyncEntityInterface}.
 * @throws SyncStoreException if the entity type ID cannot be read back.
 *
 * @inheritDoc
 */
public function registerEntityType(string $entityType)
{
    if (!$this->runHasStarted()) {
        // Don't start a run just to register an entity type
        $this->DeferredEntityTypes[] = $entityType;
        return $this;
    }

    if (isset($this->EntityTypeMap[$entityType])) {
        return $this;
    }

    $reflection = new ReflectionClass($entityType);
    $declaredName = $reflection->getName();

    if ($declaredName !== $entityType) {
        throw new LogicException(sprintf(
            'Not an exact match for declared class (%s expected): %s',
            $declaredName,
            $entityType,
        ));
    }

    if (!$reflection->implementsInterface(SyncEntityInterface::class)) {
        // @codeCoverageIgnoreStart
        throw new LogicException(sprintf(
            'Does not implement %s: %s',
            SyncEntityInterface::class,
            $entityType,
        ));
        // @codeCoverageIgnoreEnd
    }

    // Insert the entity type, or refresh `last_seen` if it is already in
    // the database
    $upsert = <<<SQL
    INSERT INTO
    _sync_entity_type (entity_type_class)
    VALUES
    (:entity_type_class) ON CONFLICT (entity_type_class) DO
    UPDATE
    SET
    last_seen = CURRENT_TIMESTAMP;
    SQL;
    $statement = $this->prepare($upsert);
    $statement->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT);
    $statement->execute();
    $statement->close();

    // Read back the ID assigned to the class
    $select = <<<SQL
    SELECT
    entity_type_id
    FROM
    _sync_entity_type
    WHERE
    entity_type_class = :entity_type_class;
    SQL;
    $statement = $this->prepare($select);
    $statement->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT);
    $rows = $this->execute($statement);
    /** @var array{int}|false */
    $found = $rows->fetchArray(\SQLITE3_NUM);
    $statement->close();

    if ($found === false) {
        throw new SyncStoreException('Error retrieving entity type ID');
    }

    [$entityTypeId] = $found;
    $this->EntityTypes[$entityTypeId] = $entityType;
    $this->EntityTypeMap[$entityType] = $entityTypeId;

    return $this;
}
562:
/**
 * Check if an entity type is registered with, or queued for registration
 * with, the store
 *
 * @inheritDoc
 */
public function hasEntityType(string $entityType): bool
{
    return $this->runHasStarted()
        ? isset($this->EntityTypeMap[$entityType])
        : in_array($entityType, $this->DeferredEntityTypes, true);
}
574:
/**
 * Get the entity type ID of a registered entity type, starting a run if
 * necessary
 *
 * @throws LogicException if the entity type is not registered.
 *
 * @inheritDoc
 */
public function getEntityTypeId(string $entityType): int
{
    if (!$this->runHasStarted()) {
        $this->check();
    }

    if (isset($this->EntityTypeMap[$entityType])) {
        return $this->EntityTypeMap[$entityType];
    }

    throw new LogicException(sprintf(
        'Entity not registered: %s',
        $entityType,
    ));
}
593:
/**
 * Get a registered entity type by entity type ID
 *
 * @throws LogicException if no run has started, or if the entity type ID
 * is not mapped to an entity type.
 *
 * @inheritDoc
 */
public function getEntityType(int $entityTypeId): string
{
    if (!$this->runHasStarted()) {
        throw new LogicException(sprintf(
            'Entity type ID not issued during run: %d',
            $entityTypeId,
        ));
    }

    $entityType = $this->EntityTypes[$entityTypeId] ?? null;
    if ($entityType === null) {
        throw new LogicException(sprintf(
            'Entity type not registered: #%d',
            $entityTypeId,
        ));
    }

    return $entityType;
}
613:
/**
 * Register a namespace for sync entities and their provider interfaces
 *
 * The prefix is lower-cased. Unless the namespace was previously queued
 * under the same prefix, the prefix is validated, `$uri` is given exactly
 * one trailing "/" and `$namespace` is stripped of leading backslashes and
 * given exactly one trailing backslash. Before a run starts, the namespace
 * is only queued; otherwise it is upserted into `_sync_entity_namespace`.
 *
 * @throws LogicException if the prefix is already registered or queued.
 * @throws InvalidArgumentException if the prefix is invalid.
 *
 * @inheritDoc
 */
public function registerNamespace(
    string $prefix,
    string $uri,
    string $namespace,
    ?SyncNamespaceHelperInterface $helper = null
) {
    $prefix = Str::lower($prefix);
    $started = $this->runHasStarted();

    $isRegistered = isset($this->Namespaces[$prefix]);
    if ($isRegistered || (
        !$started
        && isset($this->DeferredNamespaces[$prefix])
    )) {
        throw new LogicException(sprintf(
            'Prefix already registered: %s',
            $prefix,
        ));
    }

    // Namespaces are validated and normalised before they are queued, so
    // `classToNamespace()` can be used without starting a run. An entry in
    // `$DeferredNamespaces` means this has already been done.
    $alreadyNormalised = isset($this->DeferredNamespaces)
        && isset($this->DeferredNamespaces[$prefix]);
    if (!$alreadyNormalised) {
        if (!Regex::match('/^[a-z][-a-z0-9+.]*$/iD', $prefix)) {
            throw new InvalidArgumentException(sprintf(
                'Invalid prefix: %s',
                $prefix,
            ));
        }
        $uri = rtrim($uri, '/') . '/';
        $namespace = trim($namespace, '\\') . '\\';
    }

    if (!$started) {
        // Don't start a run just to register a namespace
        $this->DeferredNamespaces[$prefix] = [$uri, $namespace, $helper];
        return $this;
    }

    // Insert the namespace, or update it and refresh `last_seen` if the
    // prefix is already in the database
    $upsert = <<<SQL
    INSERT INTO
    _sync_entity_namespace (entity_namespace_prefix, base_uri, php_namespace)
    VALUES
    (
    :entity_namespace_prefix,
    :base_uri,
    :php_namespace
    ) ON CONFLICT (entity_namespace_prefix) DO
    UPDATE
    SET
    base_uri = excluded.base_uri,
    php_namespace = excluded.php_namespace,
    last_seen = CURRENT_TIMESTAMP;
    SQL;
    $statement = $this->prepare($upsert);
    $statement->bindValue(':entity_namespace_prefix', $prefix, \SQLITE3_TEXT);
    $statement->bindValue(':base_uri', $uri, \SQLITE3_TEXT);
    $statement->bindValue(':php_namespace', $namespace, \SQLITE3_TEXT);
    $statement->execute();
    $statement->close();

    $this->Namespaces[$prefix] = true;

    if ($helper) {
        $this->NamespaceHelpersByPrefix[$prefix] = $helper;
    }

    // Namespaces are not reloaded until they have been loaded once, i.e.
    // not while bootstrapping
    return isset($this->NamespacesByPrefix)
        ? $this->reload()
        : $this;
}
693:
/**
 * Get the canonical URI of an entity type
 *
 * If the entity type belongs to a registered namespace, the remainder of
 * its name (with "/" in place of each backslash) is appended to the
 * namespace prefix and ":" when `$compact` is `true`, or to the namespace
 * URI otherwise. If not, the result of `SyncUtil::getEntityTypeUri()` is
 * returned.
 *
 * @inheritDoc
 */
public function getEntityTypeUri(string $entityType, bool $compact = true): string
{
    $prefix = $this->classToNamespace($entityType, $uri, $namespace);

    if ($prefix !== null) {
        /** @var string $namespace */
        $relative = substr(ltrim($entityType, '\\'), strlen($namespace));
        $path = str_replace('\\', '/', $relative);

        if ($compact) {
            return "{$prefix}:{$path}";
        }

        return "{$uri}{$path}";
    }

    return SyncUtil::getEntityTypeUri($entityType, $compact);
}
710:
/**
 * Get the prefix of the registered namespace a class belongs to, or null if
 * there is no such namespace
 *
 * @inheritDoc
 */
public function getNamespacePrefix(string $class): ?string
{
    $prefix = $this->classToNamespace($class);

    return $prefix;
}
718:
/**
 * Get the helper of the registered namespace a class belongs to, or null
 * if there is no such namespace or it has no helper
 *
 * @inheritDoc
 */
public function getNamespaceHelper(string $class): ?SyncNamespaceHelperInterface
{
    $prefix = $this->classToNamespace($class, $uri, $namespace, $helper);

    return $prefix === null
        ? null
        : $helper;
}
735:
/**
 * Resolve a class to the prefix of the registered namespace it belongs to
 *
 * The class name is compared case-insensitively with each namespace, and
 * the first namespace it starts with wins. Namespaces are always checked
 * from longest to shortest, so a class in a nested namespace resolves to
 * the same (most specific) prefix whether or not a run has started:
 *
 * - after a run has started, `reload()` has already ordered namespaces by
 *   length
 * - before a run has started, a copy of the queued namespaces is sorted
 *   here (the queue itself is not modified)
 *
 * `$uri`, `$namespace` and `$helper` are only assigned when a namespace
 * matches. `$namespace` receives the lower-cased namespace.
 *
 * @param class-string<SyncEntityInterface|SyncProviderInterface> $class
 * @param-out SyncNamespaceHelperInterface|null $helper
 * @return string|null The prefix of the matching namespace, or `null` if
 * the class does not belong to a registered namespace.
 */
private function classToNamespace(
    string $class,
    ?string &$uri = null,
    ?string &$namespace = null,
    ?SyncNamespaceHelperInterface &$helper = null
): ?string {
    $class = Str::lower(ltrim($class, '\\'));

    if ($this->runHasStarted()) {
        $namespaces = $this->getNamespaces();
    } else {
        // Don't start a run just to resolve a class to a namespace
        $namespaces = $this->DeferredNamespaces;

        // Queued namespaces are in registration order, so apply the same
        // longest-first ordering used when namespaces are loaded from the
        // database
        uasort(
            $namespaces,
            static function (array $a, array $b): int {
                return strlen($b[1]) <=> strlen($a[1]);
            }
        );
    }

    foreach ($namespaces as $prefix => [$_uri, $_namespace, $_helper]) {
        $_namespace = Str::lower($_namespace);
        if (strpos($class, $_namespace) === 0) {
            $uri = $_uri;
            $namespace = $_namespace;
            $helper = $_helper;
            return $prefix;
        }
    }

    return null;
}
762:
/**
 * Iterate over loaded namespaces in the order they appear in
 * `$NamespacesByPrefix`
 *
 * Each prefix is yielded with a list comprising the namespace URI, the PHP
 * namespace and the namespace helper (or `null` if there is no helper for
 * the prefix).
 *
 * @return Generator<string,array{string,string,SyncNamespaceHelperInterface|null}>
 */
private function getNamespaces(): Generator
{
    foreach ($this->NamespacesByPrefix as $prefix => $phpNamespace) {
        $uri = $this->NamespaceUrisByPrefix[$prefix];
        $helper = $this->NamespaceHelpersByPrefix[$prefix] ?? null;

        yield $prefix => [$uri, $phpNamespace, $helper];
    }
}
776:
/**
 * Register an entity with the store
 *
 * The entity is assigned the next deferral checkpoint, and any deferred
 * entities queued for the same provider, entity type and entity ID are
 * replaced with it and removed from the queue.
 *
 * @throws LogicException if an entity is already registered for the same
 * provider, entity type and entity ID.
 *
 * @inheritDoc
 */
public function setEntity(
    int $providerId,
    string $entityType,
    $entityId,
    SyncEntityInterface $entity
) {
    $typeId = $this->EntityTypeMap[$entityType];

    if (isset($this->Entities[$providerId][$typeId][$entityId])) {
        throw new LogicException('Entity already registered');
    }

    $this->Entities[$providerId][$typeId][$entityId] = $entity;
    $this->EntityCheckpoints[spl_object_id($entity)] = $this->DeferralCheckpoint++;

    // Resolve the entity's entries in the deferred entity queue (if any)
    $queue = $this->DeferredEntities[$providerId][$typeId][$entityId] ?? [];
    foreach ($queue as $checkpoint => $placeholder) {
        $placeholder->replace($entity);
        unset($this->DeferredEntities[$providerId][$typeId][$entityId][$checkpoint]);
    }
    if ($queue) {
        unset($this->DeferredEntities[$providerId][$typeId][$entityId]);
    }

    return $this;
}
805:
/**
 * Get an entity previously registered with the store, or null if there is
 * no such entity
 *
 * Only entities held in memory by this instance are returned. The value of
 * `$offline` does not currently change the result.
 *
 * @template TEntity of SyncEntityInterface
 *
 * @param class-string<TEntity> $entityType
 * @return TEntity|null
 */
public function getEntity(
    int $providerId,
    string $entityType,
    $entityId,
    ?bool $offline = null
): ?SyncEntityInterface {
    $typeId = $this->EntityTypeMap[$entityType];

    /** @var TEntity|null */
    $registered = $this->Entities[$providerId][$typeId][$entityId] ?? null;

    return $registered;
}
826:
/**
 * Register a deferred entity with the store
 *
 * If the entity it stands in for is already registered, the deferred
 * entity is replaced immediately. Otherwise, it is queued at the next
 * deferral checkpoint and, if the applicable deferral policy is
 * `RESOLVE_EARLY`, resolved straight away.
 *
 * @template TEntity of SyncEntityInterface
 *
 * @param class-string<TEntity> $entityType
 * @param DeferredEntityInterface<TEntity> $entity
 * @return $this
 */
public function deferEntity(
    int $providerId,
    string $entityType,
    $entityId,
    DeferredEntityInterface $entity
) {
    $typeId = $this->EntityTypeMap[$entityType];

    /** @var TEntity|null */
    $registered = $this->Entities[$providerId][$typeId][$entityId] ?? null;
    if ($registered) {
        $entity->replace($registered);
        return $this;
    }

    // Use the deferral policy of the context the entity was deferred in,
    // preferring the context of that context's last entity (if any)
    $context = $entity->getContext();
    $last = $context
        ? $context->getLastEntity()
        : null;
    if ($last) {
        $context = $last->getContext();
    }
    $policy = $context
        ? $context->getDeferralPolicy()
        : null;

    // Entities are queued even in `RESOLVE_EARLY` mode because
    // `$this->setEntity()` only calls `DeferredEntityInterface::replace()`
    // on queued instances
    $checkpoint = $this->DeferralCheckpoint++;
    $this->DeferredEntities[$providerId][$typeId][$entityId][$checkpoint] = $entity;

    if ($policy === DeferralPolicy::RESOLVE_EARLY) {
        $entity->resolve();
    }

    return $this;
}
875:
/**
 * Register a deferred relationship with the store
 *
 * An (initially empty) queue is created for the provider, entity type and
 * requesting entity type, property and ID. Depending on the applicable
 * hydration policy, the relationship is then left alone (`LAZY`), resolved
 * immediately without being queued (`EAGER`), or queued at the next
 * deferral checkpoint (any other policy).
 *
 * @throws LogicException if a queue already exists for the same
 * combination of arguments.
 *
 * @inheritDoc
 */
public function deferRelationship(
    int $providerId,
    string $entityType,
    string $forEntityType,
    string $forEntityProperty,
    $forEntityId,
    DeferredRelationshipInterface $relationship
) {
    $typeId = $this->EntityTypeMap[$entityType];
    $forTypeId = $this->EntityTypeMap[$forEntityType];

    /** @var DeferredRelationshipInterface<SyncEntityInterface>[]|null */
    $queue = &$this->DeferredRelationships[$providerId][$typeId][
        $forTypeId
    ][$forEntityProperty][$forEntityId];

    if (isset($queue)) {
        throw new LogicException('Relationship already registered');
    }

    $queue = [];

    // Use the hydration policy of the context the deferral was created in,
    // preferring the context of that context's last entity (if any)
    $context = $relationship->getContext();
    $last = $context
        ? $context->getLastEntity()
        : null;
    if ($last) {
        $context = $last->getContext();
    }
    $policy = $context
        ? $context->getHydrationPolicy($entityType)
        : 0;

    if ($policy === HydrationPolicy::EAGER) {
        $relationship->resolve();
    } elseif ($policy !== HydrationPolicy::LAZY) {
        $queue[$this->DeferralCheckpoint++] = $relationship;
    }

    return $this;
}
926:
/**
 * Resolve deferred relationships and entities until none remain
 *
 * Returns the distinct entities resolved along the way. Entities delivered
 * by a relationship are left out if they were registered before this
 * method was called.
 *
 * @inheritDoc
 */
public function resolveDeferrals(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?int $providerId = null
): array {
    $startedAt = $this->DeferralCheckpoint;
    $resolved = [];

    while (true) {
        // Resolve relationships first because they typically deliver
        // multiple entities per round trip, some of which may be in the
        // deferred entity queue
        $relationships = $this->resolveDeferredRelationships($fromCheckpoint, $entityType, null, null, $providerId);
        if ($relationships) {
            foreach ($relationships as $entities) {
                foreach ($entities as $entity) {
                    $objectId = spl_object_id($entity);
                    if ($this->EntityCheckpoints[$objectId] >= $startedAt) {
                        $resolved[$objectId] = $entity;
                    }
                }
            }
            continue;
        }

        $entities = $this->resolveDeferredEntities($fromCheckpoint, $entityType, $providerId);
        if (!$entities) {
            // Nothing was resolved in this pass, so nothing is left to do
            break;
        }
        foreach ($entities as $entity) {
            $resolved[spl_object_id($entity)] = $entity;
        }
    }

    return array_values($resolved);
}
965:
/**
 * Get the value that will be assigned to the next registered entity or
 * queued deferral
 *
 * @inheritDoc
 */
public function getDeferralCheckpoint(): int
{
    $next = $this->DeferralCheckpoint;

    return $next;
}
973:
/**
 * Resolve deferred entities in the queue, optionally limited to a
 * provider, an entity type and/or deferrals queued at or after a
 * checkpoint
 *
 * For each entity ID with deferrals that satisfy the given criteria, the
 * first such deferral is resolved and its result is added to the returned
 * list.
 *
 * @inheritDoc
 */
public function resolveDeferredEntities(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?int $providerId = null
): array {
    $typeId = $entityType === null
        ? null
        : $this->EntityTypeMap[$entityType];

    $resolved = [];
    foreach ($this->DeferredEntities as $provId => $queuesByTypeId) {
        if ($providerId !== null && $provId !== $providerId) {
            continue;
        }
        foreach ($queuesByTypeId as $entTypeId => $queues) {
            if ($typeId !== null && $entTypeId !== $typeId) {
                continue;
            }

            if ($fromCheckpoint !== null) {
                // Work on the local copy of the queues: drop deferrals
                // queued before the checkpoint, then any entity ID left
                // with none
                foreach ($queues as $entityId => $queue) {
                    foreach (array_keys($queue) as $checkpoint) {
                        if ($checkpoint < $fromCheckpoint) {
                            unset($queues[$entityId][$checkpoint]);
                        }
                    }
                    if ($queue && !$queues[$entityId]) {
                        unset($queues[$entityId]);
                    }
                }
                if (!$queues) {
                    continue;
                }
            }

            /** @var array<int|string,non-empty-array<DeferredEntityInterface<SyncEntityInterface>>> $queues */
            foreach ($queues as $queue) {
                $first = reset($queue);
                $resolved[] = $first->resolve();
            }
        }
    }

    return $resolved;
}
1024:
/**
 * Resolve deferred relationships in the queue, optionally limited to a
 * provider, an entity type, a requesting entity type and property, and/or
 * deferrals queued at or after a checkpoint
 *
 * Each relationship that satisfies the given criteria is resolved, its
 * result is added to the returned list, and it is removed from the queue.
 *
 * @inheritDoc
 */
public function resolveDeferredRelationships(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?string $forEntityType = null,
    ?string $forEntityProperty = null,
    ?int $providerId = null
): array {
    $typeId = $entityType === null
        ? null
        : $this->EntityTypeMap[$entityType];
    $forTypeId = $forEntityType === null
        ? null
        : $this->EntityTypeMap[$forEntityType];

    // Keeps deferrals queued at or after the checkpoint
    $isCurrent = static function ($checkpoint) use ($fromCheckpoint): bool {
        return $checkpoint >= $fromCheckpoint;
    };

    $resolved = [];
    foreach ($this->DeferredRelationships as $provId => $byTypeId) {
        if ($providerId !== null && $provId !== $providerId) {
            continue;
        }
        foreach ($byTypeId as $entTypeId => $byForTypeId) {
            if ($typeId !== null && $entTypeId !== $typeId) {
                continue;
            }
            foreach ($byForTypeId as $forEntTypeId => $byForProperty) {
                if ($forTypeId !== null && $forEntTypeId !== $forTypeId) {
                    continue;
                }
                foreach ($byForProperty as $forEntProp => $byForId) {
                    if ($forEntityProperty !== null && $forEntProp !== $forEntityProperty) {
                        continue;
                    }
                    foreach ($byForId as $forEntId => $queue) {
                        if ($fromCheckpoint !== null) {
                            $queue = array_filter($queue, $isCurrent, \ARRAY_FILTER_USE_KEY);
                            if (!$queue) {
                                continue;
                            }
                        }

                        foreach ($queue as $checkpoint => $relationship) {
                            $resolved[] = $relationship->resolve();
                            unset($this->DeferredRelationships[$provId][$entTypeId][$forEntTypeId][$forEntProp][$forEntId][$checkpoint]);
                        }
                    }
                }
            }
        }
    }

    return $resolved;
}
1085:
/**
 * Check the heartbeat of the given providers, or of every registered
 * provider if none are given
 *
 * A run is started if necessary. Providers that throw a "method not
 * implemented" exception are reported as unsupported and otherwise
 * ignored. For providers that throw an "unreachable backend" exception, a
 * `BACKEND_UNREACHABLE` error is recorded, and if `$failEarly` is `true`,
 * no further providers are checked.
 *
 * @throws HeartbeatCheckFailedException if one or more heartbeat checks
 * fail.
 *
 * @inheritDoc
 */
public function checkProviderHeartbeats(
    int $ttl = 300,
    bool $failEarly = true,
    SyncProviderInterface ...$providers
) {
    $this->check();

    if (!$providers) {
        if (!$this->Providers) {
            return $this;
        }
        $providers = $this->Providers;
    } else {
        $providers = Arr::unique($providers);
    }

    $unreachable = [];
    /** @var SyncProviderInterface $provider */
    foreach ($providers as $provider) {
        $providerId = $provider->getProviderId();
        $label = sprintf('%s [#%d]', $provider->getName(), $providerId);
        Console::logProgress('Checking', $label);
        try {
            $provider->checkHeartbeat($ttl);
            Console::log('Heartbeat OK:', $label);
        } catch (MethodNotImplementedExceptionInterface $ex) {
            Console::log('Heartbeat check not supported:', $label);
        } catch (UnreachableBackendExceptionInterface $ex) {
            Console::exception($ex, Level::DEBUG, null);
            Console::log('Heartbeat check failed:', $label);
            $unreachable[] = $provider;
            $this->recordError(
                SyncError::build()
                    ->errorType(ErrorType::BACKEND_UNREACHABLE)
                    ->message('Heartbeat check failed: %s')
                    ->values([[
                        'provider_id' => $providerId,
                        'provider_class' => get_class($provider),
                        'exception' => get_class($ex),
                        'message' => $ex->getMessage()
                    ]])
                    ->build()
            );
            // This is the only place a failure is added, so it is the only
            // place the loop needs to stop early
            if ($failEarly) {
                break;
            }
        }
    }

    if ($unreachable) {
        throw new HeartbeatCheckFailedException(...$unreachable);
    }

    return $this;
}
1143:
/**
 * Add an error to the errors recorded during the run
 *
 * If `$deduplicate` is `true` and the collection's `keyOf()` method finds
 * the error, the stored error is replaced with the result of its `count()`
 * method instead of adding the given error again.
 *
 * @inheritDoc
 */
public function recordError(SyncErrorInterface $error, bool $deduplicate = false)
{
    $existingKey = $deduplicate
        ? $this->Errors->keyOf($error)
        : null;

    if ($existingKey === null) {
        $this->Errors[] = $error;
    } else {
        $this->Errors[$existingKey] = $this->Errors[$existingKey]->count();
    }

    return $this;
}
1161:
/**
 * Get a clone of the collection of errors recorded so far
 *
 * @inheritDoc
 */
public function getErrors(): SyncErrorCollectionInterface
{
    $errors = clone $this->Errors;

    return $errors;
}
1169:
/**
 * Start a run if one has not already started
 *
 * If `isCheckRunning()` returns `false`, the work is handed to
 * `safeCheck()`. Otherwise, a row is inserted into `_sync_run`, the run ID
 * and UUID are recorded, queued providers, entity types and namespaces are
 * registered (in that order) and namespaces are loaded from the database.
 *
 * The command, arguments and registration queues are unset once they have
 * been used.
 *
 * @phpstan-assert !null $this->RunId
 */
protected function check()
{
    if ($this->RunId !== null) {
        return $this;
    }

    if (!$this->isCheckRunning()) {
        return $this->safeCheck();
    }

    $uuid = Get::binaryUuid();

    $insert = <<<SQL
    INSERT INTO _sync_run (run_uuid, run_command, run_arguments_json)
    VALUES (
    :run_uuid,
    :run_command,
    :run_arguments_json
    );
    SQL;

    $statement = $this->prepare($insert);
    $statement->bindValue(':run_uuid', $uuid, \SQLITE3_BLOB);
    $statement->bindValue(':run_command', $this->Command, \SQLITE3_TEXT);
    $statement->bindValue(':run_arguments_json', Json::stringify($this->Arguments), \SQLITE3_TEXT);
    $statement->execute();
    $statement->close();

    // From here on, `runHasStarted()` returns true, so the registrations
    // below are applied to the database instead of being queued again
    $this->RunId = $this->db()->lastInsertRowID();
    $this->RunUuid = $uuid;
    unset($this->Command, $this->Arguments);

    foreach ($this->DeferredProviders as $queuedProvider) {
        $this->registerProvider($queuedProvider);
    }
    unset($this->DeferredProviders);

    foreach ($this->DeferredEntityTypes as $queuedEntityType) {
        $this->registerEntityType($queuedEntityType);
    }
    unset($this->DeferredEntityTypes);

    foreach ($this->DeferredNamespaces as $prefix => [$uri, $namespace, $helper]) {
        $this->registerNamespace($prefix, $uri, $namespace, $helper);
    }
    unset($this->DeferredNamespaces);

    return $this->reload();
}
1221:
/**
 * Replace the namespaces held by the instance with the content of
 * `_sync_entity_namespace`, longest PHP namespace first
 *
 * @return $this
 */
private function reload()
{
    // NOTE(review): the value returned by db() is not used below; the call
    // is kept in case it has side effects - confirm against AbstractStore
    $this->db();

    $select = <<<SQL
    SELECT
    entity_namespace_prefix,
    base_uri,
    php_namespace
    FROM
    _sync_entity_namespace
    ORDER BY
    LENGTH(php_namespace) DESC;
    SQL;
    $statement = $this->prepare($select);
    $rows = $this->execute($statement);

    $this->NamespacesByPrefix = [];
    $this->NamespaceUrisByPrefix = [];
    while (true) {
        $row = $rows->fetchArray(\SQLITE3_NUM);
        if ($row === false) {
            break;
        }
        /** @var array{string,string,string} $row */
        [$prefix, $baseUri, $phpNamespace] = $row;
        $this->NamespacesByPrefix[$prefix] = $phpNamespace;
        $this->NamespaceUrisByPrefix[$prefix] = $baseUri;
    }
    $rows->finalize();
    $statement->close();

    return $this;
}
1252:
/**
 * Close the store when the instance is destroyed
 *
 * The exit status passed to `close()` is taken from the error handler if
 * it is loaded and shutting down, otherwise it is `-1`.
 *
 * @internal
 */
public function __destruct()
{
    $exitStatus = Err::isLoaded() && Err::isShuttingDown()
        ? Err::getExitStatus()
        : -1;

    $this->close($exitStatus);
}
1264: }
1265: