1: | <?php declare(strict_types=1); |
2: | |
3: | namespace Salient\Sync; |
4: | |
5: | use Salient\Contract\Core\Exception\MethodNotImplementedException; |
6: | use Salient\Contract\Sync\Exception\UnreachableBackendExceptionInterface; |
7: | use Salient\Contract\Sync\DeferralPolicy; |
8: | use Salient\Contract\Sync\DeferredEntityInterface; |
9: | use Salient\Contract\Sync\DeferredRelationshipInterface; |
10: | use Salient\Contract\Sync\ErrorType; |
11: | use Salient\Contract\Sync\HydrationPolicy; |
12: | use Salient\Contract\Sync\SyncEntityInterface; |
13: | use Salient\Contract\Sync\SyncErrorCollectionInterface; |
14: | use Salient\Contract\Sync\SyncErrorInterface; |
15: | use Salient\Contract\Sync\SyncNamespaceHelperInterface; |
16: | use Salient\Contract\Sync\SyncProviderInterface; |
17: | use Salient\Contract\Sync\SyncStoreInterface; |
18: | use Salient\Core\Facade\Console; |
19: | use Salient\Core\Facade\Err; |
20: | use Salient\Core\Facade\Event; |
21: | use Salient\Core\Store; |
22: | use Salient\Sync\Event\SyncStoreLoadedEvent; |
23: | use Salient\Sync\Exception\HeartbeatCheckFailedException; |
24: | use Salient\Sync\Exception\SyncStoreException; |
25: | use Salient\Sync\Support\SyncErrorCollection; |
26: | use Salient\Utility\Arr; |
27: | use Salient\Utility\Get; |
28: | use Salient\Utility\Json; |
29: | use Salient\Utility\Regex; |
30: | use Salient\Utility\Str; |
31: | use Generator; |
32: | use InvalidArgumentException; |
33: | use LogicException; |
34: | use ReflectionClass; |
35: | |
36: | |
37: | |
38: | |
39: | |
40: | |
41: | |
42: | |
43: | |
44: | final class SyncStore extends Store implements SyncStoreInterface |
45: | { |
46: | private ?int $RunId = null; |
47: | private string $RunUuid; |
48: | |
49: | |
50: | |
51: | |
52: | |
53: | |
54: | private array $Namespaces = []; |
55: | |
56: | |
57: | |
58: | |
59: | |
60: | |
61: | private array $Providers = []; |
62: | |
63: | |
64: | |
65: | |
66: | |
67: | |
68: | private array $ProviderMap = []; |
69: | |
70: | |
71: | |
72: | |
73: | |
74: | |
75: | private array $EntityTypes = []; |
76: | |
77: | |
78: | |
79: | |
80: | |
81: | |
82: | private array $EntityTypeMap = []; |
83: | |
84: | |
85: | |
86: | |
87: | |
88: | |
89: | private array $NamespacesByPrefix; |
90: | |
91: | |
92: | |
93: | |
94: | |
95: | |
96: | private array $NamespaceUrisByPrefix; |
97: | |
98: | |
99: | |
100: | |
101: | |
102: | |
103: | private array $NamespaceHelpersByPrefix; |
104: | |
105: | |
106: | |
107: | |
108: | |
109: | |
110: | private array $Entities; |
111: | |
112: | |
113: | |
114: | |
115: | |
116: | |
117: | private array $EntityCheckpoints; |
118: | |
119: | |
120: | |
121: | |
122: | |
123: | |
124: | private array $DeferredEntities = []; |
125: | |
126: | |
127: | |
128: | |
129: | |
130: | |
131: | |
132: | private array $DeferredRelationships = []; |
133: | |
134: | private SyncErrorCollection $Errors; |
135: | private int $DeferralCheckpoint = 0; |
136: | private string $Command; |
137: | |
138: | private array $Arguments; |
139: | |
140: | private array $DeferredNamespaces = []; |
141: | |
142: | private array $DeferredProviders = []; |
143: | |
144: | private array $DeferredEntityTypes = []; |
145: | |
146: | |
147: | |
148: | |
149: | |
150: | |
151: | public function __construct( |
152: | string $filename = ':memory:', |
153: | string $command = '', |
154: | array $arguments = [] |
155: | ) { |
156: | $this->assertCanUpsert(); |
157: | |
158: | $this->Errors = new SyncErrorCollection(); |
159: | $this->Command = $command; |
160: | $this->Arguments = $arguments; |
161: | |
162: | $this->openDb( |
163: | $filename, |
164: | <<<SQL |
165: | CREATE TABLE IF NOT EXISTS |
166: | _sync_run ( |
167: | run_id INTEGER NOT NULL PRIMARY KEY, |
168: | run_uuid BLOB NOT NULL UNIQUE, |
169: | run_command TEXT NOT NULL, |
170: | run_arguments_json TEXT NOT NULL, |
171: | started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
172: | finished_at DATETIME, |
173: | exit_status INTEGER, |
174: | error_count INTEGER, |
175: | warning_count INTEGER, |
176: | errors_json TEXT |
177: | ); |
178: | |
179: | CREATE TABLE IF NOT EXISTS |
180: | _sync_provider ( |
181: | provider_id INTEGER NOT NULL PRIMARY KEY, |
182: | provider_hash BLOB NOT NULL UNIQUE, |
183: | provider_class TEXT NOT NULL, |
184: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
185: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
186: | ); |
187: | |
188: | CREATE TABLE IF NOT EXISTS |
189: | _sync_entity_type ( |
190: | entity_type_id INTEGER NOT NULL PRIMARY KEY, |
191: | entity_type_class TEXT NOT NULL UNIQUE, |
192: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
193: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
194: | ); |
195: | |
196: | CREATE TABLE IF NOT EXISTS |
197: | _sync_entity_type_state ( |
198: | provider_id INTEGER NOT NULL, |
199: | entity_type_id INTEGER NOT NULL, |
200: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, |
201: | last_sync DATETIME, |
202: | PRIMARY KEY (provider_id, entity_type_id), |
203: | FOREIGN KEY (provider_id) REFERENCES _sync_provider, |
204: | FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type |
205: | ); |
206: | |
207: | CREATE TABLE IF NOT EXISTS |
208: | _sync_entity ( |
209: | provider_id INTEGER NOT NULL, |
210: | entity_type_id INTEGER NOT NULL, |
211: | entity_id TEXT NOT NULL, |
212: | canonical_id TEXT, |
213: | is_dirty INTEGER NOT NULL DEFAULT 0, |
214: | is_deleted INTEGER NOT NULL DEFAULT 0, |
215: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
216: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, |
217: | last_sync DATETIME, |
218: | entity_json TEXT NOT NULL, |
219: | PRIMARY KEY (provider_id, entity_type_id, entity_id), |
220: | FOREIGN KEY (provider_id) REFERENCES _sync_provider, |
221: | FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type |
222: | ) WITHOUT ROWID; |
223: | |
224: | CREATE TABLE IF NOT EXISTS |
225: | _sync_entity_namespace ( |
226: | entity_namespace_id INTEGER NOT NULL PRIMARY KEY, |
227: | entity_namespace_prefix TEXT NOT NULL UNIQUE, |
228: | base_uri TEXT NOT NULL, |
229: | php_namespace TEXT NOT NULL, |
230: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
231: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
232: | ); |
233: | |
234: | SQL |
235: | ); |
236: | |
237: | Event::dispatch(new SyncStoreLoadedEvent($this)); |
238: | } |
239: | |
240: | |
241: | |
242: | |
243: | public function close(int $exitStatus = 0): void |
244: | { |
245: | |
246: | if (!$this->isOpen() || $this->RunId === null) { |
247: | $this->closeDb(); |
248: | return; |
249: | } |
250: | |
251: | $sql = <<<SQL |
252: | UPDATE |
253: | _sync_run |
254: | SET |
255: | finished_at = CURRENT_TIMESTAMP, |
256: | exit_status = :exit_status, |
257: | error_count = :error_count, |
258: | warning_count = :warning_count, |
259: | errors_json = :errors_json |
260: | WHERE |
261: | run_uuid = :run_uuid; |
262: | SQL; |
263: | |
264: | $stmt = $this->prepare($sql); |
265: | $stmt->bindValue(':exit_status', $exitStatus, \SQLITE3_INTEGER); |
266: | $stmt->bindValue(':run_uuid', $this->RunUuid, \SQLITE3_BLOB); |
267: | $stmt->bindValue(':error_count', $this->Errors->getErrorCount(), \SQLITE3_INTEGER); |
268: | $stmt->bindValue(':warning_count', $this->Errors->getWarningCount(), \SQLITE3_INTEGER); |
269: | $stmt->bindValue(':errors_json', Json::encode($this->Errors), \SQLITE3_TEXT); |
270: | $stmt->execute(); |
271: | $stmt->close(); |
272: | |
273: | $this->closeDb(); |
274: | } |
275: | |
276: | |
277: | |
278: | |
279: | public function runHasStarted(): bool |
280: | { |
281: | return $this->RunId !== null; |
282: | } |
283: | |
284: | |
285: | |
286: | |
287: | public function getRunId(): int |
288: | { |
289: | $this->assertRunHasStarted(); |
290: | |
291: | return $this->RunId; |
292: | } |
293: | |
294: | |
295: | |
296: | |
297: | public function getRunUuid(): string |
298: | { |
299: | $this->assertRunHasStarted(); |
300: | |
301: | return Get::uuid($this->RunUuid); |
302: | } |
303: | |
304: | |
305: | |
306: | |
307: | public function getBinaryRunUuid(): string |
308: | { |
309: | $this->assertRunHasStarted(); |
310: | |
311: | return $this->RunUuid; |
312: | } |
313: | |
314: | |
315: | |
316: | |
317: | private function assertRunHasStarted(): void |
318: | { |
319: | if ($this->RunId === null) { |
320: | throw new LogicException('Run has not started'); |
321: | } |
322: | } |
323: | |
324: | |
325: | |
326: | |
327: | public function registerProvider(SyncProviderInterface $provider) |
328: | { |
329: | |
330: | if (!$this->runHasStarted()) { |
331: | $this->DeferredProviders[] = $provider; |
332: | return $this; |
333: | } |
334: | |
335: | $class = get_class($provider); |
336: | $hash = $this->getProviderSignature($provider); |
337: | |
338: | if (isset($this->ProviderMap[$hash])) { |
339: | throw new LogicException(sprintf( |
340: | 'Provider already registered: %s', |
341: | $class, |
342: | )); |
343: | } |
344: | |
345: | |
346: | $sql = <<<SQL |
347: | INSERT INTO |
348: | _sync_provider (provider_hash, provider_class) |
349: | VALUES |
350: | (:provider_hash, :provider_class) ON CONFLICT (provider_hash) DO |
351: | UPDATE |
352: | SET |
353: | last_seen = CURRENT_TIMESTAMP; |
354: | SQL; |
355: | $stmt = $this->prepare($sql); |
356: | $stmt->bindValue(':provider_hash', $hash, \SQLITE3_BLOB); |
357: | $stmt->bindValue(':provider_class', $class, \SQLITE3_TEXT); |
358: | $stmt->execute(); |
359: | $stmt->close(); |
360: | |
361: | $sql = <<<SQL |
362: | SELECT |
363: | provider_id |
364: | FROM |
365: | _sync_provider |
366: | WHERE |
367: | provider_hash = :provider_hash; |
368: | SQL; |
369: | $stmt = $this->prepare($sql); |
370: | $stmt->bindValue(':provider_hash', $hash, \SQLITE3_BLOB); |
371: | $result = $this->execute($stmt); |
372: | |
373: | $row = $result->fetchArray(\SQLITE3_NUM); |
374: | $stmt->close(); |
375: | |
376: | if ($row === false) { |
377: | |
378: | throw new SyncStoreException('Error retrieving provider ID'); |
379: | |
380: | } |
381: | |
382: | $providerId = $row[0]; |
383: | $this->Providers[$providerId] = $provider; |
384: | $this->ProviderMap[$hash] = $providerId; |
385: | |
386: | return $this; |
387: | } |
388: | |
389: | |
390: | |
391: | |
392: | public function hasProvider($provider): bool |
393: | { |
394: | if ($provider instanceof SyncProviderInterface) { |
395: | $providers = $this->runHasStarted() |
396: | ? $this->Providers |
397: | : $this->DeferredProviders; |
398: | return in_array($provider, $providers, true); |
399: | } |
400: | |
401: | if (!$this->runHasStarted()) { |
402: | foreach ($this->DeferredProviders as $deferred) { |
403: | if ($this->getProviderSignature($deferred) === $provider) { |
404: | return true; |
405: | } |
406: | } |
407: | return false; |
408: | } |
409: | |
410: | return isset($this->ProviderMap[$provider]); |
411: | } |
412: | |
413: | |
414: | |
415: | |
416: | public function getProviderId($provider): int |
417: | { |
418: | if (!$this->runHasStarted()) { |
419: | $this->check(); |
420: | } |
421: | |
422: | $hash = $provider instanceof SyncProviderInterface |
423: | ? $this->getProviderSignature($provider) |
424: | : $provider; |
425: | $id = $this->ProviderMap[$hash] ?? null; |
426: | if ($id === null) { |
427: | throw new LogicException('Provider not registered' |
428: | . ($provider instanceof SyncProviderInterface |
429: | ? sprintf(': %s', get_class($provider)) |
430: | : '')); |
431: | } |
432: | return $id; |
433: | } |
434: | |
435: | |
436: | |
437: | |
438: | public function getProvider($provider): SyncProviderInterface |
439: | { |
440: | if (is_int($provider)) { |
441: | if (!$this->runHasStarted()) { |
442: | throw new LogicException(sprintf( |
443: | 'Provider ID not issued during run: %d', |
444: | $provider, |
445: | )); |
446: | } |
447: | if (!isset($this->Providers[$provider])) { |
448: | throw new LogicException(sprintf( |
449: | 'Provider not registered: #%d', |
450: | $provider, |
451: | )); |
452: | } |
453: | return $this->Providers[$provider]; |
454: | } |
455: | |
456: | |
457: | if (!$this->runHasStarted()) { |
458: | foreach ($this->DeferredProviders as $deferred) { |
459: | if ($this->getProviderSignature($deferred) === $provider) { |
460: | return $deferred; |
461: | } |
462: | } |
463: | throw new LogicException('Provider not registered'); |
464: | } |
465: | |
466: | $id = $this->ProviderMap[$provider] ?? null; |
467: | if ($id === null) { |
468: | throw new LogicException('Provider not registered'); |
469: | } |
470: | return $this->Providers[$id]; |
471: | } |
472: | |
473: | |
474: | |
475: | |
476: | public function getProviderSignature(SyncProviderInterface $provider): string |
477: | { |
478: | $class = get_class($provider); |
479: | return Get::binaryHash(implode("\0", [ |
480: | $class, |
481: | ...$provider->getBackendIdentifier(), |
482: | ])); |
483: | } |
484: | |
485: | |
486: | |
487: | |
488: | public function registerEntityType(string $entityType) |
489: | { |
490: | |
491: | if (!$this->runHasStarted()) { |
492: | $this->DeferredEntityTypes[] = $entityType; |
493: | return $this; |
494: | } |
495: | |
496: | if (isset($this->EntityTypeMap[$entityType])) { |
497: | return $this; |
498: | } |
499: | |
500: | $class = new ReflectionClass($entityType); |
501: | |
502: | if ($entityType !== $class->getName()) { |
503: | throw new LogicException(sprintf( |
504: | 'Not an exact match for declared class (%s expected): %s', |
505: | $class->getName(), |
506: | $entityType, |
507: | )); |
508: | } |
509: | |
510: | if (!$class->implementsInterface(SyncEntityInterface::class)) { |
511: | |
512: | throw new LogicException(sprintf( |
513: | 'Does not implement %s: %s', |
514: | SyncEntityInterface::class, |
515: | $entityType, |
516: | )); |
517: | |
518: | } |
519: | |
520: | |
521: | $sql = <<<SQL |
522: | INSERT INTO |
523: | _sync_entity_type (entity_type_class) |
524: | VALUES |
525: | (:entity_type_class) ON CONFLICT (entity_type_class) DO |
526: | UPDATE |
527: | SET |
528: | last_seen = CURRENT_TIMESTAMP; |
529: | SQL; |
530: | $stmt = $this->prepare($sql); |
531: | $stmt->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT); |
532: | $stmt->execute(); |
533: | $stmt->close(); |
534: | |
535: | $sql = <<<SQL |
536: | SELECT |
537: | entity_type_id |
538: | FROM |
539: | _sync_entity_type |
540: | WHERE |
541: | entity_type_class = :entity_type_class; |
542: | SQL; |
543: | $stmt = $this->prepare($sql); |
544: | $stmt->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT); |
545: | $result = $this->execute($stmt); |
546: | |
547: | $row = $result->fetchArray(\SQLITE3_NUM); |
548: | $stmt->close(); |
549: | |
550: | if ($row === false) { |
551: | throw new SyncStoreException('Error retrieving entity type ID'); |
552: | } |
553: | |
554: | $this->EntityTypes[$row[0]] = $entityType; |
555: | $this->EntityTypeMap[$entityType] = $row[0]; |
556: | |
557: | return $this; |
558: | } |
559: | |
560: | |
561: | |
562: | |
563: | public function hasEntityType(string $entityType): bool |
564: | { |
565: | if (!$this->runHasStarted()) { |
566: | return in_array($entityType, $this->DeferredEntityTypes, true); |
567: | } |
568: | |
569: | return isset($this->EntityTypeMap[$entityType]); |
570: | } |
571: | |
572: | |
573: | |
574: | |
575: | public function getEntityTypeId(string $entityType): int |
576: | { |
577: | if (!$this->runHasStarted()) { |
578: | $this->check(); |
579: | } |
580: | |
581: | $id = $this->EntityTypeMap[$entityType] ?? null; |
582: | if ($id === null) { |
583: | throw new LogicException(sprintf( |
584: | 'Entity not registered: %s', |
585: | $entityType, |
586: | )); |
587: | } |
588: | return $id; |
589: | } |
590: | |
591: | |
592: | |
593: | |
594: | public function getEntityType(int $entityTypeId): string |
595: | { |
596: | if (!$this->runHasStarted()) { |
597: | throw new LogicException(sprintf( |
598: | 'Entity type ID not issued during run: %d', |
599: | $entityTypeId, |
600: | )); |
601: | } |
602: | if (!isset($this->EntityTypes[$entityTypeId])) { |
603: | throw new LogicException(sprintf( |
604: | 'Entity type not registered: #%d', |
605: | $entityTypeId, |
606: | )); |
607: | } |
608: | return $this->EntityTypes[$entityTypeId]; |
609: | } |
610: | |
611: | |
612: | |
613: | |
614: | public function registerNamespace( |
615: | string $prefix, |
616: | string $uri, |
617: | string $namespace, |
618: | ?SyncNamespaceHelperInterface $helper = null |
619: | ) { |
620: | $prefix = Str::lower($prefix); |
621: | if (isset($this->Namespaces[$prefix]) || ( |
622: | !$this->runHasStarted() |
623: | && isset($this->DeferredNamespaces[$prefix]) |
624: | )) { |
625: | throw new LogicException(sprintf( |
626: | 'Prefix already registered: %s', |
627: | $prefix, |
628: | )); |
629: | } |
630: | |
631: | |
632: | |
633: | |
634: | if ( |
635: | !isset($this->DeferredNamespaces) |
636: | || !isset($this->DeferredNamespaces[$prefix]) |
637: | ) { |
638: | if (!Regex::match('/^[a-z][-a-z0-9+.]*$/iD', $prefix)) { |
639: | throw new InvalidArgumentException(sprintf( |
640: | 'Invalid prefix: %s', |
641: | $prefix, |
642: | )); |
643: | } |
644: | $uri = rtrim($uri, '/') . '/'; |
645: | $namespace = trim($namespace, '\\') . '\\'; |
646: | } |
647: | |
648: | |
649: | if (!$this->runHasStarted()) { |
650: | $this->DeferredNamespaces[$prefix] = [$uri, $namespace, $helper]; |
651: | return $this; |
652: | } |
653: | |
654: | |
655: | $sql = <<<SQL |
656: | INSERT INTO |
657: | _sync_entity_namespace (entity_namespace_prefix, base_uri, php_namespace) |
658: | VALUES |
659: | ( |
660: | :entity_namespace_prefix, |
661: | :base_uri, |
662: | :php_namespace |
663: | ) ON CONFLICT (entity_namespace_prefix) DO |
664: | UPDATE |
665: | SET |
666: | base_uri = excluded.base_uri, |
667: | php_namespace = excluded.php_namespace, |
668: | last_seen = CURRENT_TIMESTAMP; |
669: | SQL; |
670: | $stmt = $this->prepare($sql); |
671: | $stmt->bindValue(':entity_namespace_prefix', $prefix, \SQLITE3_TEXT); |
672: | $stmt->bindValue(':base_uri', $uri, \SQLITE3_TEXT); |
673: | $stmt->bindValue(':php_namespace', $namespace, \SQLITE3_TEXT); |
674: | $stmt->execute(); |
675: | $stmt->close(); |
676: | |
677: | $this->Namespaces[$prefix] = true; |
678: | |
679: | if ($helper) { |
680: | $this->NamespaceHelpersByPrefix[$prefix] = $helper; |
681: | } |
682: | |
683: | |
684: | if (!isset($this->NamespacesByPrefix)) { |
685: | return $this; |
686: | } |
687: | |
688: | return $this->reload(); |
689: | } |
690: | |
691: | |
692: | |
693: | |
694: | public function getEntityTypeUri(string $entityType, bool $compact = true): string |
695: | { |
696: | $prefix = $this->classToNamespace($entityType, $uri, $namespace); |
697: | if ($prefix === null) { |
698: | return SyncUtil::getEntityTypeUri($entityType, $compact); |
699: | } |
700: | |
701: | $entityType = str_replace('\\', '/', substr(ltrim($entityType, '\\'), strlen($namespace))); |
702: | |
703: | return $compact |
704: | ? "{$prefix}:{$entityType}" |
705: | : "{$uri}{$entityType}"; |
706: | } |
707: | |
708: | |
709: | |
710: | |
711: | public function getNamespacePrefix(string $class): ?string |
712: | { |
713: | return $this->classToNamespace($class); |
714: | } |
715: | |
716: | |
717: | |
718: | |
719: | public function getNamespaceHelper(string $class): ?SyncNamespaceHelperInterface |
720: | { |
721: | if ($this->classToNamespace( |
722: | $class, |
723: | $uri, |
724: | $namespace, |
725: | $helper |
726: | ) === null) { |
727: | return null; |
728: | } |
729: | |
730: | return $helper; |
731: | } |
732: | |
733: | |
734: | |
735: | |
736: | |
737: | private function classToNamespace( |
738: | string $class, |
739: | ?string &$uri = null, |
740: | ?string &$namespace = null, |
741: | ?SyncNamespaceHelperInterface &$helper = null |
742: | ): ?string { |
743: | $class = Str::lower(ltrim($class, '\\')); |
744: | |
745: | $namespaces = $this->runHasStarted() |
746: | ? $this->getNamespaces() |
747: | : $this->DeferredNamespaces; |
748: | foreach ($namespaces as $prefix => [$_uri, $_namespace, $_helper]) { |
749: | $_namespace = Str::lower($_namespace); |
750: | if (strpos($class, $_namespace) === 0) { |
751: | $uri = $_uri; |
752: | $namespace = $_namespace; |
753: | $helper = $_helper; |
754: | return $prefix; |
755: | } |
756: | } |
757: | return null; |
758: | } |
759: | |
760: | |
761: | |
762: | |
763: | private function getNamespaces(): Generator |
764: | { |
765: | foreach ($this->NamespacesByPrefix as $prefix => $namespace) { |
766: | yield $prefix => [ |
767: | $this->NamespaceUrisByPrefix[$prefix], |
768: | $namespace, |
769: | $this->NamespaceHelpersByPrefix[$prefix] ?? null, |
770: | ]; |
771: | } |
772: | } |
773: | |
774: | |
775: | |
776: | |
777: | public function setEntity( |
778: | int $providerId, |
779: | string $entityType, |
780: | $entityId, |
781: | SyncEntityInterface $entity |
782: | ) { |
783: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
784: | if (isset($this->Entities[$providerId][$entityTypeId][$entityId])) { |
785: | throw new LogicException('Entity already registered'); |
786: | } |
787: | $this->Entities[$providerId][$entityTypeId][$entityId] = $entity; |
788: | $this->EntityCheckpoints[spl_object_id($entity)] = $this->DeferralCheckpoint++; |
789: | |
790: | |
791: | $deferred = $this->DeferredEntities[$providerId][$entityTypeId][$entityId] ?? null; |
792: | if ($deferred) { |
793: | foreach ($deferred as $i => $deferredEntity) { |
794: | $deferredEntity->replace($entity); |
795: | unset($this->DeferredEntities[$providerId][$entityTypeId][$entityId][$i]); |
796: | } |
797: | unset($this->DeferredEntities[$providerId][$entityTypeId][$entityId]); |
798: | } |
799: | |
800: | return $this; |
801: | } |
802: | |
803: | |
804: | |
805: | |
806: | |
807: | |
808: | |
809: | public function getEntity( |
810: | int $providerId, |
811: | string $entityType, |
812: | $entityId, |
813: | ?bool $offline = null |
814: | ): ?SyncEntityInterface { |
815: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
816: | |
817: | $entity = $this->Entities[$providerId][$entityTypeId][$entityId] ?? null; |
818: | if ($entity || $offline === false) { |
819: | return $entity; |
820: | } |
821: | return null; |
822: | } |
823: | |
824: | |
825: | |
826: | |
827: | |
828: | |
829: | |
830: | public function deferEntity( |
831: | int $providerId, |
832: | string $entityType, |
833: | $entityId, |
834: | DeferredEntityInterface $entity |
835: | ) { |
836: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
837: | |
838: | $_entity = $this->Entities[$providerId][$entityTypeId][$entityId] ?? null; |
839: | if ($_entity) { |
840: | $entity->replace($_entity); |
841: | return $this; |
842: | } |
843: | |
844: | |
845: | |
846: | $context = $entity->getContext(); |
847: | if ($context) { |
848: | $last = $context->getLastEntity(); |
849: | if ($last) { |
850: | $context = $last->getContext(); |
851: | } |
852: | } |
853: | $policy = $context |
854: | ? $context->getDeferralPolicy() |
855: | : null; |
856: | |
857: | $this->DeferredEntities[$providerId][$entityTypeId][$entityId][ |
858: | $this->DeferralCheckpoint++ |
859: | ] = $entity; |
860: | |
861: | |
862: | |
863: | |
864: | |
865: | if ($policy === DeferralPolicy::RESOLVE_EARLY) { |
866: | $entity->resolve(); |
867: | return $this; |
868: | } |
869: | |
870: | return $this; |
871: | } |
872: | |
873: | |
874: | |
875: | |
876: | public function deferRelationship( |
877: | int $providerId, |
878: | string $entityType, |
879: | string $forEntityType, |
880: | string $forEntityProperty, |
881: | $forEntityId, |
882: | DeferredRelationshipInterface $relationship |
883: | ) { |
884: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
885: | $forEntityTypeId = $this->EntityTypeMap[$forEntityType]; |
886: | |
887: | |
888: | $deferredList = &$this->DeferredRelationships[$providerId][$entityTypeId][ |
889: | $forEntityTypeId |
890: | ][$forEntityProperty][$forEntityId]; |
891: | |
892: | if (isset($deferredList)) { |
893: | throw new LogicException('Relationship already registered'); |
894: | } |
895: | |
896: | $deferredList = []; |
897: | |
898: | |
899: | |
900: | $context = $relationship->getContext(); |
901: | if ($context) { |
902: | $last = $context->getLastEntity(); |
903: | if ($last) { |
904: | $context = $last->getContext(); |
905: | } |
906: | } |
907: | $policy = $context |
908: | ? $context->getHydrationPolicy($entityType) |
909: | : 0; |
910: | |
911: | if ($policy === HydrationPolicy::LAZY) { |
912: | return $this; |
913: | } |
914: | |
915: | if ($policy === HydrationPolicy::EAGER) { |
916: | $relationship->resolve(); |
917: | return $this; |
918: | } |
919: | |
920: | $deferredList[$this->DeferralCheckpoint++] = $relationship; |
921: | return $this; |
922: | } |
923: | |
924: | |
925: | |
926: | |
927: | public function resolveDeferrals( |
928: | ?int $fromCheckpoint = null, |
929: | ?string $entityType = null, |
930: | ?int $providerId = null |
931: | ): array { |
932: | $checkpoint = $this->DeferralCheckpoint; |
933: | do { |
934: | |
935: | |
936: | |
937: | $deferred = $this->resolveDeferredRelationships($fromCheckpoint, $entityType, null, null, $providerId); |
938: | if ($deferred) { |
939: | foreach ($deferred as $relationship) { |
940: | foreach ($relationship as $entity) { |
941: | $objectId = spl_object_id($entity); |
942: | if ($this->EntityCheckpoints[$objectId] < $checkpoint) { |
943: | continue; |
944: | } |
945: | $resolved[$objectId] = $entity; |
946: | } |
947: | } |
948: | continue; |
949: | } |
950: | |
951: | $deferred = $this->resolveDeferredEntities($fromCheckpoint, $entityType, $providerId); |
952: | if (!$deferred) { |
953: | continue; |
954: | } |
955: | foreach ($deferred as $entity) { |
956: | $resolved[spl_object_id($entity)] = $entity; |
957: | } |
958: | } while ($deferred); |
959: | |
960: | return array_values($resolved ?? []); |
961: | } |
962: | |
963: | |
964: | |
965: | |
966: | public function getDeferralCheckpoint(): int |
967: | { |
968: | return $this->DeferralCheckpoint; |
969: | } |
970: | |
971: | |
972: | |
973: | |
974: | public function resolveDeferredEntities( |
975: | ?int $fromCheckpoint = null, |
976: | ?string $entityType = null, |
977: | ?int $providerId = null |
978: | ): array { |
979: | $entityTypeId = $entityType === null |
980: | ? null |
981: | : $this->EntityTypeMap[$entityType]; |
982: | |
983: | $resolved = []; |
984: | foreach ($this->DeferredEntities as $provId => $entitiesByTypeId) { |
985: | if ($providerId !== null && $provId !== $providerId) { |
986: | continue; |
987: | } |
988: | foreach ($entitiesByTypeId as $entTypeId => $entities) { |
989: | if ($entityTypeId !== null && $entTypeId !== $entityTypeId) { |
990: | continue; |
991: | } |
992: | |
993: | if ($fromCheckpoint !== null) { |
994: | $_entities = $entities; |
995: | foreach ($_entities as $entityId => $deferred) { |
996: | foreach ($deferred as $i => $deferredEntity) { |
997: | if ($i < $fromCheckpoint) { |
998: | unset($_entities[$entityId][$i]); |
999: | if (!$_entities[$entityId]) { |
1000: | unset($_entities[$entityId]); |
1001: | } |
1002: | } |
1003: | } |
1004: | } |
1005: | if (!$_entities) { |
1006: | continue; |
1007: | } |
1008: | $entities = $_entities; |
1009: | } |
1010: | |
1011: | |
1012: | foreach ($entities as $entityId => $deferred) { |
1013: | $deferredEntity = reset($deferred); |
1014: | $resolved[] = $deferredEntity->resolve(); |
1015: | } |
1016: | } |
1017: | } |
1018: | |
1019: | return $resolved; |
1020: | } |
1021: | |
1022: | |
1023: | |
1024: | |
1025: | public function resolveDeferredRelationships( |
1026: | ?int $fromCheckpoint = null, |
1027: | ?string $entityType = null, |
1028: | ?string $forEntityType = null, |
1029: | ?string $forEntityProperty = null, |
1030: | ?int $providerId = null |
1031: | ): array { |
1032: | $entityTypeId = $entityType === null |
1033: | ? null |
1034: | : $this->EntityTypeMap[$entityType]; |
1035: | $forEntityTypeId = $forEntityType === null |
1036: | ? null |
1037: | : $this->EntityTypeMap[$forEntityType]; |
1038: | |
1039: | $resolved = []; |
1040: | foreach ($this->DeferredRelationships as $provId => $relationshipsByEntTypeId) { |
1041: | if ($providerId !== null && $provId !== $providerId) { |
1042: | continue; |
1043: | } |
1044: | foreach ($relationshipsByEntTypeId as $entTypeId => $relationshipsByForEntTypeId) { |
1045: | if ($entityTypeId !== null && $entTypeId !== $entityTypeId) { |
1046: | continue; |
1047: | } |
1048: | foreach ($relationshipsByForEntTypeId as $forEntTypeId => $relationshipsByForEntProp) { |
1049: | if ($forEntityTypeId !== null && $forEntTypeId !== $forEntityTypeId) { |
1050: | continue; |
1051: | } |
1052: | foreach ($relationshipsByForEntProp as $forEntProp => $relationshipsByForEntId) { |
1053: | if ($forEntityProperty !== null && $forEntProp !== $forEntityProperty) { |
1054: | continue; |
1055: | } |
1056: | foreach ($relationshipsByForEntId as $forEntId => $relationships) { |
1057: | if ($fromCheckpoint !== null) { |
1058: | $_relationships = $relationships; |
1059: | foreach ($_relationships as $index => $deferred) { |
1060: | if ($index < $fromCheckpoint) { |
1061: | unset($_relationships[$index]); |
1062: | } |
1063: | } |
1064: | if (!$_relationships) { |
1065: | continue; |
1066: | } |
1067: | $relationships = $_relationships; |
1068: | } |
1069: | |
1070: | foreach ($relationships as $index => $deferred) { |
1071: | $resolved[] = $deferred->resolve(); |
1072: | unset($this->DeferredRelationships[$provId][$entTypeId][$forEntTypeId][$forEntProp][$forEntId][$index]); |
1073: | } |
1074: | } |
1075: | } |
1076: | } |
1077: | } |
1078: | } |
1079: | |
1080: | return $resolved; |
1081: | } |
1082: | |
1083: | |
1084: | |
1085: | |
1086: | public function checkProviderHeartbeats( |
1087: | int $ttl = 300, |
1088: | bool $failEarly = true, |
1089: | SyncProviderInterface ...$providers |
1090: | ) { |
1091: | $this->check(); |
1092: | |
1093: | if ($providers) { |
1094: | $providers = Arr::unique($providers); |
1095: | } elseif ($this->Providers) { |
1096: | $providers = $this->Providers; |
1097: | } else { |
1098: | return $this; |
1099: | } |
1100: | |
1101: | $failed = []; |
1102: | |
1103: | foreach ($providers as $provider) { |
1104: | $id = $provider->getProviderId(); |
1105: | $name = sprintf('%s [#%d]', $provider->getName(), $id); |
1106: | Console::logProgress('Checking', $name); |
1107: | try { |
1108: | $provider->checkHeartbeat($ttl); |
1109: | Console::log('Heartbeat OK:', $name); |
1110: | } catch (MethodNotImplementedException $ex) { |
1111: | Console::log('Heartbeat check not supported:', $name); |
1112: | } catch (UnreachableBackendExceptionInterface $ex) { |
1113: | Console::exception($ex, Console::LEVEL_DEBUG, null); |
1114: | Console::log('Heartbeat check failed:', $name); |
1115: | $failed[] = $provider; |
1116: | $this->recordError( |
1117: | SyncError::build() |
1118: | ->errorType(ErrorType::BACKEND_UNREACHABLE) |
1119: | ->message('Heartbeat check failed: %s') |
1120: | ->values([[ |
1121: | 'provider_id' => $id, |
1122: | 'provider_class' => get_class($provider), |
1123: | 'exception' => get_class($ex), |
1124: | 'message' => $ex->getMessage() |
1125: | ]]) |
1126: | ->build() |
1127: | ); |
1128: | } |
1129: | if ($failEarly && $failed) { |
1130: | break; |
1131: | } |
1132: | } |
1133: | |
1134: | if ($failed) { |
1135: | throw new HeartbeatCheckFailedException(...$failed); |
1136: | } |
1137: | |
1138: | return $this; |
1139: | } |
1140: | |
1141: | |
1142: | |
1143: | |
1144: | public function recordError(SyncErrorInterface $error, bool $deduplicate = false) |
1145: | { |
1146: | if ($deduplicate) { |
1147: | $key = $this->Errors->keyOf($error); |
1148: | if ($key !== null) { |
1149: | $this->Errors[$key] = $this->Errors[$key]->count(); |
1150: | return $this; |
1151: | } |
1152: | } |
1153: | |
1154: | $this->Errors[] = $error; |
1155: | |
1156: | return $this; |
1157: | } |
1158: | |
1159: | |
1160: | |
1161: | |
1162: | public function getErrors(): SyncErrorCollectionInterface |
1163: | { |
1164: | return clone $this->Errors; |
1165: | } |
1166: | |
1167: | |
1168: | |
1169: | |
1170: | protected function check() |
1171: | { |
1172: | if ($this->RunId !== null) { |
1173: | return $this; |
1174: | } |
1175: | |
1176: | if (!$this->isCheckRunning()) { |
1177: | return $this->safeCheck(); |
1178: | } |
1179: | |
1180: | $sql = <<<SQL |
1181: | INSERT INTO _sync_run (run_uuid, run_command, run_arguments_json) |
1182: | VALUES ( |
1183: | :run_uuid, |
1184: | :run_command, |
1185: | :run_arguments_json |
1186: | ); |
1187: | SQL; |
1188: | |
1189: | $stmt = $this->prepare($sql); |
1190: | $stmt->bindValue(':run_uuid', $uuid = Get::binaryUuid(), \SQLITE3_BLOB); |
1191: | $stmt->bindValue(':run_command', $this->Command, \SQLITE3_TEXT); |
1192: | $stmt->bindValue(':run_arguments_json', Json::encode($this->Arguments), \SQLITE3_TEXT); |
1193: | $stmt->execute(); |
1194: | $stmt->close(); |
1195: | |
1196: | $id = $this->db()->lastInsertRowID(); |
1197: | $this->RunId = $id; |
1198: | $this->RunUuid = $uuid; |
1199: | unset($this->Command, $this->Arguments); |
1200: | |
1201: | foreach ($this->DeferredProviders as $provider) { |
1202: | $this->registerProvider($provider); |
1203: | } |
1204: | unset($this->DeferredProviders); |
1205: | |
1206: | foreach ($this->DeferredEntityTypes as $entity) { |
1207: | $this->registerEntityType($entity); |
1208: | } |
1209: | unset($this->DeferredEntityTypes); |
1210: | |
1211: | foreach ($this->DeferredNamespaces as $prefix => [$uri, $namespace, $helper]) { |
1212: | $this->registerNamespace($prefix, $uri, $namespace, $helper); |
1213: | } |
1214: | unset($this->DeferredNamespaces); |
1215: | |
1216: | return $this->reload(); |
1217: | } |
1218: | |
1219: | |
1220: | |
1221: | |
1222: | private function reload() |
1223: | { |
1224: | $db = $this->db(); |
1225: | $sql = <<<SQL |
1226: | SELECT |
1227: | entity_namespace_prefix, |
1228: | base_uri, |
1229: | php_namespace |
1230: | FROM |
1231: | _sync_entity_namespace |
1232: | ORDER BY |
1233: | LENGTH(php_namespace) DESC; |
1234: | SQL; |
1235: | $stmt = $this->prepare($sql); |
1236: | $result = $this->execute($stmt); |
1237: | $this->NamespacesByPrefix = []; |
1238: | $this->NamespaceUrisByPrefix = []; |
1239: | while (($row = $result->fetchArray(\SQLITE3_NUM)) !== false) { |
1240: | |
1241: | $this->NamespacesByPrefix[$row[0]] = $row[2]; |
1242: | $this->NamespaceUrisByPrefix[$row[0]] = $row[1]; |
1243: | } |
1244: | $result->finalize(); |
1245: | $stmt->close(); |
1246: | |
1247: | return $this; |
1248: | } |
1249: | |
1250: | |
1251: | |
1252: | |
1253: | public function __destruct() |
1254: | { |
1255: | $exitStatus = -1; |
1256: | if (Err::isLoaded() && Err::isShuttingDown()) { |
1257: | $exitStatus = Err::getExitStatus(); |
1258: | } |
1259: | $this->close($exitStatus); |
1260: | } |
1261: | } |
1262: | |