1: <?php declare(strict_types=1);
2:
3: namespace Salient\Sync;
4:
5: use Salient\Contract\Core\Exception\MethodNotImplementedException;
6: use Salient\Contract\Sync\Exception\UnreachableBackendExceptionInterface;
7: use Salient\Contract\Sync\DeferralPolicy;
8: use Salient\Contract\Sync\DeferredEntityInterface;
9: use Salient\Contract\Sync\DeferredRelationshipInterface;
10: use Salient\Contract\Sync\ErrorType;
11: use Salient\Contract\Sync\HydrationPolicy;
12: use Salient\Contract\Sync\SyncEntityInterface;
13: use Salient\Contract\Sync\SyncErrorCollectionInterface;
14: use Salient\Contract\Sync\SyncErrorInterface;
15: use Salient\Contract\Sync\SyncNamespaceHelperInterface;
16: use Salient\Contract\Sync\SyncProviderInterface;
17: use Salient\Contract\Sync\SyncStoreInterface;
18: use Salient\Core\Facade\Console;
19: use Salient\Core\Facade\Err;
20: use Salient\Core\Facade\Event;
21: use Salient\Core\Store;
22: use Salient\Sync\Event\SyncStoreLoadedEvent;
23: use Salient\Sync\Exception\HeartbeatCheckFailedException;
24: use Salient\Sync\Exception\SyncStoreException;
25: use Salient\Sync\Support\SyncErrorCollection;
26: use Salient\Utility\Arr;
27: use Salient\Utility\Get;
28: use Salient\Utility\Json;
29: use Salient\Utility\Regex;
30: use Salient\Utility\Str;
31: use Generator;
32: use InvalidArgumentException;
33: use LogicException;
34: use ReflectionClass;
35:
36: /**
37: * Tracks the state of entities synced to and from third-party backends in a
38: * local SQLite database
39: *
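 * A sync operation run is started on demand (for example, when a provider ID
 * or entity type ID is requested, or when provider heartbeats are checked),
 * not when the {@see SyncStore} instance is created. Registrations made before
 * a run has started are queued and applied when it starts.
 *
 * A run that has started should be terminated by calling
 * {@see SyncStore::close()}. If the instance is destroyed without being
 * closed, the run is recorded with exit status `-1`, or with the exit status
 * reported by the error handler if it is shutting down.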
43: */
44: final class SyncStore extends Store implements SyncStoreInterface
45: {
/**
 * Row ID of the current run in `_sync_run`, or `null` if a run has not
 * started
 */
private ?int $RunId = null;

/**
 * Binary UUID of the current run (assigned when the run starts)
 */
private string $RunUuid;

/**
 * Prefix => true, for namespaces written to the database during the run
 *
 * @var array<string,true>
 */
private array $Namespaces = [];

/**
 * Provider ID => provider
 *
 * @var array<int,SyncProviderInterface>
 */
private array $Providers = [];

/**
 * Provider hash => provider ID
 *
 * @var array<string,int>
 */
private array $ProviderMap = [];

/**
 * Entity type ID => entity type
 *
 * @var array<int,class-string<SyncEntityInterface>>
 */
private array $EntityTypes = [];

/**
 * Entity type => entity type ID
 *
 * @var array<class-string<SyncEntityInterface>,int>
 */
private array $EntityTypeMap = [];

/**
 * Prefix => PHP namespace with trailing "\"
 *
 * Loaded from the database, longest namespace first.
 *
 * @var array<string,string>
 */
private array $NamespacesByPrefix;

/**
 * Prefix => namespace URI with trailing "/"
 *
 * @var array<string,string>
 */
private array $NamespaceUrisByPrefix;

/**
 * Prefix => namespace helper
 *
 * @var array<string,SyncNamespaceHelperInterface>
 */
private array $NamespaceHelpersByPrefix;

/**
 * Provider ID => entity type ID => entity ID => entity
 *
 * @var array<int,array<int,array<int|string,SyncEntityInterface>>>
 */
private array $Entities;

/**
 * SPL object ID => checkpoint at which the entity was registered
 *
 * @var array<int,int>
 */
private array $EntityCheckpoints;

/**
 * Provider ID => entity type ID => entity ID => [ deferred entity ]
 *
 * Deferred entities are indexed by the checkpoint at which they were
 * deferred.
 *
 * @var array<int,array<int,array<int|string,DeferredEntityInterface<SyncEntityInterface>[]>>>
 */
private array $DeferredEntities = [];

/**
 * Provider ID => entity type ID => requesting entity type ID => requesting
 * entity property => requesting entity ID => [ deferred relationship ]
 *
 * Deferred relationships are indexed by the checkpoint at which they were
 * deferred.
 *
 * @var array<int,array<int,array<int,array<string,array<int|string,DeferredRelationshipInterface<SyncEntityInterface>[]>>>>>
 */
private array $DeferredRelationships = [];

/**
 * Errors recorded via recordError()
 */
private SyncErrorCollection $Errors;

/**
 * Next checkpoint to issue; incremented whenever an entity is registered or
 * an entity or relationship is deferred
 */
private int $DeferralCheckpoint = 0;

/**
 * Command name recorded with the run when it starts (unset afterwards)
 */
private string $Command;

/**
 * Command arguments recorded with the run when it starts (unset afterwards)
 *
 * @var string[]
 */
private array $Arguments;

/**
 * Prefix => [ namespace URI, PHP namespace, namespace helper ], for
 * namespaces registered before a run has started (unset when it starts)
 *
 * @var array<string,array{string,string,SyncNamespaceHelperInterface|null}>
 */
private array $DeferredNamespaces = [];

/**
 * Providers registered before a run has started (unset when it starts)
 *
 * @var SyncProviderInterface[]
 */
private array $DeferredProviders = [];

/**
 * Entity types registered before a run has started (unset when it starts)
 *
 * @var class-string<SyncEntityInterface>[]
 */
private array $DeferredEntityTypes = [];
145:
/**
 * Open the sync database and announce that the store has been loaded
 *
 * The schema below is passed to `openDb()` with the filename. No run is
 * inserted into `_sync_run` here; the command and its arguments are kept for
 * when a run starts.
 *
 * @param string $filename Database filename (`:memory:` by default).
 * @param string $command The canonical name of the command performing sync
 * operations (e.g. a qualified class and/or method name).
 * @param string[] $arguments Arguments passed to the command.
 */
public function __construct(
    string $filename = ':memory:',
    string $command = '',
    array $arguments = []
) {
    // The registration queries in this class use `ON CONFLICT ... DO UPDATE`
    $this->assertCanUpsert();

    $this->Errors = new SyncErrorCollection();
    $this->Command = $command;
    $this->Arguments = $arguments;

    $schema = <<<SQL
CREATE TABLE IF NOT EXISTS
_sync_run (
run_id INTEGER NOT NULL PRIMARY KEY,
run_uuid BLOB NOT NULL UNIQUE,
run_command TEXT NOT NULL,
run_arguments_json TEXT NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
exit_status INTEGER,
error_count INTEGER,
warning_count INTEGER,
errors_json TEXT
);

CREATE TABLE IF NOT EXISTS
_sync_provider (
provider_id INTEGER NOT NULL PRIMARY KEY,
provider_hash BLOB NOT NULL UNIQUE,
provider_class TEXT NOT NULL,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS
_sync_entity_type (
entity_type_id INTEGER NOT NULL PRIMARY KEY,
entity_type_class TEXT NOT NULL UNIQUE,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS
_sync_entity_type_state (
provider_id INTEGER NOT NULL,
entity_type_id INTEGER NOT NULL,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
last_sync DATETIME,
PRIMARY KEY (provider_id, entity_type_id),
FOREIGN KEY (provider_id) REFERENCES _sync_provider,
FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type
);

CREATE TABLE IF NOT EXISTS
_sync_entity (
provider_id INTEGER NOT NULL,
entity_type_id INTEGER NOT NULL,
entity_id TEXT NOT NULL,
canonical_id TEXT,
is_dirty INTEGER NOT NULL DEFAULT 0,
is_deleted INTEGER NOT NULL DEFAULT 0,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
last_sync DATETIME,
entity_json TEXT NOT NULL,
PRIMARY KEY (provider_id, entity_type_id, entity_id),
FOREIGN KEY (provider_id) REFERENCES _sync_provider,
FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type
) WITHOUT ROWID;

CREATE TABLE IF NOT EXISTS
_sync_entity_namespace (
entity_namespace_id INTEGER NOT NULL PRIMARY KEY,
entity_namespace_prefix TEXT NOT NULL UNIQUE,
base_uri TEXT NOT NULL,
php_namespace TEXT NOT NULL,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
);

SQL;

    $this->openDb($filename, $schema);

    // Let listeners know the store is ready
    Event::dispatch(new SyncStoreLoadedEvent($this));
}
239:
/**
 * Terminate the current run and close the database
 *
 * If a run has started, its finish time, exit status, error and warning
 * counts and JSON-encoded errors are written to `_sync_run` before the
 * database is closed. If the database is not open or no run has started, the
 * database is closed without recording anything.
 *
 * The database is closed even if recording the outcome of the run fails, so
 * a failed update cannot leave the database open (or be retried from the
 * destructor). The exception, if any, is still propagated to the caller.
 *
 * @param int $exitStatus Exit status to record for the run.
 */
public function close(int $exitStatus = 0): void
{
    // Don't start a run now
    if (!$this->isOpen() || $this->RunId === null) {
        $this->closeDb();
        return;
    }

    $sql = <<<SQL
UPDATE
_sync_run
SET
finished_at = CURRENT_TIMESTAMP,
exit_status = :exit_status,
error_count = :error_count,
warning_count = :warning_count,
errors_json = :errors_json
WHERE
run_uuid = :run_uuid;
SQL;

    try {
        $stmt = $this->prepare($sql);
        try {
            $stmt->bindValue(':exit_status', $exitStatus, \SQLITE3_INTEGER);
            $stmt->bindValue(':run_uuid', $this->RunUuid, \SQLITE3_BLOB);
            $stmt->bindValue(':error_count', $this->Errors->getErrorCount(), \SQLITE3_INTEGER);
            $stmt->bindValue(':warning_count', $this->Errors->getWarningCount(), \SQLITE3_INTEGER);
            $stmt->bindValue(':errors_json', Json::encode($this->Errors), \SQLITE3_TEXT);
            $stmt->execute();
        } finally {
            // Release the statement whether or not the update succeeded
            $stmt->close();
        }
    } finally {
        // Always release the database, even if the update could not be
        // prepared or executed
        $this->closeDb();
    }
}
275:
/**
 * Check if a run has started, i.e. if a run ID has been assigned
 *
 * @phpstan-assert-if-true !null $this->RunId
 */
public function runHasStarted(): bool
{
    // `$RunId` is initialised to `null` and assigned when a run starts
    return isset($this->RunId);
}
283:
284: /**
285: * @inheritDoc
286: */
public function getRunId(): int
{
    // Throws a LogicException if a run has not started
    $this->assertRunHasStarted();

    $runId = $this->RunId;

    return $runId;
}
293:
294: /**
295: * @inheritDoc
296: */
public function getRunUuid(): string
{
    // Throws a LogicException if a run has not started
    $this->assertRunHasStarted();

    // The UUID is stored in binary form; convert it via `Get::uuid()`
    $binary = $this->RunUuid;

    return Get::uuid($binary);
}
303:
304: /**
305: * @inheritDoc
306: */
public function getBinaryRunUuid(): string
{
    // Throws a LogicException if a run has not started
    $this->assertRunHasStarted();

    // Returned as stored, without conversion
    $binary = $this->RunUuid;

    return $binary;
}
313:
/**
 * Throw an exception if a run has not started
 *
 * @phpstan-assert !null $this->RunId
 *
 * @throws LogicException if no run ID has been assigned.
 */
private function assertRunHasStarted(): void
{
    if ($this->RunId !== null) {
        return;
    }

    throw new LogicException('Run has not started');
}
323:
324: /**
325: * @inheritDoc
326: */
public function registerProvider(SyncProviderInterface $provider)
{
    // Don't start a run just to register a provider: queue it until a run
    // has started
    if ($this->RunId === null) {
        $this->DeferredProviders[] = $provider;
        return $this;
    }

    $providerClass = get_class($provider);
    $signature = $this->getProviderSignature($provider);

    if (isset($this->ProviderMap[$signature])) {
        throw new LogicException(sprintf(
            'Provider already registered: %s',
            $providerClass,
        ));
    }

    // Insert the provider, or update `last_seen` if it is already in the
    // database
    $upsertSql = <<<SQL
INSERT INTO
_sync_provider (provider_hash, provider_class)
VALUES
(:provider_hash, :provider_class) ON CONFLICT (provider_hash) DO
UPDATE
SET
last_seen = CURRENT_TIMESTAMP;
SQL;
    $upsert = $this->prepare($upsertSql);
    $upsert->bindValue(':provider_hash', $signature, \SQLITE3_BLOB);
    $upsert->bindValue(':provider_class', $providerClass, \SQLITE3_TEXT);
    $upsert->execute();
    $upsert->close();

    // Look up the ID assigned to the provider
    $selectSql = <<<SQL
SELECT
provider_id
FROM
_sync_provider
WHERE
provider_hash = :provider_hash;
SQL;
    $select = $this->prepare($selectSql);
    $select->bindValue(':provider_hash', $signature, \SQLITE3_BLOB);
    /** @var array{int}|false */
    $row = $this->execute($select)->fetchArray(\SQLITE3_NUM);
    $select->close();

    if ($row === false) {
        // @codeCoverageIgnoreStart
        throw new SyncStoreException('Error retrieving provider ID');
        // @codeCoverageIgnoreEnd
    }

    [$providerId] = $row;
    $this->Providers[$providerId] = $provider;
    $this->ProviderMap[$signature] = $providerId;

    return $this;
}
388:
389: /**
390: * @inheritDoc
391: */
public function hasProvider($provider): bool
{
    $started = $this->runHasStarted();

    // Provider instances are matched by identity against registered
    // providers, or against queued providers if a run has not started
    if ($provider instanceof SyncProviderInterface) {
        return in_array(
            $provider,
            $started ? $this->Providers : $this->DeferredProviders,
            true
        );
    }

    // Otherwise, `$provider` is treated as a provider signature
    if ($started) {
        return isset($this->ProviderMap[$provider]);
    }

    foreach ($this->DeferredProviders as $queued) {
        if ($this->getProviderSignature($queued) === $provider) {
            return true;
        }
    }

    return false;
}
412:
413: /**
414: * @inheritDoc
415: */
public function getProviderId($provider): int
{
    // Provider IDs are only issued during a run, so start one if needed
    if ($this->RunId === null) {
        $this->check();
    }

    $isInstance = $provider instanceof SyncProviderInterface;
    $signature = $isInstance
        ? $this->getProviderSignature($provider)
        : $provider;

    if (isset($this->ProviderMap[$signature])) {
        return $this->ProviderMap[$signature];
    }

    // Only name the provider if an instance was given
    $message = 'Provider not registered';
    if ($isInstance) {
        $message .= sprintf(': %s', get_class($provider));
    }

    throw new LogicException($message);
}
434:
435: /**
436: * @inheritDoc
437: */
public function getProvider($provider): SyncProviderInterface
{
    $started = $this->runHasStarted();

    // Integers are provider IDs, which are only issued during a run
    if (is_int($provider)) {
        if (!$started) {
            throw new LogicException(sprintf(
                'Provider ID not issued during run: %d',
                $provider,
            ));
        }
        if (isset($this->Providers[$provider])) {
            return $this->Providers[$provider];
        }
        throw new LogicException(sprintf(
            'Provider not registered: #%d',
            $provider,
        ));
    }

    // Anything else is treated as a provider signature
    if ($started) {
        $providerId = $this->ProviderMap[$provider] ?? null;
        if ($providerId !== null) {
            return $this->Providers[$providerId];
        }
    } else {
        // Don't start a run just to get a provider: check queued providers
        foreach ($this->DeferredProviders as $queued) {
            if ($this->getProviderSignature($queued) === $provider) {
                return $queued;
            }
        }
    }

    throw new LogicException('Provider not registered');
}
472:
473: /**
474: * @inheritDoc
475: */
public function getProviderSignature(SyncProviderInterface $provider): string
{
    // The signature is a binary hash of the provider's class name and its
    // backend identifier values, separated by null bytes
    $parts = [
        get_class($provider),
        ...$provider->getBackendIdentifier(),
    ];

    return Get::binaryHash(implode("\0", $parts));
}
484:
485: /**
486: * @inheritDoc
487: */
public function registerEntityType(string $entityType)
{
    // Don't start a run just to register an entity type: queue it until a
    // run has started
    if ($this->RunId === null) {
        $this->DeferredEntityTypes[] = $entityType;
        return $this;
    }

    // Registering the same entity type again is a no-op
    if (isset($this->EntityTypeMap[$entityType])) {
        return $this;
    }

    $reflection = new ReflectionClass($entityType);
    $declaredName = $reflection->getName();

    // Reject names that resolve to the class but don't match its declared
    // name exactly
    if ($declaredName !== $entityType) {
        throw new LogicException(sprintf(
            'Not an exact match for declared class (%s expected): %s',
            $declaredName,
            $entityType,
        ));
    }

    if (!$reflection->implementsInterface(SyncEntityInterface::class)) {
        // @codeCoverageIgnoreStart
        throw new LogicException(sprintf(
            'Does not implement %s: %s',
            SyncEntityInterface::class,
            $entityType,
        ));
        // @codeCoverageIgnoreEnd
    }

    // Insert the entity type, or update `last_seen` if it is already in the
    // database
    $upsertSql = <<<SQL
INSERT INTO
_sync_entity_type (entity_type_class)
VALUES
(:entity_type_class) ON CONFLICT (entity_type_class) DO
UPDATE
SET
last_seen = CURRENT_TIMESTAMP;
SQL;
    $upsert = $this->prepare($upsertSql);
    $upsert->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT);
    $upsert->execute();
    $upsert->close();

    // Look up the ID assigned to the entity type
    $selectSql = <<<SQL
SELECT
entity_type_id
FROM
_sync_entity_type
WHERE
entity_type_class = :entity_type_class;
SQL;
    $select = $this->prepare($selectSql);
    $select->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT);
    /** @var array{int}|false */
    $row = $this->execute($select)->fetchArray(\SQLITE3_NUM);
    $select->close();

    if ($row === false) {
        throw new SyncStoreException('Error retrieving entity type ID');
    }

    [$entityTypeId] = $row;
    $this->EntityTypes[$entityTypeId] = $entityType;
    $this->EntityTypeMap[$entityType] = $entityTypeId;

    return $this;
}
559:
560: /**
561: * @inheritDoc
562: */
public function hasEntityType(string $entityType): bool
{
    // Before a run has started, only queued entity types are known
    return $this->runHasStarted()
        ? isset($this->EntityTypeMap[$entityType])
        : in_array($entityType, $this->DeferredEntityTypes, true);
}
571:
572: /**
573: * @inheritDoc
574: */
public function getEntityTypeId(string $entityType): int
{
    // Entity type IDs are only issued during a run, so start one if needed
    if ($this->RunId === null) {
        $this->check();
    }

    if (isset($this->EntityTypeMap[$entityType])) {
        return $this->EntityTypeMap[$entityType];
    }

    throw new LogicException(sprintf(
        'Entity not registered: %s',
        $entityType,
    ));
}
590:
591: /**
592: * @inheritDoc
593: */
public function getEntityType(int $entityTypeId): string
{
    // Entity type IDs are only issued during a run
    if (!$this->runHasStarted()) {
        throw new LogicException(sprintf(
            'Entity type ID not issued during run: %d',
            $entityTypeId,
        ));
    }

    $entityType = $this->EntityTypes[$entityTypeId] ?? null;
    if ($entityType === null) {
        throw new LogicException(sprintf(
            'Entity type not registered: #%d',
            $entityTypeId,
        ));
    }

    return $entityType;
}
610:
611: /**
612: * @inheritDoc
613: */
public function registerNamespace(
    string $prefix,
    string $uri,
    string $namespace,
    ?SyncNamespaceHelperInterface $helper = null
) {
    // Prefixes are case-insensitive
    $prefix = Str::lower($prefix);

    $isRegistered = isset($this->Namespaces[$prefix]);
    $isQueued = !$this->runHasStarted()
        && isset($this->DeferredNamespaces[$prefix]);
    if ($isRegistered || $isQueued) {
        throw new LogicException(sprintf(
            'Prefix already registered: %s',
            $prefix,
        ));
    }

    // Namespaces are validated and normalised before they are queued so
    // `classToNamespace()` can be used without starting a run. A namespace
    // still in `$DeferredNamespaces` has already been through this step
    // (i.e. it is being registered from the queue as a run starts), so it is
    // not repeated.
    $alreadyNormalised = isset($this->DeferredNamespaces)
        && isset($this->DeferredNamespaces[$prefix]);
    if (!$alreadyNormalised) {
        if (!Regex::match('/^[a-z][-a-z0-9+.]*$/iD', $prefix)) {
            throw new InvalidArgumentException(sprintf(
                'Invalid prefix: %s',
                $prefix,
            ));
        }
        // Exactly one trailing "/" on the URI, and no leading and exactly
        // one trailing "\" on the PHP namespace
        $uri = rtrim($uri, '/') . '/';
        $namespace = trim($namespace, '\\') . '\\';
    }

    // Don't start a run just to register a namespace
    if (!$this->runHasStarted()) {
        $this->DeferredNamespaces[$prefix] = [$uri, $namespace, $helper];
        return $this;
    }

    // Insert the namespace, or update its URI, PHP namespace and `last_seen`
    // if the prefix is already in the database
    $upsertSql = <<<SQL
INSERT INTO
_sync_entity_namespace (entity_namespace_prefix, base_uri, php_namespace)
VALUES
(
:entity_namespace_prefix,
:base_uri,
:php_namespace
) ON CONFLICT (entity_namespace_prefix) DO
UPDATE
SET
base_uri = excluded.base_uri,
php_namespace = excluded.php_namespace,
last_seen = CURRENT_TIMESTAMP;
SQL;
    $upsert = $this->prepare($upsertSql);
    $upsert->bindValue(':entity_namespace_prefix', $prefix, \SQLITE3_TEXT);
    $upsert->bindValue(':base_uri', $uri, \SQLITE3_TEXT);
    $upsert->bindValue(':php_namespace', $namespace, \SQLITE3_TEXT);
    $upsert->execute();
    $upsert->close();

    $this->Namespaces[$prefix] = true;

    if ($helper) {
        $this->NamespaceHelpersByPrefix[$prefix] = $helper;
    }

    // Don't reload while bootstrapping: namespaces have not been loaded yet,
    // and `check()` reloads them after registering queued namespaces
    return isset($this->NamespacesByPrefix)
        ? $this->reload()
        : $this;
}
690:
691: /**
692: * @inheritDoc
693: */
public function getEntityTypeUri(string $entityType, bool $compact = true): string
{
    $uri = null;
    $namespace = null;
    $prefix = $this->classToNamespace($entityType, $uri, $namespace);

    // Fall back to the generic URI if the class isn't in a registered
    // namespace
    if ($prefix === null) {
        return SyncUtil::getEntityTypeUri($entityType, $compact);
    }

    // Remove the namespace from the start of the class name and use "/" as
    // the separator in what remains
    /** @var string $namespace */
    $relative = substr(ltrim($entityType, '\\'), strlen($namespace));
    $path = str_replace('\\', '/', $relative);

    if ($compact) {
        return $prefix . ':' . $path;
    }

    return $uri . $path;
}
707:
708: /**
709: * @inheritDoc
710: */
public function getNamespacePrefix(string $class): ?string
{
    // `null` is returned if the class is not in a registered namespace
    $prefix = $this->classToNamespace($class);

    return $prefix;
}
715:
716: /**
717: * @inheritDoc
718: */
public function getNamespaceHelper(string $class): ?SyncNamespaceHelperInterface
{
    $uri = null;
    $namespace = null;
    $helper = null;
    $prefix = $this->classToNamespace($class, $uri, $namespace, $helper);

    // No namespace, no helper; otherwise return the namespace's helper,
    // which may itself be `null`
    return $prefix === null
        ? null
        : $helper;
}
732:
/**
 * Get the prefix of the first registered namespace a class belongs to
 *
 * Class and namespace names are compared case-insensitively. If a match is
 * found, `$uri`, `$namespace` (in lowercase) and `$helper` are assigned by
 * reference; otherwise they are left untouched and `null` is returned.
 *
 * @param class-string<SyncEntityInterface|SyncProviderInterface> $class
 * @param-out SyncNamespaceHelperInterface|null $helper
 */
private function classToNamespace(
    string $class,
    ?string &$uri = null,
    ?string &$namespace = null,
    ?SyncNamespaceHelperInterface &$helper = null
): ?string {
    $needle = Str::lower(ltrim($class, '\\'));

    // Don't start a run just to resolve a class to a namespace: use queued
    // namespaces if a run has not started
    if ($this->runHasStarted()) {
        $candidates = $this->getNamespaces();
    } else {
        $candidates = $this->DeferredNamespaces;
    }

    foreach ($candidates as $prefix => $candidate) {
        [$candidateUri, $candidateNamespace, $candidateHelper] = $candidate;
        $candidateNamespace = Str::lower($candidateNamespace);

        // The class must start with the namespace
        if (strpos($needle, $candidateNamespace) !== 0) {
            continue;
        }

        $uri = $candidateUri;
        $namespace = $candidateNamespace;
        $helper = $candidateHelper;

        return $prefix;
    }

    return null;
}
759:
/**
 * Iterate over loaded namespaces as prefix => [ URI, PHP namespace, helper ]
 *
 * Namespaces are yielded in the order they appear in `$NamespacesByPrefix`.
 *
 * @return Generator<string,array{string,string,SyncNamespaceHelperInterface|null}>
 */
private function getNamespaces(): Generator
{
    foreach ($this->NamespacesByPrefix as $prefix => $phpNamespace) {
        $uri = $this->NamespaceUrisByPrefix[$prefix];
        // Not every namespace has a helper
        $helper = $this->NamespaceHelpersByPrefix[$prefix] ?? null;

        yield $prefix => [$uri, $phpNamespace, $helper];
    }
}
773:
774: /**
775: * @inheritDoc
776: */
public function setEntity(
    int $providerId,
    string $entityType,
    $entityId,
    SyncEntityInterface $entity
) {
    $typeId = $this->EntityTypeMap[$entityType];

    if (isset($this->Entities[$providerId][$typeId][$entityId])) {
        throw new LogicException('Entity already registered');
    }

    // Register the entity and record the checkpoint at which it arrived
    $this->Entities[$providerId][$typeId][$entityId] = $entity;
    $this->EntityCheckpoints[spl_object_id($entity)] = $this->DeferralCheckpoint++;

    // Resolve the entity's entries in the deferred entity queue (if any)
    $queue = $this->DeferredEntities[$providerId][$typeId][$entityId] ?? [];
    if (!$queue) {
        return $this;
    }

    foreach ($queue as $checkpoint => $deferredEntity) {
        $deferredEntity->replace($entity);
        unset($this->DeferredEntities[$providerId][$typeId][$entityId][$checkpoint]);
    }
    unset($this->DeferredEntities[$providerId][$typeId][$entityId]);

    return $this;
}
802:
/**
 * Get an entity registered with the store, or null if it isn't registered
 *
 * `$offline` does not currently change the result: only entities registered
 * via setEntity() are returned.
 *
 * @template TEntity of SyncEntityInterface
 *
 * @param class-string<TEntity> $entityType
 * @return TEntity|null
 */
public function getEntity(
    int $providerId,
    string $entityType,
    $entityId,
    ?bool $offline = null
): ?SyncEntityInterface {
    $typeId = $this->EntityTypeMap[$entityType];

    /** @var TEntity|null */
    $entity = $this->Entities[$providerId][$typeId][$entityId] ?? null;

    if ($entity === null && $offline !== false) {
        return null;
    }

    return $entity;
}
823:
/**
 * Register a deferred entity with the store
 *
 * If the entity it refers to is already registered, the deferred entity is
 * replaced immediately. Otherwise, it is added to the deferred entity queue
 * and, if the applicable deferral policy is `RESOLVE_EARLY`, resolved
 * straight away.
 *
 * @template TEntity of SyncEntityInterface
 *
 * @param class-string<TEntity> $entityType
 * @param DeferredEntityInterface<TEntity> $entity
 */
public function deferEntity(
    int $providerId,
    string $entityType,
    $entityId,
    DeferredEntityInterface $entity
) {
    $typeId = $this->EntityTypeMap[$entityType];

    /** @var TEntity|null */
    $registered = $this->Entities[$providerId][$typeId][$entityId] ?? null;
    if ($registered) {
        $entity->replace($registered);
        return $this;
    }

    // Get the deferral policy of the context within which the entity was
    // deferred, preferring the context of its last entity (if any)
    $context = $entity->getContext();
    $last = $context ? $context->getLastEntity() : null;
    if ($last) {
        $context = $last->getContext();
    }
    $policy = $context ? $context->getDeferralPolicy() : null;

    // In `RESOLVE_EARLY` mode, deferred entities are still added to
    // `$this->DeferredEntities` for the benefit of `$this->setEntity()`,
    // which only calls `DeferredEntityInterface::replace()` on registered
    // instances
    $checkpoint = $this->DeferralCheckpoint++;
    $this->DeferredEntities[$providerId][$typeId][$entityId][$checkpoint] = $entity;

    if ($policy === DeferralPolicy::RESOLVE_EARLY) {
        $entity->resolve();
    }

    return $this;
}
872:
873: /**
874: * @inheritDoc
875: */
public function deferRelationship(
    int $providerId,
    string $entityType,
    string $forEntityType,
    string $forEntityProperty,
    $forEntityId,
    DeferredRelationshipInterface $relationship
) {
    $typeId = $this->EntityTypeMap[$entityType];
    $forTypeId = $this->EntityTypeMap[$forEntityType];

    // Taking a reference creates the queue entry (as `null`) if it doesn't
    // exist, so a non-null value means the relationship has been seen before
    /** @var DeferredRelationshipInterface<SyncEntityInterface>[]|null */
    $queue = &$this->DeferredRelationships[$providerId][$typeId][$forTypeId][$forEntityProperty][$forEntityId];

    if ($queue !== null) {
        throw new LogicException('Relationship already registered');
    }

    $queue = [];

    // Get hydration policy from the context within which the deferral was
    // created, preferring the context of its last entity (if any)
    $context = $relationship->getContext();
    $last = $context ? $context->getLastEntity() : null;
    if ($last) {
        $context = $last->getContext();
    }
    $policy = $context ? $context->getHydrationPolicy($entityType) : 0;

    if ($policy === HydrationPolicy::EAGER) {
        // Resolve immediately instead of queueing
        $relationship->resolve();
    } elseif ($policy !== HydrationPolicy::LAZY) {
        // Lazy relationships are not queued; anything else is
        $queue[$this->DeferralCheckpoint++] = $relationship;
    }

    return $this;
}
923:
924: /**
925: * @inheritDoc
926: */
public function resolveDeferrals(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?int $providerId = null
): array {
    // SPL object ID => entity, so each entity is only returned once
    $resolved = [];
    $checkpoint = $this->DeferralCheckpoint;

    // Repeat until a pass resolves nothing from either queue
    do {
        // Resolve relationships first because they typically deliver
        // multiple entities per round trip, some of which may be in the
        // deferred entity queue
        $relationships = $this->resolveDeferredRelationships($fromCheckpoint, $entityType, null, null, $providerId);
        if ($relationships) {
            foreach ($relationships as $relationship) {
                foreach ($relationship as $entity) {
                    $objectId = spl_object_id($entity);
                    // Only collect entities registered since this method
                    // was called
                    if ($this->EntityCheckpoints[$objectId] >= $checkpoint) {
                        $resolved[$objectId] = $entity;
                    }
                }
            }
            $progress = true;
        } else {
            // Entities are only resolved individually when no relationships
            // were resolved in this pass
            $entities = $this->resolveDeferredEntities($fromCheckpoint, $entityType, $providerId);
            foreach ($entities as $entity) {
                $resolved[spl_object_id($entity)] = $entity;
            }
            $progress = (bool) $entities;
        }
    } while ($progress);

    return array_values($resolved);
}
962:
963: /**
964: * @inheritDoc
965: */
public function getDeferralCheckpoint(): int
{
    // The value returned is the checkpoint that will be issued next
    $checkpoint = $this->DeferralCheckpoint;

    return $checkpoint;
}
970:
971: /**
972: * @inheritDoc
973: */
public function resolveDeferredEntities(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?int $providerId = null
): array {
    $wantedTypeId = $entityType === null
        ? null
        : $this->EntityTypeMap[$entityType];

    $resolved = [];
    foreach ($this->DeferredEntities as $queuedProviderId => $queueByTypeId) {
        // Apply the provider filter (if any)
        if ($providerId !== null && $queuedProviderId !== $providerId) {
            continue;
        }

        foreach ($queueByTypeId as $queuedTypeId => $queueByEntityId) {
            // Apply the entity type filter (if any)
            if ($wantedTypeId !== null && $queuedTypeId !== $wantedTypeId) {
                continue;
            }

            if ($fromCheckpoint !== null) {
                // Work on the local copy: remove entries deferred before the
                // checkpoint, then entities with no entries left
                foreach ($queueByEntityId as $entityId => $queue) {
                    foreach ($queue as $index => $unused) {
                        if ($index >= $fromCheckpoint) {
                            continue;
                        }
                        unset($queueByEntityId[$entityId][$index]);
                        if (!$queueByEntityId[$entityId]) {
                            unset($queueByEntityId[$entityId]);
                        }
                    }
                }
                if (!$queueByEntityId) {
                    continue;
                }
            }

            // Resolve the first remaining deferral of each entity. Entries
            // are not removed from the queue here; `setEntity()` removes
            // them when the entity is registered.
            /** @var array<int|string,non-empty-array<DeferredEntityInterface<SyncEntityInterface>>> $queueByEntityId */
            foreach ($queueByEntityId as $queue) {
                $first = reset($queue);
                $resolved[] = $first->resolve();
            }
        }
    }

    return $resolved;
}
1021:
1022: /**
1023: * @inheritDoc
1024: */
public function resolveDeferredRelationships(
    ?int $fromCheckpoint = null,
    ?string $entityType = null,
    ?string $forEntityType = null,
    ?string $forEntityProperty = null,
    ?int $providerId = null
): array {
    $wantedTypeId = $entityType === null
        ? null
        : $this->EntityTypeMap[$entityType];
    $wantedForTypeId = $forEntityType === null
        ? null
        : $this->EntityTypeMap[$forEntityType];

    $resolved = [];
    foreach ($this->DeferredRelationships as $queuedProviderId => $byTypeId) {
        // Apply the provider filter (if any)
        if ($providerId !== null && $queuedProviderId !== $providerId) {
            continue;
        }

        foreach ($byTypeId as $queuedTypeId => $byForTypeId) {
            // Apply the entity type filter (if any)
            if ($wantedTypeId !== null && $queuedTypeId !== $wantedTypeId) {
                continue;
            }

            foreach ($byForTypeId as $queuedForTypeId => $byForProperty) {
                // Apply the requesting entity type filter (if any)
                if ($wantedForTypeId !== null && $queuedForTypeId !== $wantedForTypeId) {
                    continue;
                }

                foreach ($byForProperty as $queuedForProperty => $byForEntityId) {
                    // Apply the requesting entity property filter (if any)
                    if ($forEntityProperty !== null && $queuedForProperty !== $forEntityProperty) {
                        continue;
                    }

                    foreach ($byForEntityId as $queuedForEntityId => $queue) {
                        if ($fromCheckpoint !== null) {
                            // Work on the local copy: ignore relationships
                            // deferred before the checkpoint
                            foreach ($queue as $index => $unused) {
                                if ($index < $fromCheckpoint) {
                                    unset($queue[$index]);
                                }
                            }
                            if (!$queue) {
                                continue;
                            }
                        }

                        // Resolve what's left, removing each relationship
                        // from the queue once it has been resolved
                        foreach ($queue as $index => $deferred) {
                            $resolved[] = $deferred->resolve();
                            unset($this->DeferredRelationships[$queuedProviderId][$queuedTypeId][$queuedForTypeId][$queuedForProperty][$queuedForEntityId][$index]);
                        }
                    }
                }
            }
        }
    }

    return $resolved;
}
1082:
1083: /**
1084: * @inheritDoc
1085: */
public function checkProviderHeartbeats(
    int $ttl = 300,
    bool $failEarly = true,
    SyncProviderInterface ...$providers
) {
    // Start a run if necessary
    $this->check();

    // Check the given providers, or every registered provider if none are
    // given
    if (!$providers) {
        if (!$this->Providers) {
            return $this;
        }
        $providers = $this->Providers;
    } else {
        $providers = Arr::unique($providers);
    }

    $failed = [];
    /** @var SyncProviderInterface $provider */
    foreach ($providers as $provider) {
        $providerId = $provider->getProviderId();
        $label = sprintf('%s [#%d]', $provider->getName(), $providerId);
        Console::logProgress('Checking', $label);

        try {
            $provider->checkHeartbeat($ttl);
            Console::log('Heartbeat OK:', $label);
            continue;
        } catch (MethodNotImplementedException $ex) {
            // Providers without heartbeat support are not treated as failed
            Console::log('Heartbeat check not supported:', $label);
            continue;
        } catch (UnreachableBackendExceptionInterface $ex) {
            Console::exception($ex, Console::LEVEL_DEBUG, null);
            Console::log('Heartbeat check failed:', $label);
            $failed[] = $provider;
            $this->recordError(
                SyncError::build()
                    ->errorType(ErrorType::BACKEND_UNREACHABLE)
                    ->message('Heartbeat check failed: %s')
                    ->values([[
                        'provider_id' => $providerId,
                        'provider_class' => get_class($provider),
                        'exception' => get_class($ex),
                        'message' => $ex->getMessage()
                    ]])
                    ->build()
            );
        }

        // Only reached after a failure: stop at the first one if requested
        if ($failEarly) {
            break;
        }
    }

    if ($failed) {
        throw new HeartbeatCheckFailedException(...$failed);
    }

    return $this;
}
1140:
1141: /**
1142: * @inheritDoc
1143: */
public function recordError(SyncErrorInterface $error, bool $deduplicate = false)
{
    // Only look for an existing error if deduplication is requested
    $key = $deduplicate
        ? $this->Errors->keyOf($error)
        : null;

    if ($key === null) {
        $this->Errors[] = $error;
        return $this;
    }

    // Replace the existing error with the result of its `count()` method
    // instead of adding the new one
    $this->Errors[$key] = $this->Errors[$key]->count();

    return $this;
}
1158:
1159: /**
1160: * @inheritDoc
1161: */
public function getErrors(): SyncErrorCollectionInterface
{
    // Return a clone so the store's own collection is not handed out
    $errors = clone $this->Errors;

    return $errors;
}
1166:
/**
 * Start a run if one has not already started
 *
 * Inserts a row into `_sync_run`, then applies provider, entity type and
 * namespace registrations queued before the run started and loads
 * namespaces from the database.
 *
 * @phpstan-assert !null $this->RunId
 */
protected function check()
{
    // Nothing to do if a run has already started
    if ($this->RunId !== null) {
        return $this;
    }

    // NOTE(review): `safeCheck()` and `isCheckRunning()` are inherited;
    // presumably `safeCheck()` calls this method again with
    // `isCheckRunning()` returning true -- confirm against the parent class
    if (!$this->isCheckRunning()) {
        return $this->safeCheck();
    }

    $insertSql = <<<SQL
INSERT INTO _sync_run (run_uuid, run_command, run_arguments_json)
VALUES (
:run_uuid,
:run_command,
:run_arguments_json
);
SQL;

    $insert = $this->prepare($insertSql);
    $uuid = Get::binaryUuid();
    $insert->bindValue(':run_uuid', $uuid, \SQLITE3_BLOB);
    $insert->bindValue(':run_command', $this->Command, \SQLITE3_TEXT);
    $insert->bindValue(':run_arguments_json', Json::encode($this->Arguments), \SQLITE3_TEXT);
    $insert->execute();
    $insert->close();

    // The run has started once its ID is assigned, so the registrations
    // below are written to the database instead of being queued again
    $this->RunId = $this->db()->lastInsertRowID();
    $this->RunUuid = $uuid;
    unset($this->Command, $this->Arguments);

    foreach ($this->DeferredProviders as $provider) {
        $this->registerProvider($provider);
    }
    unset($this->DeferredProviders);

    foreach ($this->DeferredEntityTypes as $entityType) {
        $this->registerEntityType($entityType);
    }
    unset($this->DeferredEntityTypes);

    // Each queued namespace is a [ URI, PHP namespace, helper ] list
    foreach ($this->DeferredNamespaces as $prefix => $queued) {
        $this->registerNamespace($prefix, ...$queued);
    }
    unset($this->DeferredNamespaces);

    return $this->reload();
}
1218:
/**
 * Load namespaces from the database
 *
 * Replaces `$NamespacesByPrefix` and `$NamespaceUrisByPrefix` with the
 * contents of `_sync_entity_namespace`, longest PHP namespace first, so the
 * most specific namespace is matched first when classes are resolved.
 *
 * @return $this
 */
private function reload()
{
    $sql = <<<SQL
SELECT
entity_namespace_prefix,
base_uri,
php_namespace
FROM
_sync_entity_namespace
ORDER BY
LENGTH(php_namespace) DESC;
SQL;
    $stmt = $this->prepare($sql);
    $result = $this->execute($stmt);

    // Start from empty maps so rows removed from the table don't linger
    $this->NamespacesByPrefix = [];
    $this->NamespaceUrisByPrefix = [];
    while (($row = $result->fetchArray(\SQLITE3_NUM)) !== false) {
        /** @var array{string,string,string} $row */
        [$prefix, $uri, $namespace] = $row;
        $this->NamespacesByPrefix[$prefix] = $namespace;
        $this->NamespaceUrisByPrefix[$prefix] = $uri;
    }
    $result->finalize();
    $stmt->close();

    return $this;
}
1249:
/**
 * Close the store if it hasn't been closed explicitly
 *
 * The exit status passed to close() is `-1` unless the error handler is
 * loaded and shutting down, in which case its exit status is used.
 *
 * @internal
 */
public function __destruct()
{
    $shuttingDown = Err::isLoaded() && Err::isShuttingDown();

    $this->close($shuttingDown ? Err::getExitStatus() : -1);
}
1261: }
1262: