1: | <?php declare(strict_types=1); |
2: | |
3: | namespace Salient\Sync; |
4: | |
5: | use Salient\Contract\Core\Exception\MethodNotImplementedExceptionInterface; |
6: | use Salient\Contract\Core\MessageLevel as Level; |
7: | use Salient\Contract\Sync\Exception\UnreachableBackendExceptionInterface; |
8: | use Salient\Contract\Sync\DeferralPolicy; |
9: | use Salient\Contract\Sync\DeferredEntityInterface; |
10: | use Salient\Contract\Sync\DeferredRelationshipInterface; |
11: | use Salient\Contract\Sync\ErrorType; |
12: | use Salient\Contract\Sync\HydrationPolicy; |
13: | use Salient\Contract\Sync\SyncEntityInterface; |
14: | use Salient\Contract\Sync\SyncErrorCollectionInterface; |
15: | use Salient\Contract\Sync\SyncErrorInterface; |
16: | use Salient\Contract\Sync\SyncNamespaceHelperInterface; |
17: | use Salient\Contract\Sync\SyncProviderInterface; |
18: | use Salient\Contract\Sync\SyncStoreInterface; |
19: | use Salient\Core\Facade\Console; |
20: | use Salient\Core\Facade\Err; |
21: | use Salient\Core\Facade\Event; |
22: | use Salient\Core\AbstractStore; |
23: | use Salient\Sync\Event\SyncStoreLoadedEvent; |
24: | use Salient\Sync\Exception\HeartbeatCheckFailedException; |
25: | use Salient\Sync\Exception\SyncStoreException; |
26: | use Salient\Sync\Support\SyncErrorCollection; |
27: | use Salient\Utility\Arr; |
28: | use Salient\Utility\Get; |
29: | use Salient\Utility\Json; |
30: | use Salient\Utility\Regex; |
31: | use Salient\Utility\Str; |
32: | use Generator; |
33: | use InvalidArgumentException; |
34: | use LogicException; |
35: | use ReflectionClass; |
36: | |
37: | |
38: | |
39: | |
40: | |
41: | |
42: | |
43: | |
44: | |
45: | final class SyncStore extends AbstractStore implements SyncStoreInterface |
46: | { |
47: | private ?int $RunId = null; |
48: | private string $RunUuid; |
49: | |
50: | |
51: | |
52: | |
53: | |
54: | |
55: | private array $Namespaces = []; |
56: | |
57: | |
58: | |
59: | |
60: | |
61: | |
62: | private array $Providers = []; |
63: | |
64: | |
65: | |
66: | |
67: | |
68: | |
69: | private array $ProviderMap = []; |
70: | |
71: | |
72: | |
73: | |
74: | |
75: | |
76: | private array $EntityTypes = []; |
77: | |
78: | |
79: | |
80: | |
81: | |
82: | |
83: | private array $EntityTypeMap = []; |
84: | |
85: | |
86: | |
87: | |
88: | |
89: | |
90: | private array $NamespacesByPrefix; |
91: | |
92: | |
93: | |
94: | |
95: | |
96: | |
97: | private array $NamespaceUrisByPrefix; |
98: | |
99: | |
100: | |
101: | |
102: | |
103: | |
104: | private array $NamespaceHelpersByPrefix; |
105: | |
106: | |
107: | |
108: | |
109: | |
110: | |
111: | private array $Entities; |
112: | |
113: | |
114: | |
115: | |
116: | |
117: | |
118: | private array $EntityCheckpoints; |
119: | |
120: | |
121: | |
122: | |
123: | |
124: | |
125: | private array $DeferredEntities = []; |
126: | |
127: | |
128: | |
129: | |
130: | |
131: | |
132: | |
133: | private array $DeferredRelationships = []; |
134: | |
135: | private SyncErrorCollection $Errors; |
136: | private int $DeferralCheckpoint = 0; |
137: | private string $Command; |
138: | |
139: | private array $Arguments; |
140: | |
141: | private array $DeferredNamespaces = []; |
142: | |
143: | private array $DeferredProviders = []; |
144: | |
145: | private array $DeferredEntityTypes = []; |
146: | |
147: | |
148: | |
149: | |
150: | |
151: | |
152: | |
153: | |
154: | public function __construct( |
155: | string $filename = ':memory:', |
156: | string $command = '', |
157: | array $arguments = [] |
158: | ) { |
159: | $this->assertCanUpsert(); |
160: | |
161: | $this->Errors = new SyncErrorCollection(); |
162: | $this->Command = $command; |
163: | $this->Arguments = $arguments; |
164: | |
165: | $this->openDb( |
166: | $filename, |
167: | <<<SQL |
168: | CREATE TABLE IF NOT EXISTS |
169: | _sync_run ( |
170: | run_id INTEGER NOT NULL PRIMARY KEY, |
171: | run_uuid BLOB NOT NULL UNIQUE, |
172: | run_command TEXT NOT NULL, |
173: | run_arguments_json TEXT NOT NULL, |
174: | started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
175: | finished_at DATETIME, |
176: | exit_status INTEGER, |
177: | error_count INTEGER, |
178: | warning_count INTEGER, |
179: | errors_json TEXT |
180: | ); |
181: | |
182: | CREATE TABLE IF NOT EXISTS |
183: | _sync_provider ( |
184: | provider_id INTEGER NOT NULL PRIMARY KEY, |
185: | provider_hash BLOB NOT NULL UNIQUE, |
186: | provider_class TEXT NOT NULL, |
187: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
188: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
189: | ); |
190: | |
191: | CREATE TABLE IF NOT EXISTS |
192: | _sync_entity_type ( |
193: | entity_type_id INTEGER NOT NULL PRIMARY KEY, |
194: | entity_type_class TEXT NOT NULL UNIQUE, |
195: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
196: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
197: | ); |
198: | |
199: | CREATE TABLE IF NOT EXISTS |
200: | _sync_entity_type_state ( |
201: | provider_id INTEGER NOT NULL, |
202: | entity_type_id INTEGER NOT NULL, |
203: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, |
204: | last_sync DATETIME, |
205: | PRIMARY KEY (provider_id, entity_type_id), |
206: | FOREIGN KEY (provider_id) REFERENCES _sync_provider, |
207: | FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type |
208: | ); |
209: | |
210: | CREATE TABLE IF NOT EXISTS |
211: | _sync_entity ( |
212: | provider_id INTEGER NOT NULL, |
213: | entity_type_id INTEGER NOT NULL, |
214: | entity_id TEXT NOT NULL, |
215: | canonical_id TEXT, |
216: | is_dirty INTEGER NOT NULL DEFAULT 0, |
217: | is_deleted INTEGER NOT NULL DEFAULT 0, |
218: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
219: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, |
220: | last_sync DATETIME, |
221: | entity_json TEXT NOT NULL, |
222: | PRIMARY KEY (provider_id, entity_type_id, entity_id), |
223: | FOREIGN KEY (provider_id) REFERENCES _sync_provider, |
224: | FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type |
225: | ) WITHOUT ROWID; |
226: | |
227: | CREATE TABLE IF NOT EXISTS |
228: | _sync_entity_namespace ( |
229: | entity_namespace_id INTEGER NOT NULL PRIMARY KEY, |
230: | entity_namespace_prefix TEXT NOT NULL UNIQUE, |
231: | base_uri TEXT NOT NULL, |
232: | php_namespace TEXT NOT NULL, |
233: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
234: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
235: | ); |
236: | |
237: | SQL |
238: | ); |
239: | |
240: | Event::dispatch(new SyncStoreLoadedEvent($this)); |
241: | } |
242: | |
243: | |
244: | |
245: | |
246: | public function close(int $exitStatus = 0): void |
247: | { |
248: | |
249: | if (!$this->isOpen() || $this->RunId === null) { |
250: | $this->closeDb(); |
251: | return; |
252: | } |
253: | |
254: | $sql = <<<SQL |
255: | UPDATE |
256: | _sync_run |
257: | SET |
258: | finished_at = CURRENT_TIMESTAMP, |
259: | exit_status = :exit_status, |
260: | error_count = :error_count, |
261: | warning_count = :warning_count, |
262: | errors_json = :errors_json |
263: | WHERE |
264: | run_uuid = :run_uuid; |
265: | SQL; |
266: | |
267: | $stmt = $this->prepare($sql); |
268: | $stmt->bindValue(':exit_status', $exitStatus, \SQLITE3_INTEGER); |
269: | $stmt->bindValue(':run_uuid', $this->RunUuid, \SQLITE3_BLOB); |
270: | $stmt->bindValue(':error_count', $this->Errors->getErrorCount(), \SQLITE3_INTEGER); |
271: | $stmt->bindValue(':warning_count', $this->Errors->getWarningCount(), \SQLITE3_INTEGER); |
272: | $stmt->bindValue(':errors_json', Json::stringify($this->Errors), \SQLITE3_TEXT); |
273: | $stmt->execute(); |
274: | $stmt->close(); |
275: | |
276: | $this->closeDb(); |
277: | } |
278: | |
279: | |
280: | |
281: | |
282: | public function runHasStarted(): bool |
283: | { |
284: | return $this->RunId !== null; |
285: | } |
286: | |
287: | |
288: | |
289: | |
290: | public function getRunId(): int |
291: | { |
292: | $this->assertRunHasStarted(); |
293: | |
294: | return $this->RunId; |
295: | } |
296: | |
297: | |
298: | |
299: | |
300: | public function getRunUuid(): string |
301: | { |
302: | $this->assertRunHasStarted(); |
303: | |
304: | return Get::uuid($this->RunUuid); |
305: | } |
306: | |
307: | |
308: | |
309: | |
310: | public function getBinaryRunUuid(): string |
311: | { |
312: | $this->assertRunHasStarted(); |
313: | |
314: | return $this->RunUuid; |
315: | } |
316: | |
317: | |
318: | |
319: | |
320: | private function assertRunHasStarted(): void |
321: | { |
322: | if ($this->RunId === null) { |
323: | throw new LogicException('Run has not started'); |
324: | } |
325: | } |
326: | |
327: | |
328: | |
329: | |
330: | public function registerProvider(SyncProviderInterface $provider) |
331: | { |
332: | |
333: | if (!$this->runHasStarted()) { |
334: | $this->DeferredProviders[] = $provider; |
335: | return $this; |
336: | } |
337: | |
338: | $class = get_class($provider); |
339: | $hash = $this->getProviderSignature($provider); |
340: | |
341: | if (isset($this->ProviderMap[$hash])) { |
342: | throw new LogicException(sprintf( |
343: | 'Provider already registered: %s', |
344: | $class, |
345: | )); |
346: | } |
347: | |
348: | |
349: | $sql = <<<SQL |
350: | INSERT INTO |
351: | _sync_provider (provider_hash, provider_class) |
352: | VALUES |
353: | (:provider_hash, :provider_class) ON CONFLICT (provider_hash) DO |
354: | UPDATE |
355: | SET |
356: | last_seen = CURRENT_TIMESTAMP; |
357: | SQL; |
358: | $stmt = $this->prepare($sql); |
359: | $stmt->bindValue(':provider_hash', $hash, \SQLITE3_BLOB); |
360: | $stmt->bindValue(':provider_class', $class, \SQLITE3_TEXT); |
361: | $stmt->execute(); |
362: | $stmt->close(); |
363: | |
364: | $sql = <<<SQL |
365: | SELECT |
366: | provider_id |
367: | FROM |
368: | _sync_provider |
369: | WHERE |
370: | provider_hash = :provider_hash; |
371: | SQL; |
372: | $stmt = $this->prepare($sql); |
373: | $stmt->bindValue(':provider_hash', $hash, \SQLITE3_BLOB); |
374: | $result = $this->execute($stmt); |
375: | |
376: | $row = $result->fetchArray(\SQLITE3_NUM); |
377: | $stmt->close(); |
378: | |
379: | if ($row === false) { |
380: | |
381: | throw new SyncStoreException('Error retrieving provider ID'); |
382: | |
383: | } |
384: | |
385: | $providerId = $row[0]; |
386: | $this->Providers[$providerId] = $provider; |
387: | $this->ProviderMap[$hash] = $providerId; |
388: | |
389: | return $this; |
390: | } |
391: | |
392: | |
393: | |
394: | |
395: | public function hasProvider($provider): bool |
396: | { |
397: | if ($provider instanceof SyncProviderInterface) { |
398: | $providers = $this->runHasStarted() |
399: | ? $this->Providers |
400: | : $this->DeferredProviders; |
401: | return in_array($provider, $providers, true); |
402: | } |
403: | |
404: | if (!$this->runHasStarted()) { |
405: | foreach ($this->DeferredProviders as $deferred) { |
406: | if ($this->getProviderSignature($deferred) === $provider) { |
407: | return true; |
408: | } |
409: | } |
410: | return false; |
411: | } |
412: | |
413: | return isset($this->ProviderMap[$provider]); |
414: | } |
415: | |
416: | |
417: | |
418: | |
419: | public function getProviderId($provider): int |
420: | { |
421: | if (!$this->runHasStarted()) { |
422: | $this->check(); |
423: | } |
424: | |
425: | $hash = $provider instanceof SyncProviderInterface |
426: | ? $this->getProviderSignature($provider) |
427: | : $provider; |
428: | $id = $this->ProviderMap[$hash] ?? null; |
429: | if ($id === null) { |
430: | throw new LogicException('Provider not registered' |
431: | . ($provider instanceof SyncProviderInterface |
432: | ? sprintf(': %s', get_class($provider)) |
433: | : '')); |
434: | } |
435: | return $id; |
436: | } |
437: | |
438: | |
439: | |
440: | |
441: | public function getProvider($provider): SyncProviderInterface |
442: | { |
443: | if (is_int($provider)) { |
444: | if (!$this->runHasStarted()) { |
445: | throw new LogicException(sprintf( |
446: | 'Provider ID not issued during run: %d', |
447: | $provider, |
448: | )); |
449: | } |
450: | if (!isset($this->Providers[$provider])) { |
451: | throw new LogicException(sprintf( |
452: | 'Provider not registered: #%d', |
453: | $provider, |
454: | )); |
455: | } |
456: | return $this->Providers[$provider]; |
457: | } |
458: | |
459: | |
460: | if (!$this->runHasStarted()) { |
461: | foreach ($this->DeferredProviders as $deferred) { |
462: | if ($this->getProviderSignature($deferred) === $provider) { |
463: | return $deferred; |
464: | } |
465: | } |
466: | throw new LogicException('Provider not registered'); |
467: | } |
468: | |
469: | $id = $this->ProviderMap[$provider] ?? null; |
470: | if ($id === null) { |
471: | throw new LogicException('Provider not registered'); |
472: | } |
473: | return $this->Providers[$id]; |
474: | } |
475: | |
476: | |
477: | |
478: | |
479: | public function getProviderSignature(SyncProviderInterface $provider): string |
480: | { |
481: | $class = get_class($provider); |
482: | return Get::binaryHash(implode("\0", [ |
483: | $class, |
484: | ...$provider->getBackendIdentifier(), |
485: | ])); |
486: | } |
487: | |
488: | |
489: | |
490: | |
491: | public function registerEntityType(string $entityType) |
492: | { |
493: | |
494: | if (!$this->runHasStarted()) { |
495: | $this->DeferredEntityTypes[] = $entityType; |
496: | return $this; |
497: | } |
498: | |
499: | if (isset($this->EntityTypeMap[$entityType])) { |
500: | return $this; |
501: | } |
502: | |
503: | $class = new ReflectionClass($entityType); |
504: | |
505: | if ($entityType !== $class->getName()) { |
506: | throw new LogicException(sprintf( |
507: | 'Not an exact match for declared class (%s expected): %s', |
508: | $class->getName(), |
509: | $entityType, |
510: | )); |
511: | } |
512: | |
513: | if (!$class->implementsInterface(SyncEntityInterface::class)) { |
514: | |
515: | throw new LogicException(sprintf( |
516: | 'Does not implement %s: %s', |
517: | SyncEntityInterface::class, |
518: | $entityType, |
519: | )); |
520: | |
521: | } |
522: | |
523: | |
524: | $sql = <<<SQL |
525: | INSERT INTO |
526: | _sync_entity_type (entity_type_class) |
527: | VALUES |
528: | (:entity_type_class) ON CONFLICT (entity_type_class) DO |
529: | UPDATE |
530: | SET |
531: | last_seen = CURRENT_TIMESTAMP; |
532: | SQL; |
533: | $stmt = $this->prepare($sql); |
534: | $stmt->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT); |
535: | $stmt->execute(); |
536: | $stmt->close(); |
537: | |
538: | $sql = <<<SQL |
539: | SELECT |
540: | entity_type_id |
541: | FROM |
542: | _sync_entity_type |
543: | WHERE |
544: | entity_type_class = :entity_type_class; |
545: | SQL; |
546: | $stmt = $this->prepare($sql); |
547: | $stmt->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT); |
548: | $result = $this->execute($stmt); |
549: | |
550: | $row = $result->fetchArray(\SQLITE3_NUM); |
551: | $stmt->close(); |
552: | |
553: | if ($row === false) { |
554: | throw new SyncStoreException('Error retrieving entity type ID'); |
555: | } |
556: | |
557: | $this->EntityTypes[$row[0]] = $entityType; |
558: | $this->EntityTypeMap[$entityType] = $row[0]; |
559: | |
560: | return $this; |
561: | } |
562: | |
563: | |
564: | |
565: | |
566: | public function hasEntityType(string $entityType): bool |
567: | { |
568: | if (!$this->runHasStarted()) { |
569: | return in_array($entityType, $this->DeferredEntityTypes, true); |
570: | } |
571: | |
572: | return isset($this->EntityTypeMap[$entityType]); |
573: | } |
574: | |
575: | |
576: | |
577: | |
578: | public function getEntityTypeId(string $entityType): int |
579: | { |
580: | if (!$this->runHasStarted()) { |
581: | $this->check(); |
582: | } |
583: | |
584: | $id = $this->EntityTypeMap[$entityType] ?? null; |
585: | if ($id === null) { |
586: | throw new LogicException(sprintf( |
587: | 'Entity not registered: %s', |
588: | $entityType, |
589: | )); |
590: | } |
591: | return $id; |
592: | } |
593: | |
594: | |
595: | |
596: | |
597: | public function getEntityType(int $entityTypeId): string |
598: | { |
599: | if (!$this->runHasStarted()) { |
600: | throw new LogicException(sprintf( |
601: | 'Entity type ID not issued during run: %d', |
602: | $entityTypeId, |
603: | )); |
604: | } |
605: | if (!isset($this->EntityTypes[$entityTypeId])) { |
606: | throw new LogicException(sprintf( |
607: | 'Entity type not registered: #%d', |
608: | $entityTypeId, |
609: | )); |
610: | } |
611: | return $this->EntityTypes[$entityTypeId]; |
612: | } |
613: | |
614: | |
615: | |
616: | |
617: | public function registerNamespace( |
618: | string $prefix, |
619: | string $uri, |
620: | string $namespace, |
621: | ?SyncNamespaceHelperInterface $helper = null |
622: | ) { |
623: | $prefix = Str::lower($prefix); |
624: | if (isset($this->Namespaces[$prefix]) || ( |
625: | !$this->runHasStarted() |
626: | && isset($this->DeferredNamespaces[$prefix]) |
627: | )) { |
628: | throw new LogicException(sprintf( |
629: | 'Prefix already registered: %s', |
630: | $prefix, |
631: | )); |
632: | } |
633: | |
634: | |
635: | |
636: | |
637: | if ( |
638: | !isset($this->DeferredNamespaces) |
639: | || !isset($this->DeferredNamespaces[$prefix]) |
640: | ) { |
641: | if (!Regex::match('/^[a-z][-a-z0-9+.]*$/iD', $prefix)) { |
642: | throw new InvalidArgumentException(sprintf( |
643: | 'Invalid prefix: %s', |
644: | $prefix, |
645: | )); |
646: | } |
647: | $uri = rtrim($uri, '/') . '/'; |
648: | $namespace = trim($namespace, '\\') . '\\'; |
649: | } |
650: | |
651: | |
652: | if (!$this->runHasStarted()) { |
653: | $this->DeferredNamespaces[$prefix] = [$uri, $namespace, $helper]; |
654: | return $this; |
655: | } |
656: | |
657: | |
658: | $sql = <<<SQL |
659: | INSERT INTO |
660: | _sync_entity_namespace (entity_namespace_prefix, base_uri, php_namespace) |
661: | VALUES |
662: | ( |
663: | :entity_namespace_prefix, |
664: | :base_uri, |
665: | :php_namespace |
666: | ) ON CONFLICT (entity_namespace_prefix) DO |
667: | UPDATE |
668: | SET |
669: | base_uri = excluded.base_uri, |
670: | php_namespace = excluded.php_namespace, |
671: | last_seen = CURRENT_TIMESTAMP; |
672: | SQL; |
673: | $stmt = $this->prepare($sql); |
674: | $stmt->bindValue(':entity_namespace_prefix', $prefix, \SQLITE3_TEXT); |
675: | $stmt->bindValue(':base_uri', $uri, \SQLITE3_TEXT); |
676: | $stmt->bindValue(':php_namespace', $namespace, \SQLITE3_TEXT); |
677: | $stmt->execute(); |
678: | $stmt->close(); |
679: | |
680: | $this->Namespaces[$prefix] = true; |
681: | |
682: | if ($helper) { |
683: | $this->NamespaceHelpersByPrefix[$prefix] = $helper; |
684: | } |
685: | |
686: | |
687: | if (!isset($this->NamespacesByPrefix)) { |
688: | return $this; |
689: | } |
690: | |
691: | return $this->reload(); |
692: | } |
693: | |
694: | |
695: | |
696: | |
697: | public function getEntityTypeUri(string $entityType, bool $compact = true): string |
698: | { |
699: | $prefix = $this->classToNamespace($entityType, $uri, $namespace); |
700: | if ($prefix === null) { |
701: | return SyncUtil::getEntityTypeUri($entityType, $compact); |
702: | } |
703: | $entityType = str_replace('\\', '/', substr(ltrim($entityType, '\\'), strlen($namespace))); |
704: | |
705: | return $compact |
706: | ? "{$prefix}:{$entityType}" |
707: | : "{$uri}{$entityType}"; |
708: | } |
709: | |
710: | |
711: | |
712: | |
713: | public function getNamespacePrefix(string $class): ?string |
714: | { |
715: | return $this->classToNamespace($class); |
716: | } |
717: | |
718: | |
719: | |
720: | |
721: | public function getNamespaceHelper(string $class): ?SyncNamespaceHelperInterface |
722: | { |
723: | if ($this->classToNamespace( |
724: | $class, |
725: | $uri, |
726: | $namespace, |
727: | $helper |
728: | ) === null) { |
729: | return null; |
730: | } |
731: | |
732: | return $helper; |
733: | } |
734: | |
735: | |
736: | |
737: | |
738: | |
739: | private function classToNamespace( |
740: | string $class, |
741: | ?string &$uri = null, |
742: | ?string &$namespace = null, |
743: | ?SyncNamespaceHelperInterface &$helper = null |
744: | ): ?string { |
745: | $class = Str::lower(ltrim($class, '\\')); |
746: | |
747: | $namespaces = $this->runHasStarted() |
748: | ? $this->getNamespaces() |
749: | : $this->DeferredNamespaces; |
750: | foreach ($namespaces as $prefix => [$_uri, $_namespace, $_helper]) { |
751: | $_namespace = Str::lower($_namespace); |
752: | if (strpos($class, $_namespace) === 0) { |
753: | $uri = $_uri; |
754: | $namespace = $_namespace; |
755: | $helper = $_helper; |
756: | return $prefix; |
757: | } |
758: | } |
759: | return null; |
760: | } |
761: | |
762: | |
763: | |
764: | |
765: | private function getNamespaces(): Generator |
766: | { |
767: | foreach ($this->NamespacesByPrefix as $prefix => $namespace) { |
768: | yield $prefix => [ |
769: | $this->NamespaceUrisByPrefix[$prefix], |
770: | $namespace, |
771: | $this->NamespaceHelpersByPrefix[$prefix] ?? null, |
772: | ]; |
773: | } |
774: | } |
775: | |
776: | |
777: | |
778: | |
779: | public function setEntity( |
780: | int $providerId, |
781: | string $entityType, |
782: | $entityId, |
783: | SyncEntityInterface $entity |
784: | ) { |
785: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
786: | if (isset($this->Entities[$providerId][$entityTypeId][$entityId])) { |
787: | throw new LogicException('Entity already registered'); |
788: | } |
789: | $this->Entities[$providerId][$entityTypeId][$entityId] = $entity; |
790: | $this->EntityCheckpoints[spl_object_id($entity)] = $this->DeferralCheckpoint++; |
791: | |
792: | |
793: | $deferred = $this->DeferredEntities[$providerId][$entityTypeId][$entityId] ?? null; |
794: | if ($deferred) { |
795: | foreach ($deferred as $i => $deferredEntity) { |
796: | $deferredEntity->replace($entity); |
797: | unset($this->DeferredEntities[$providerId][$entityTypeId][$entityId][$i]); |
798: | } |
799: | unset($this->DeferredEntities[$providerId][$entityTypeId][$entityId]); |
800: | } |
801: | |
802: | return $this; |
803: | } |
804: | |
805: | |
806: | |
807: | |
808: | public function getEntity( |
809: | int $providerId, |
810: | string $entityType, |
811: | $entityId, |
812: | ?bool $offline = null |
813: | ): ?SyncEntityInterface { |
814: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
815: | $entity = $this->Entities[$providerId][$entityTypeId][$entityId] ?? null; |
816: | if ($entity || $offline === false) { |
817: | return $entity; |
818: | } |
819: | return null; |
820: | } |
821: | |
822: | |
823: | |
824: | |
825: | |
826: | |
827: | |
828: | public function deferEntity( |
829: | int $providerId, |
830: | string $entityType, |
831: | $entityId, |
832: | DeferredEntityInterface $entity |
833: | ) { |
834: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
835: | |
836: | $_entity = $this->Entities[$providerId][$entityTypeId][$entityId] ?? null; |
837: | if ($_entity) { |
838: | $entity->replace($_entity); |
839: | return $this; |
840: | } |
841: | |
842: | |
843: | |
844: | $context = $entity->getContext(); |
845: | if ($context) { |
846: | $last = $context->getLastEntity(); |
847: | if ($last) { |
848: | $context = $last->getContext(); |
849: | } |
850: | } |
851: | $policy = $context |
852: | ? $context->getDeferralPolicy() |
853: | : null; |
854: | |
855: | $this->DeferredEntities[$providerId][$entityTypeId][$entityId][ |
856: | $this->DeferralCheckpoint++ |
857: | ] = $entity; |
858: | |
859: | |
860: | |
861: | |
862: | |
863: | if ($policy === DeferralPolicy::RESOLVE_EARLY) { |
864: | $entity->resolve(); |
865: | return $this; |
866: | } |
867: | |
868: | return $this; |
869: | } |
870: | |
871: | |
872: | |
873: | |
874: | public function deferRelationship( |
875: | int $providerId, |
876: | string $entityType, |
877: | string $forEntityType, |
878: | string $forEntityProperty, |
879: | $forEntityId, |
880: | DeferredRelationshipInterface $relationship |
881: | ) { |
882: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
883: | $forEntityTypeId = $this->EntityTypeMap[$forEntityType]; |
884: | |
885: | |
886: | $deferredList = &$this->DeferredRelationships[$providerId][$entityTypeId][ |
887: | $forEntityTypeId |
888: | ][$forEntityProperty][$forEntityId]; |
889: | |
890: | if (isset($deferredList)) { |
891: | throw new LogicException('Relationship already registered'); |
892: | } |
893: | |
894: | $deferredList = []; |
895: | |
896: | |
897: | |
898: | $context = $relationship->getContext(); |
899: | if ($context) { |
900: | $last = $context->getLastEntity(); |
901: | if ($last) { |
902: | $context = $last->getContext(); |
903: | } |
904: | } |
905: | $policy = $context |
906: | ? $context->getHydrationPolicy($entityType) |
907: | : 0; |
908: | |
909: | if ($policy === HydrationPolicy::LAZY) { |
910: | return $this; |
911: | } |
912: | |
913: | if ($policy === HydrationPolicy::EAGER) { |
914: | $relationship->resolve(); |
915: | return $this; |
916: | } |
917: | |
918: | $deferredList[$this->DeferralCheckpoint++] = $relationship; |
919: | return $this; |
920: | } |
921: | |
922: | |
923: | |
924: | |
925: | public function resolveDeferrals( |
926: | ?int $fromCheckpoint = null, |
927: | ?string $entityType = null, |
928: | ?int $providerId = null |
929: | ): array { |
930: | $checkpoint = $this->DeferralCheckpoint; |
931: | do { |
932: | |
933: | |
934: | |
935: | $deferred = $this->resolveDeferredRelationships($fromCheckpoint, $entityType, null, null, $providerId); |
936: | if ($deferred) { |
937: | foreach ($deferred as $relationship) { |
938: | foreach ($relationship as $entity) { |
939: | $objectId = spl_object_id($entity); |
940: | if ($this->EntityCheckpoints[$objectId] < $checkpoint) { |
941: | continue; |
942: | } |
943: | $resolved[$objectId] = $entity; |
944: | } |
945: | } |
946: | continue; |
947: | } |
948: | |
949: | $deferred = $this->resolveDeferredEntities($fromCheckpoint, $entityType, $providerId); |
950: | if (!$deferred) { |
951: | continue; |
952: | } |
953: | foreach ($deferred as $entity) { |
954: | $resolved[spl_object_id($entity)] = $entity; |
955: | } |
956: | } while ($deferred); |
957: | |
958: | return array_values($resolved ?? []); |
959: | } |
960: | |
961: | |
962: | |
963: | |
964: | public function getDeferralCheckpoint(): int |
965: | { |
966: | return $this->DeferralCheckpoint; |
967: | } |
968: | |
969: | |
970: | |
971: | |
972: | public function resolveDeferredEntities( |
973: | ?int $fromCheckpoint = null, |
974: | ?string $entityType = null, |
975: | ?int $providerId = null |
976: | ): array { |
977: | $entityTypeId = $entityType === null |
978: | ? null |
979: | : $this->EntityTypeMap[$entityType]; |
980: | |
981: | $resolved = []; |
982: | foreach ($this->DeferredEntities as $provId => $entitiesByTypeId) { |
983: | if ($providerId !== null && $provId !== $providerId) { |
984: | continue; |
985: | } |
986: | foreach ($entitiesByTypeId as $entTypeId => $entities) { |
987: | if ($entityTypeId !== null && $entTypeId !== $entityTypeId) { |
988: | continue; |
989: | } |
990: | |
991: | if ($fromCheckpoint !== null) { |
992: | $_entities = $entities; |
993: | foreach ($_entities as $entityId => $deferred) { |
994: | foreach ($deferred as $i => $deferredEntity) { |
995: | if ($i < $fromCheckpoint) { |
996: | unset($_entities[$entityId][$i]); |
997: | if (!$_entities[$entityId]) { |
998: | unset($_entities[$entityId]); |
999: | } |
1000: | } |
1001: | } |
1002: | } |
1003: | if (!$_entities) { |
1004: | continue; |
1005: | } |
1006: | $entities = $_entities; |
1007: | } |
1008: | |
1009: | |
1010: | foreach ($entities as $entityId => $deferred) { |
1011: | $deferredEntity = reset($deferred); |
1012: | $resolved[] = $deferredEntity->resolve(); |
1013: | } |
1014: | } |
1015: | } |
1016: | |
1017: | return $resolved; |
1018: | } |
1019: | |
1020: | |
1021: | |
1022: | |
1023: | public function resolveDeferredRelationships( |
1024: | ?int $fromCheckpoint = null, |
1025: | ?string $entityType = null, |
1026: | ?string $forEntityType = null, |
1027: | ?string $forEntityProperty = null, |
1028: | ?int $providerId = null |
1029: | ): array { |
1030: | $entityTypeId = $entityType === null |
1031: | ? null |
1032: | : $this->EntityTypeMap[$entityType]; |
1033: | $forEntityTypeId = $forEntityType === null |
1034: | ? null |
1035: | : $this->EntityTypeMap[$forEntityType]; |
1036: | |
1037: | $resolved = []; |
1038: | foreach ($this->DeferredRelationships as $provId => $relationshipsByEntTypeId) { |
1039: | if ($providerId !== null && $provId !== $providerId) { |
1040: | continue; |
1041: | } |
1042: | foreach ($relationshipsByEntTypeId as $entTypeId => $relationshipsByForEntTypeId) { |
1043: | if ($entityTypeId !== null && $entTypeId !== $entityTypeId) { |
1044: | continue; |
1045: | } |
1046: | foreach ($relationshipsByForEntTypeId as $forEntTypeId => $relationshipsByForEntProp) { |
1047: | if ($forEntityTypeId !== null && $forEntTypeId !== $forEntityTypeId) { |
1048: | continue; |
1049: | } |
1050: | foreach ($relationshipsByForEntProp as $forEntProp => $relationshipsByForEntId) { |
1051: | if ($forEntityProperty !== null && $forEntProp !== $forEntityProperty) { |
1052: | continue; |
1053: | } |
1054: | foreach ($relationshipsByForEntId as $forEntId => $relationships) { |
1055: | if ($fromCheckpoint !== null) { |
1056: | $_relationships = $relationships; |
1057: | foreach ($_relationships as $index => $deferred) { |
1058: | if ($index < $fromCheckpoint) { |
1059: | unset($_relationships[$index]); |
1060: | } |
1061: | } |
1062: | if (!$_relationships) { |
1063: | continue; |
1064: | } |
1065: | $relationships = $_relationships; |
1066: | } |
1067: | |
1068: | foreach ($relationships as $index => $deferred) { |
1069: | $resolved[] = $deferred->resolve(); |
1070: | unset($this->DeferredRelationships[$provId][$entTypeId][$forEntTypeId][$forEntProp][$forEntId][$index]); |
1071: | } |
1072: | } |
1073: | } |
1074: | } |
1075: | } |
1076: | } |
1077: | |
1078: | return $resolved; |
1079: | } |
1080: | |
1081: | |
1082: | |
1083: | |
1084: | public function checkProviderHeartbeats( |
1085: | int $ttl = 300, |
1086: | bool $failEarly = true, |
1087: | SyncProviderInterface ...$providers |
1088: | ) { |
1089: | $this->check(); |
1090: | |
1091: | if ($providers) { |
1092: | $providers = Arr::unique($providers); |
1093: | } elseif ($this->Providers) { |
1094: | $providers = $this->Providers; |
1095: | } else { |
1096: | return $this; |
1097: | } |
1098: | |
1099: | $failed = []; |
1100: | |
1101: | foreach ($providers as $provider) { |
1102: | $id = $provider->getProviderId(); |
1103: | $name = sprintf('%s [#%d]', $provider->getName(), $id); |
1104: | Console::logProgress('Checking', $name); |
1105: | try { |
1106: | $provider->checkHeartbeat($ttl); |
1107: | Console::log('Heartbeat OK:', $name); |
1108: | } catch (MethodNotImplementedExceptionInterface $ex) { |
1109: | Console::log('Heartbeat check not supported:', $name); |
1110: | } catch (UnreachableBackendExceptionInterface $ex) { |
1111: | Console::exception($ex, Level::DEBUG, null); |
1112: | Console::log('Heartbeat check failed:', $name); |
1113: | $failed[] = $provider; |
1114: | $this->recordError( |
1115: | SyncError::build() |
1116: | ->errorType(ErrorType::BACKEND_UNREACHABLE) |
1117: | ->message('Heartbeat check failed: %s') |
1118: | ->values([[ |
1119: | 'provider_id' => $id, |
1120: | 'provider_class' => get_class($provider), |
1121: | 'exception' => get_class($ex), |
1122: | 'message' => $ex->getMessage() |
1123: | ]]) |
1124: | ->build() |
1125: | ); |
1126: | } |
1127: | if ($failEarly && $failed) { |
1128: | break; |
1129: | } |
1130: | } |
1131: | |
1132: | if ($failed) { |
1133: | throw new HeartbeatCheckFailedException(...$failed); |
1134: | } |
1135: | |
1136: | return $this; |
1137: | } |
1138: | |
1139: | |
1140: | |
1141: | |
1142: | public function recordError(SyncErrorInterface $error, bool $deduplicate = false) |
1143: | { |
1144: | if ($deduplicate) { |
1145: | $key = $this->Errors->keyOf($error); |
1146: | if ($key !== null) { |
1147: | $this->Errors[$key] = $this->Errors[$key]->count(); |
1148: | return $this; |
1149: | } |
1150: | } |
1151: | |
1152: | $this->Errors[] = $error; |
1153: | |
1154: | return $this; |
1155: | } |
1156: | |
1157: | |
1158: | |
1159: | |
1160: | public function getErrors(): SyncErrorCollectionInterface |
1161: | { |
1162: | return clone $this->Errors; |
1163: | } |
1164: | |
1165: | |
1166: | |
1167: | |
1168: | protected function check() |
1169: | { |
1170: | if ($this->RunId !== null) { |
1171: | return $this; |
1172: | } |
1173: | |
1174: | if (!$this->isCheckRunning()) { |
1175: | return $this->safeCheck(); |
1176: | } |
1177: | |
1178: | $sql = <<<SQL |
1179: | INSERT INTO _sync_run (run_uuid, run_command, run_arguments_json) |
1180: | VALUES ( |
1181: | :run_uuid, |
1182: | :run_command, |
1183: | :run_arguments_json |
1184: | ); |
1185: | SQL; |
1186: | |
1187: | $stmt = $this->prepare($sql); |
1188: | $stmt->bindValue(':run_uuid', $uuid = Get::binaryUuid(), \SQLITE3_BLOB); |
1189: | $stmt->bindValue(':run_command', $this->Command, \SQLITE3_TEXT); |
1190: | $stmt->bindValue(':run_arguments_json', Json::stringify($this->Arguments), \SQLITE3_TEXT); |
1191: | $stmt->execute(); |
1192: | $stmt->close(); |
1193: | |
1194: | $id = $this->db()->lastInsertRowID(); |
1195: | $this->RunId = $id; |
1196: | $this->RunUuid = $uuid; |
1197: | unset($this->Command, $this->Arguments); |
1198: | |
1199: | foreach ($this->DeferredProviders as $provider) { |
1200: | $this->registerProvider($provider); |
1201: | } |
1202: | unset($this->DeferredProviders); |
1203: | |
1204: | foreach ($this->DeferredEntityTypes as $entity) { |
1205: | $this->registerEntityType($entity); |
1206: | } |
1207: | unset($this->DeferredEntityTypes); |
1208: | |
1209: | foreach ($this->DeferredNamespaces as $prefix => [$uri, $namespace, $helper]) { |
1210: | $this->registerNamespace($prefix, $uri, $namespace, $helper); |
1211: | } |
1212: | unset($this->DeferredNamespaces); |
1213: | |
1214: | return $this->reload(); |
1215: | } |
1216: | |
1217: | |
1218: | |
1219: | |
1220: | private function reload() |
1221: | { |
1222: | $db = $this->db(); |
1223: | $sql = <<<SQL |
1224: | SELECT |
1225: | entity_namespace_prefix, |
1226: | base_uri, |
1227: | php_namespace |
1228: | FROM |
1229: | _sync_entity_namespace |
1230: | ORDER BY |
1231: | LENGTH(php_namespace) DESC; |
1232: | SQL; |
1233: | $stmt = $this->prepare($sql); |
1234: | $result = $this->execute($stmt); |
1235: | $this->NamespacesByPrefix = []; |
1236: | $this->NamespaceUrisByPrefix = []; |
1237: | while (($row = $result->fetchArray(\SQLITE3_NUM)) !== false) { |
1238: | |
1239: | $this->NamespacesByPrefix[$row[0]] = $row[2]; |
1240: | $this->NamespaceUrisByPrefix[$row[0]] = $row[1]; |
1241: | } |
1242: | $result->finalize(); |
1243: | $stmt->close(); |
1244: | |
1245: | return $this; |
1246: | } |
1247: | |
1248: | |
1249: | |
1250: | |
1251: | public function __destruct() |
1252: | { |
1253: | $exitStatus = -1; |
1254: | if (Err::isLoaded() && Err::isShuttingDown()) { |
1255: | $exitStatus = Err::getExitStatus(); |
1256: | } |
1257: | $this->close($exitStatus); |
1258: | } |
1259: | } |
1260: | |