1: | <?php declare(strict_types=1); |
2: | |
3: | namespace Salient\Sync; |
4: | |
5: | use Salient\Contract\Core\Exception\MethodNotImplementedExceptionInterface; |
6: | use Salient\Contract\Core\MessageLevel as Level; |
7: | use Salient\Contract\Sync\Exception\UnreachableBackendExceptionInterface; |
8: | use Salient\Contract\Sync\DeferralPolicy; |
9: | use Salient\Contract\Sync\DeferredEntityInterface; |
10: | use Salient\Contract\Sync\DeferredRelationshipInterface; |
11: | use Salient\Contract\Sync\ErrorType; |
12: | use Salient\Contract\Sync\HydrationPolicy; |
13: | use Salient\Contract\Sync\SyncEntityInterface; |
14: | use Salient\Contract\Sync\SyncErrorCollectionInterface; |
15: | use Salient\Contract\Sync\SyncErrorInterface; |
16: | use Salient\Contract\Sync\SyncNamespaceHelperInterface; |
17: | use Salient\Contract\Sync\SyncProviderInterface; |
18: | use Salient\Contract\Sync\SyncStoreInterface; |
19: | use Salient\Core\Facade\Console; |
20: | use Salient\Core\Facade\Err; |
21: | use Salient\Core\Facade\Event; |
22: | use Salient\Core\AbstractStore; |
23: | use Salient\Sync\Event\SyncStoreLoadedEvent; |
24: | use Salient\Sync\Exception\HeartbeatCheckFailedException; |
25: | use Salient\Sync\Exception\SyncStoreException; |
26: | use Salient\Sync\Support\SyncErrorCollection; |
27: | use Salient\Utility\Arr; |
28: | use Salient\Utility\Get; |
29: | use Salient\Utility\Json; |
30: | use Salient\Utility\Regex; |
31: | use Salient\Utility\Str; |
32: | use Generator; |
33: | use InvalidArgumentException; |
34: | use LogicException; |
35: | use ReflectionClass; |
36: | |
37: | |
38: | |
39: | |
40: | |
41: | |
42: | |
43: | |
44: | |
45: | final class SyncStore extends AbstractStore implements SyncStoreInterface |
46: | { |
47: | private ?int $RunId = null; |
48: | private string $RunUuid; |
49: | |
50: | |
51: | |
52: | |
53: | |
54: | |
55: | private array $Namespaces = []; |
56: | |
57: | |
58: | |
59: | |
60: | |
61: | |
62: | private array $Providers = []; |
63: | |
64: | |
65: | |
66: | |
67: | |
68: | |
69: | private array $ProviderMap = []; |
70: | |
71: | |
72: | |
73: | |
74: | |
75: | |
76: | private array $EntityTypes = []; |
77: | |
78: | |
79: | |
80: | |
81: | |
82: | |
83: | private array $EntityTypeMap = []; |
84: | |
85: | |
86: | |
87: | |
88: | |
89: | |
90: | private array $NamespacesByPrefix; |
91: | |
92: | |
93: | |
94: | |
95: | |
96: | |
97: | private array $NamespaceUrisByPrefix; |
98: | |
99: | |
100: | |
101: | |
102: | |
103: | |
104: | private array $NamespaceHelpersByPrefix; |
105: | |
106: | |
107: | |
108: | |
109: | |
110: | |
111: | private array $Entities; |
112: | |
113: | |
114: | |
115: | |
116: | |
117: | |
118: | private array $EntityCheckpoints; |
119: | |
120: | |
121: | |
122: | |
123: | |
124: | |
125: | private array $DeferredEntities = []; |
126: | |
127: | |
128: | |
129: | |
130: | |
131: | |
132: | |
133: | private array $DeferredRelationships = []; |
134: | |
135: | private SyncErrorCollection $Errors; |
136: | private int $DeferralCheckpoint = 0; |
137: | private string $Command; |
138: | |
139: | private array $Arguments; |
140: | |
141: | private array $DeferredNamespaces = []; |
142: | |
143: | private array $DeferredProviders = []; |
144: | |
145: | private array $DeferredEntityTypes = []; |
146: | |
147: | |
148: | |
149: | |
150: | |
151: | |
152: | |
153: | |
154: | public function __construct( |
155: | string $filename = ':memory:', |
156: | string $command = '', |
157: | array $arguments = [] |
158: | ) { |
159: | $this->assertCanUpsert(); |
160: | |
161: | $this->Errors = new SyncErrorCollection(); |
162: | $this->Command = $command; |
163: | $this->Arguments = $arguments; |
164: | |
165: | $this->openDb( |
166: | $filename, |
167: | <<<SQL |
168: | CREATE TABLE IF NOT EXISTS |
169: | _sync_run ( |
170: | run_id INTEGER NOT NULL PRIMARY KEY, |
171: | run_uuid BLOB NOT NULL UNIQUE, |
172: | run_command TEXT NOT NULL, |
173: | run_arguments_json TEXT NOT NULL, |
174: | started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
175: | finished_at DATETIME, |
176: | exit_status INTEGER, |
177: | error_count INTEGER, |
178: | warning_count INTEGER, |
179: | errors_json TEXT |
180: | ); |
181: | |
182: | CREATE TABLE IF NOT EXISTS |
183: | _sync_provider ( |
184: | provider_id INTEGER NOT NULL PRIMARY KEY, |
185: | provider_hash BLOB NOT NULL UNIQUE, |
186: | provider_class TEXT NOT NULL, |
187: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
188: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
189: | ); |
190: | |
191: | CREATE TABLE IF NOT EXISTS |
192: | _sync_entity_type ( |
193: | entity_type_id INTEGER NOT NULL PRIMARY KEY, |
194: | entity_type_class TEXT NOT NULL UNIQUE, |
195: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
196: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
197: | ); |
198: | |
199: | CREATE TABLE IF NOT EXISTS |
200: | _sync_entity_type_state ( |
201: | provider_id INTEGER NOT NULL, |
202: | entity_type_id INTEGER NOT NULL, |
203: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, |
204: | last_sync DATETIME, |
205: | PRIMARY KEY (provider_id, entity_type_id), |
206: | FOREIGN KEY (provider_id) REFERENCES _sync_provider, |
207: | FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type |
208: | ); |
209: | |
210: | CREATE TABLE IF NOT EXISTS |
211: | _sync_entity ( |
212: | provider_id INTEGER NOT NULL, |
213: | entity_type_id INTEGER NOT NULL, |
214: | entity_id TEXT NOT NULL, |
215: | canonical_id TEXT, |
216: | is_dirty INTEGER NOT NULL DEFAULT 0, |
217: | is_deleted INTEGER NOT NULL DEFAULT 0, |
218: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
219: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, |
220: | last_sync DATETIME, |
221: | entity_json TEXT NOT NULL, |
222: | PRIMARY KEY (provider_id, entity_type_id, entity_id), |
223: | FOREIGN KEY (provider_id) REFERENCES _sync_provider, |
224: | FOREIGN KEY (entity_type_id) REFERENCES _sync_entity_type |
225: | ) WITHOUT ROWID; |
226: | |
227: | CREATE TABLE IF NOT EXISTS |
228: | _sync_entity_namespace ( |
229: | entity_namespace_id INTEGER NOT NULL PRIMARY KEY, |
230: | entity_namespace_prefix TEXT NOT NULL UNIQUE, |
231: | base_uri TEXT NOT NULL, |
232: | php_namespace TEXT NOT NULL, |
233: | added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, |
234: | last_seen DATETIME DEFAULT CURRENT_TIMESTAMP |
235: | ); |
236: | |
237: | SQL |
238: | ); |
239: | |
240: | Event::dispatch(new SyncStoreLoadedEvent($this)); |
241: | } |
242: | |
243: | |
244: | |
245: | |
246: | public function close(int $exitStatus = 0): void |
247: | { |
248: | |
249: | if (!$this->isOpen() || $this->RunId === null) { |
250: | $this->closeDb(); |
251: | return; |
252: | } |
253: | |
254: | $sql = <<<SQL |
255: | UPDATE |
256: | _sync_run |
257: | SET |
258: | finished_at = CURRENT_TIMESTAMP, |
259: | exit_status = :exit_status, |
260: | error_count = :error_count, |
261: | warning_count = :warning_count, |
262: | errors_json = :errors_json |
263: | WHERE |
264: | run_uuid = :run_uuid; |
265: | SQL; |
266: | |
267: | $stmt = $this->prepare($sql); |
268: | $stmt->bindValue(':exit_status', $exitStatus, \SQLITE3_INTEGER); |
269: | $stmt->bindValue(':run_uuid', $this->RunUuid, \SQLITE3_BLOB); |
270: | $stmt->bindValue(':error_count', $this->Errors->getErrorCount(), \SQLITE3_INTEGER); |
271: | $stmt->bindValue(':warning_count', $this->Errors->getWarningCount(), \SQLITE3_INTEGER); |
272: | $stmt->bindValue(':errors_json', Json::stringify($this->Errors), \SQLITE3_TEXT); |
273: | $stmt->execute(); |
274: | $stmt->close(); |
275: | |
276: | $this->closeDb(); |
277: | } |
278: | |
279: | |
280: | |
281: | |
282: | public function runHasStarted(): bool |
283: | { |
284: | return $this->RunId !== null; |
285: | } |
286: | |
287: | |
288: | |
289: | |
290: | public function getRunId(): int |
291: | { |
292: | $this->assertRunHasStarted(); |
293: | |
294: | return $this->RunId; |
295: | } |
296: | |
297: | |
298: | |
299: | |
300: | public function getRunUuid(): string |
301: | { |
302: | $this->assertRunHasStarted(); |
303: | |
304: | return Get::uuid($this->RunUuid); |
305: | } |
306: | |
307: | |
308: | |
309: | |
310: | public function getBinaryRunUuid(): string |
311: | { |
312: | $this->assertRunHasStarted(); |
313: | |
314: | return $this->RunUuid; |
315: | } |
316: | |
317: | |
318: | |
319: | |
320: | private function assertRunHasStarted(): void |
321: | { |
322: | if ($this->RunId === null) { |
323: | throw new LogicException('Run has not started'); |
324: | } |
325: | } |
326: | |
327: | |
328: | |
329: | |
330: | public function registerProvider(SyncProviderInterface $provider) |
331: | { |
332: | |
333: | if (!$this->runHasStarted()) { |
334: | $this->DeferredProviders[] = $provider; |
335: | return $this; |
336: | } |
337: | |
338: | $class = get_class($provider); |
339: | $hash = $this->getProviderSignature($provider); |
340: | |
341: | if (isset($this->ProviderMap[$hash])) { |
342: | throw new LogicException(sprintf( |
343: | 'Provider already registered: %s', |
344: | $class, |
345: | )); |
346: | } |
347: | |
348: | |
349: | $sql = <<<SQL |
350: | INSERT INTO |
351: | _sync_provider (provider_hash, provider_class) |
352: | VALUES |
353: | (:provider_hash, :provider_class) ON CONFLICT (provider_hash) DO |
354: | UPDATE |
355: | SET |
356: | last_seen = CURRENT_TIMESTAMP; |
357: | SQL; |
358: | $stmt = $this->prepare($sql); |
359: | $stmt->bindValue(':provider_hash', $hash, \SQLITE3_BLOB); |
360: | $stmt->bindValue(':provider_class', $class, \SQLITE3_TEXT); |
361: | $stmt->execute(); |
362: | $stmt->close(); |
363: | |
364: | $sql = <<<SQL |
365: | SELECT |
366: | provider_id |
367: | FROM |
368: | _sync_provider |
369: | WHERE |
370: | provider_hash = :provider_hash; |
371: | SQL; |
372: | $stmt = $this->prepare($sql); |
373: | $stmt->bindValue(':provider_hash', $hash, \SQLITE3_BLOB); |
374: | $result = $this->execute($stmt); |
375: | |
376: | $row = $result->fetchArray(\SQLITE3_NUM); |
377: | $stmt->close(); |
378: | |
379: | if ($row === false) { |
380: | |
381: | throw new SyncStoreException('Error retrieving provider ID'); |
382: | |
383: | } |
384: | |
385: | $providerId = $row[0]; |
386: | $this->Providers[$providerId] = $provider; |
387: | $this->ProviderMap[$hash] = $providerId; |
388: | |
389: | return $this; |
390: | } |
391: | |
392: | |
393: | |
394: | |
395: | public function hasProvider($provider): bool |
396: | { |
397: | if ($provider instanceof SyncProviderInterface) { |
398: | $providers = $this->runHasStarted() |
399: | ? $this->Providers |
400: | : $this->DeferredProviders; |
401: | return in_array($provider, $providers, true); |
402: | } |
403: | |
404: | if (!$this->runHasStarted()) { |
405: | foreach ($this->DeferredProviders as $deferred) { |
406: | if ($this->getProviderSignature($deferred) === $provider) { |
407: | return true; |
408: | } |
409: | } |
410: | return false; |
411: | } |
412: | |
413: | return isset($this->ProviderMap[$provider]); |
414: | } |
415: | |
416: | |
417: | |
418: | |
419: | public function getProviderId($provider): int |
420: | { |
421: | if (!$this->runHasStarted()) { |
422: | $this->check(); |
423: | } |
424: | |
425: | $hash = $provider instanceof SyncProviderInterface |
426: | ? $this->getProviderSignature($provider) |
427: | : $provider; |
428: | $id = $this->ProviderMap[$hash] ?? null; |
429: | if ($id === null) { |
430: | throw new LogicException('Provider not registered' |
431: | . ($provider instanceof SyncProviderInterface |
432: | ? sprintf(': %s', get_class($provider)) |
433: | : '')); |
434: | } |
435: | return $id; |
436: | } |
437: | |
438: | |
439: | |
440: | |
441: | public function getProvider($provider): SyncProviderInterface |
442: | { |
443: | if (is_int($provider)) { |
444: | if (!$this->runHasStarted()) { |
445: | throw new LogicException(sprintf( |
446: | 'Provider ID not issued during run: %d', |
447: | $provider, |
448: | )); |
449: | } |
450: | if (!isset($this->Providers[$provider])) { |
451: | throw new LogicException(sprintf( |
452: | 'Provider not registered: #%d', |
453: | $provider, |
454: | )); |
455: | } |
456: | return $this->Providers[$provider]; |
457: | } |
458: | |
459: | |
460: | if (!$this->runHasStarted()) { |
461: | foreach ($this->DeferredProviders as $deferred) { |
462: | if ($this->getProviderSignature($deferred) === $provider) { |
463: | return $deferred; |
464: | } |
465: | } |
466: | throw new LogicException('Provider not registered'); |
467: | } |
468: | |
469: | $id = $this->ProviderMap[$provider] ?? null; |
470: | if ($id === null) { |
471: | throw new LogicException('Provider not registered'); |
472: | } |
473: | return $this->Providers[$id]; |
474: | } |
475: | |
476: | |
477: | |
478: | |
479: | public function getProviderSignature(SyncProviderInterface $provider): string |
480: | { |
481: | $class = get_class($provider); |
482: | return Get::binaryHash(implode("\0", [ |
483: | $class, |
484: | ...$provider->getBackendIdentifier(), |
485: | ])); |
486: | } |
487: | |
488: | |
489: | |
490: | |
491: | public function registerEntityType(string $entityType) |
492: | { |
493: | |
494: | if (!$this->runHasStarted()) { |
495: | $this->DeferredEntityTypes[] = $entityType; |
496: | return $this; |
497: | } |
498: | |
499: | if (isset($this->EntityTypeMap[$entityType])) { |
500: | return $this; |
501: | } |
502: | |
503: | $class = new ReflectionClass($entityType); |
504: | |
505: | if ($entityType !== $class->getName()) { |
506: | throw new LogicException(sprintf( |
507: | 'Not an exact match for declared class (%s expected): %s', |
508: | $class->getName(), |
509: | $entityType, |
510: | )); |
511: | } |
512: | |
513: | if (!$class->implementsInterface(SyncEntityInterface::class)) { |
514: | |
515: | throw new LogicException(sprintf( |
516: | 'Does not implement %s: %s', |
517: | SyncEntityInterface::class, |
518: | $entityType, |
519: | )); |
520: | |
521: | } |
522: | |
523: | |
524: | $sql = <<<SQL |
525: | INSERT INTO |
526: | _sync_entity_type (entity_type_class) |
527: | VALUES |
528: | (:entity_type_class) ON CONFLICT (entity_type_class) DO |
529: | UPDATE |
530: | SET |
531: | last_seen = CURRENT_TIMESTAMP; |
532: | SQL; |
533: | $stmt = $this->prepare($sql); |
534: | $stmt->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT); |
535: | $stmt->execute(); |
536: | $stmt->close(); |
537: | |
538: | $sql = <<<SQL |
539: | SELECT |
540: | entity_type_id |
541: | FROM |
542: | _sync_entity_type |
543: | WHERE |
544: | entity_type_class = :entity_type_class; |
545: | SQL; |
546: | $stmt = $this->prepare($sql); |
547: | $stmt->bindValue(':entity_type_class', $entityType, \SQLITE3_TEXT); |
548: | $result = $this->execute($stmt); |
549: | |
550: | $row = $result->fetchArray(\SQLITE3_NUM); |
551: | $stmt->close(); |
552: | |
553: | if ($row === false) { |
554: | throw new SyncStoreException('Error retrieving entity type ID'); |
555: | } |
556: | |
557: | $this->EntityTypes[$row[0]] = $entityType; |
558: | $this->EntityTypeMap[$entityType] = $row[0]; |
559: | |
560: | return $this; |
561: | } |
562: | |
563: | |
564: | |
565: | |
566: | public function hasEntityType(string $entityType): bool |
567: | { |
568: | if (!$this->runHasStarted()) { |
569: | return in_array($entityType, $this->DeferredEntityTypes, true); |
570: | } |
571: | |
572: | return isset($this->EntityTypeMap[$entityType]); |
573: | } |
574: | |
575: | |
576: | |
577: | |
578: | public function getEntityTypeId(string $entityType): int |
579: | { |
580: | if (!$this->runHasStarted()) { |
581: | $this->check(); |
582: | } |
583: | |
584: | $id = $this->EntityTypeMap[$entityType] ?? null; |
585: | if ($id === null) { |
586: | throw new LogicException(sprintf( |
587: | 'Entity not registered: %s', |
588: | $entityType, |
589: | )); |
590: | } |
591: | return $id; |
592: | } |
593: | |
594: | |
595: | |
596: | |
597: | public function getEntityType(int $entityTypeId): string |
598: | { |
599: | if (!$this->runHasStarted()) { |
600: | throw new LogicException(sprintf( |
601: | 'Entity type ID not issued during run: %d', |
602: | $entityTypeId, |
603: | )); |
604: | } |
605: | if (!isset($this->EntityTypes[$entityTypeId])) { |
606: | throw new LogicException(sprintf( |
607: | 'Entity type not registered: #%d', |
608: | $entityTypeId, |
609: | )); |
610: | } |
611: | return $this->EntityTypes[$entityTypeId]; |
612: | } |
613: | |
614: | |
615: | |
616: | |
617: | public function registerNamespace( |
618: | string $prefix, |
619: | string $uri, |
620: | string $namespace, |
621: | ?SyncNamespaceHelperInterface $helper = null |
622: | ) { |
623: | $prefix = Str::lower($prefix); |
624: | if (isset($this->Namespaces[$prefix]) || ( |
625: | !$this->runHasStarted() |
626: | && isset($this->DeferredNamespaces[$prefix]) |
627: | )) { |
628: | throw new LogicException(sprintf( |
629: | 'Prefix already registered: %s', |
630: | $prefix, |
631: | )); |
632: | } |
633: | |
634: | |
635: | |
636: | |
637: | if ( |
638: | !isset($this->DeferredNamespaces) |
639: | || !isset($this->DeferredNamespaces[$prefix]) |
640: | ) { |
641: | if (!Regex::match('/^[a-z][-a-z0-9+.]*$/iD', $prefix)) { |
642: | throw new InvalidArgumentException(sprintf( |
643: | 'Invalid prefix: %s', |
644: | $prefix, |
645: | )); |
646: | } |
647: | $uri = rtrim($uri, '/') . '/'; |
648: | $namespace = trim($namespace, '\\') . '\\'; |
649: | } |
650: | |
651: | |
652: | if (!$this->runHasStarted()) { |
653: | $this->DeferredNamespaces[$prefix] = [$uri, $namespace, $helper]; |
654: | return $this; |
655: | } |
656: | |
657: | |
658: | $sql = <<<SQL |
659: | INSERT INTO |
660: | _sync_entity_namespace (entity_namespace_prefix, base_uri, php_namespace) |
661: | VALUES |
662: | ( |
663: | :entity_namespace_prefix, |
664: | :base_uri, |
665: | :php_namespace |
666: | ) ON CONFLICT (entity_namespace_prefix) DO |
667: | UPDATE |
668: | SET |
669: | base_uri = excluded.base_uri, |
670: | php_namespace = excluded.php_namespace, |
671: | last_seen = CURRENT_TIMESTAMP; |
672: | SQL; |
673: | $stmt = $this->prepare($sql); |
674: | $stmt->bindValue(':entity_namespace_prefix', $prefix, \SQLITE3_TEXT); |
675: | $stmt->bindValue(':base_uri', $uri, \SQLITE3_TEXT); |
676: | $stmt->bindValue(':php_namespace', $namespace, \SQLITE3_TEXT); |
677: | $stmt->execute(); |
678: | $stmt->close(); |
679: | |
680: | $this->Namespaces[$prefix] = true; |
681: | |
682: | if ($helper) { |
683: | $this->NamespaceHelpersByPrefix[$prefix] = $helper; |
684: | } |
685: | |
686: | |
687: | if (!isset($this->NamespacesByPrefix)) { |
688: | return $this; |
689: | } |
690: | |
691: | return $this->reload(); |
692: | } |
693: | |
694: | |
695: | |
696: | |
697: | public function getEntityTypeUri(string $entityType, bool $compact = true): string |
698: | { |
699: | $prefix = $this->classToNamespace($entityType, $uri, $namespace); |
700: | if ($prefix === null) { |
701: | return SyncUtil::getEntityTypeUri($entityType, $compact); |
702: | } |
703: | |
704: | $entityType = str_replace('\\', '/', substr(ltrim($entityType, '\\'), strlen($namespace))); |
705: | |
706: | return $compact |
707: | ? "{$prefix}:{$entityType}" |
708: | : "{$uri}{$entityType}"; |
709: | } |
710: | |
711: | |
712: | |
713: | |
714: | public function getNamespacePrefix(string $class): ?string |
715: | { |
716: | return $this->classToNamespace($class); |
717: | } |
718: | |
719: | |
720: | |
721: | |
722: | public function getNamespaceHelper(string $class): ?SyncNamespaceHelperInterface |
723: | { |
724: | if ($this->classToNamespace( |
725: | $class, |
726: | $uri, |
727: | $namespace, |
728: | $helper |
729: | ) === null) { |
730: | return null; |
731: | } |
732: | |
733: | return $helper; |
734: | } |
735: | |
736: | |
737: | |
738: | |
739: | |
740: | private function classToNamespace( |
741: | string $class, |
742: | ?string &$uri = null, |
743: | ?string &$namespace = null, |
744: | ?SyncNamespaceHelperInterface &$helper = null |
745: | ): ?string { |
746: | $class = Str::lower(ltrim($class, '\\')); |
747: | |
748: | $namespaces = $this->runHasStarted() |
749: | ? $this->getNamespaces() |
750: | : $this->DeferredNamespaces; |
751: | foreach ($namespaces as $prefix => [$_uri, $_namespace, $_helper]) { |
752: | $_namespace = Str::lower($_namespace); |
753: | if (strpos($class, $_namespace) === 0) { |
754: | $uri = $_uri; |
755: | $namespace = $_namespace; |
756: | $helper = $_helper; |
757: | return $prefix; |
758: | } |
759: | } |
760: | return null; |
761: | } |
762: | |
763: | |
764: | |
765: | |
766: | private function getNamespaces(): Generator |
767: | { |
768: | foreach ($this->NamespacesByPrefix as $prefix => $namespace) { |
769: | yield $prefix => [ |
770: | $this->NamespaceUrisByPrefix[$prefix], |
771: | $namespace, |
772: | $this->NamespaceHelpersByPrefix[$prefix] ?? null, |
773: | ]; |
774: | } |
775: | } |
776: | |
777: | |
778: | |
779: | |
780: | public function setEntity( |
781: | int $providerId, |
782: | string $entityType, |
783: | $entityId, |
784: | SyncEntityInterface $entity |
785: | ) { |
786: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
787: | if (isset($this->Entities[$providerId][$entityTypeId][$entityId])) { |
788: | throw new LogicException('Entity already registered'); |
789: | } |
790: | $this->Entities[$providerId][$entityTypeId][$entityId] = $entity; |
791: | $this->EntityCheckpoints[spl_object_id($entity)] = $this->DeferralCheckpoint++; |
792: | |
793: | |
794: | $deferred = $this->DeferredEntities[$providerId][$entityTypeId][$entityId] ?? null; |
795: | if ($deferred) { |
796: | foreach ($deferred as $i => $deferredEntity) { |
797: | $deferredEntity->replace($entity); |
798: | unset($this->DeferredEntities[$providerId][$entityTypeId][$entityId][$i]); |
799: | } |
800: | unset($this->DeferredEntities[$providerId][$entityTypeId][$entityId]); |
801: | } |
802: | |
803: | return $this; |
804: | } |
805: | |
806: | |
807: | |
808: | |
809: | |
810: | |
811: | |
812: | public function getEntity( |
813: | int $providerId, |
814: | string $entityType, |
815: | $entityId, |
816: | ?bool $offline = null |
817: | ): ?SyncEntityInterface { |
818: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
819: | |
820: | $entity = $this->Entities[$providerId][$entityTypeId][$entityId] ?? null; |
821: | if ($entity || $offline === false) { |
822: | return $entity; |
823: | } |
824: | return null; |
825: | } |
826: | |
827: | |
828: | |
829: | |
830: | |
831: | |
832: | |
833: | public function deferEntity( |
834: | int $providerId, |
835: | string $entityType, |
836: | $entityId, |
837: | DeferredEntityInterface $entity |
838: | ) { |
839: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
840: | |
841: | $_entity = $this->Entities[$providerId][$entityTypeId][$entityId] ?? null; |
842: | if ($_entity) { |
843: | $entity->replace($_entity); |
844: | return $this; |
845: | } |
846: | |
847: | |
848: | |
849: | $context = $entity->getContext(); |
850: | if ($context) { |
851: | $last = $context->getLastEntity(); |
852: | if ($last) { |
853: | $context = $last->getContext(); |
854: | } |
855: | } |
856: | $policy = $context |
857: | ? $context->getDeferralPolicy() |
858: | : null; |
859: | |
860: | $this->DeferredEntities[$providerId][$entityTypeId][$entityId][ |
861: | $this->DeferralCheckpoint++ |
862: | ] = $entity; |
863: | |
864: | |
865: | |
866: | |
867: | |
868: | if ($policy === DeferralPolicy::RESOLVE_EARLY) { |
869: | $entity->resolve(); |
870: | return $this; |
871: | } |
872: | |
873: | return $this; |
874: | } |
875: | |
876: | |
877: | |
878: | |
879: | public function deferRelationship( |
880: | int $providerId, |
881: | string $entityType, |
882: | string $forEntityType, |
883: | string $forEntityProperty, |
884: | $forEntityId, |
885: | DeferredRelationshipInterface $relationship |
886: | ) { |
887: | $entityTypeId = $this->EntityTypeMap[$entityType]; |
888: | $forEntityTypeId = $this->EntityTypeMap[$forEntityType]; |
889: | |
890: | |
891: | $deferredList = &$this->DeferredRelationships[$providerId][$entityTypeId][ |
892: | $forEntityTypeId |
893: | ][$forEntityProperty][$forEntityId]; |
894: | |
895: | if (isset($deferredList)) { |
896: | throw new LogicException('Relationship already registered'); |
897: | } |
898: | |
899: | $deferredList = []; |
900: | |
901: | |
902: | |
903: | $context = $relationship->getContext(); |
904: | if ($context) { |
905: | $last = $context->getLastEntity(); |
906: | if ($last) { |
907: | $context = $last->getContext(); |
908: | } |
909: | } |
910: | $policy = $context |
911: | ? $context->getHydrationPolicy($entityType) |
912: | : 0; |
913: | |
914: | if ($policy === HydrationPolicy::LAZY) { |
915: | return $this; |
916: | } |
917: | |
918: | if ($policy === HydrationPolicy::EAGER) { |
919: | $relationship->resolve(); |
920: | return $this; |
921: | } |
922: | |
923: | $deferredList[$this->DeferralCheckpoint++] = $relationship; |
924: | return $this; |
925: | } |
926: | |
927: | |
928: | |
929: | |
930: | public function resolveDeferrals( |
931: | ?int $fromCheckpoint = null, |
932: | ?string $entityType = null, |
933: | ?int $providerId = null |
934: | ): array { |
935: | $checkpoint = $this->DeferralCheckpoint; |
936: | do { |
937: | |
938: | |
939: | |
940: | $deferred = $this->resolveDeferredRelationships($fromCheckpoint, $entityType, null, null, $providerId); |
941: | if ($deferred) { |
942: | foreach ($deferred as $relationship) { |
943: | foreach ($relationship as $entity) { |
944: | $objectId = spl_object_id($entity); |
945: | if ($this->EntityCheckpoints[$objectId] < $checkpoint) { |
946: | continue; |
947: | } |
948: | $resolved[$objectId] = $entity; |
949: | } |
950: | } |
951: | continue; |
952: | } |
953: | |
954: | $deferred = $this->resolveDeferredEntities($fromCheckpoint, $entityType, $providerId); |
955: | if (!$deferred) { |
956: | continue; |
957: | } |
958: | foreach ($deferred as $entity) { |
959: | $resolved[spl_object_id($entity)] = $entity; |
960: | } |
961: | } while ($deferred); |
962: | |
963: | return array_values($resolved ?? []); |
964: | } |
965: | |
966: | |
967: | |
968: | |
969: | public function getDeferralCheckpoint(): int |
970: | { |
971: | return $this->DeferralCheckpoint; |
972: | } |
973: | |
974: | |
975: | |
976: | |
977: | public function resolveDeferredEntities( |
978: | ?int $fromCheckpoint = null, |
979: | ?string $entityType = null, |
980: | ?int $providerId = null |
981: | ): array { |
982: | $entityTypeId = $entityType === null |
983: | ? null |
984: | : $this->EntityTypeMap[$entityType]; |
985: | |
986: | $resolved = []; |
987: | foreach ($this->DeferredEntities as $provId => $entitiesByTypeId) { |
988: | if ($providerId !== null && $provId !== $providerId) { |
989: | continue; |
990: | } |
991: | foreach ($entitiesByTypeId as $entTypeId => $entities) { |
992: | if ($entityTypeId !== null && $entTypeId !== $entityTypeId) { |
993: | continue; |
994: | } |
995: | |
996: | if ($fromCheckpoint !== null) { |
997: | $_entities = $entities; |
998: | foreach ($_entities as $entityId => $deferred) { |
999: | foreach ($deferred as $i => $deferredEntity) { |
1000: | if ($i < $fromCheckpoint) { |
1001: | unset($_entities[$entityId][$i]); |
1002: | if (!$_entities[$entityId]) { |
1003: | unset($_entities[$entityId]); |
1004: | } |
1005: | } |
1006: | } |
1007: | } |
1008: | if (!$_entities) { |
1009: | continue; |
1010: | } |
1011: | $entities = $_entities; |
1012: | } |
1013: | |
1014: | |
1015: | foreach ($entities as $entityId => $deferred) { |
1016: | $deferredEntity = reset($deferred); |
1017: | $resolved[] = $deferredEntity->resolve(); |
1018: | } |
1019: | } |
1020: | } |
1021: | |
1022: | return $resolved; |
1023: | } |
1024: | |
1025: | |
1026: | |
1027: | |
1028: | public function resolveDeferredRelationships( |
1029: | ?int $fromCheckpoint = null, |
1030: | ?string $entityType = null, |
1031: | ?string $forEntityType = null, |
1032: | ?string $forEntityProperty = null, |
1033: | ?int $providerId = null |
1034: | ): array { |
1035: | $entityTypeId = $entityType === null |
1036: | ? null |
1037: | : $this->EntityTypeMap[$entityType]; |
1038: | $forEntityTypeId = $forEntityType === null |
1039: | ? null |
1040: | : $this->EntityTypeMap[$forEntityType]; |
1041: | |
1042: | $resolved = []; |
1043: | foreach ($this->DeferredRelationships as $provId => $relationshipsByEntTypeId) { |
1044: | if ($providerId !== null && $provId !== $providerId) { |
1045: | continue; |
1046: | } |
1047: | foreach ($relationshipsByEntTypeId as $entTypeId => $relationshipsByForEntTypeId) { |
1048: | if ($entityTypeId !== null && $entTypeId !== $entityTypeId) { |
1049: | continue; |
1050: | } |
1051: | foreach ($relationshipsByForEntTypeId as $forEntTypeId => $relationshipsByForEntProp) { |
1052: | if ($forEntityTypeId !== null && $forEntTypeId !== $forEntityTypeId) { |
1053: | continue; |
1054: | } |
1055: | foreach ($relationshipsByForEntProp as $forEntProp => $relationshipsByForEntId) { |
1056: | if ($forEntityProperty !== null && $forEntProp !== $forEntityProperty) { |
1057: | continue; |
1058: | } |
1059: | foreach ($relationshipsByForEntId as $forEntId => $relationships) { |
1060: | if ($fromCheckpoint !== null) { |
1061: | $_relationships = $relationships; |
1062: | foreach ($_relationships as $index => $deferred) { |
1063: | if ($index < $fromCheckpoint) { |
1064: | unset($_relationships[$index]); |
1065: | } |
1066: | } |
1067: | if (!$_relationships) { |
1068: | continue; |
1069: | } |
1070: | $relationships = $_relationships; |
1071: | } |
1072: | |
1073: | foreach ($relationships as $index => $deferred) { |
1074: | $resolved[] = $deferred->resolve(); |
1075: | unset($this->DeferredRelationships[$provId][$entTypeId][$forEntTypeId][$forEntProp][$forEntId][$index]); |
1076: | } |
1077: | } |
1078: | } |
1079: | } |
1080: | } |
1081: | } |
1082: | |
1083: | return $resolved; |
1084: | } |
1085: | |
1086: | |
1087: | |
1088: | |
1089: | public function checkProviderHeartbeats( |
1090: | int $ttl = 300, |
1091: | bool $failEarly = true, |
1092: | SyncProviderInterface ...$providers |
1093: | ) { |
1094: | $this->check(); |
1095: | |
1096: | if ($providers) { |
1097: | $providers = Arr::unique($providers); |
1098: | } elseif ($this->Providers) { |
1099: | $providers = $this->Providers; |
1100: | } else { |
1101: | return $this; |
1102: | } |
1103: | |
1104: | $failed = []; |
1105: | |
1106: | foreach ($providers as $provider) { |
1107: | $id = $provider->getProviderId(); |
1108: | $name = sprintf('%s [#%d]', $provider->getName(), $id); |
1109: | Console::logProgress('Checking', $name); |
1110: | try { |
1111: | $provider->checkHeartbeat($ttl); |
1112: | Console::log('Heartbeat OK:', $name); |
1113: | } catch (MethodNotImplementedExceptionInterface $ex) { |
1114: | Console::log('Heartbeat check not supported:', $name); |
1115: | } catch (UnreachableBackendExceptionInterface $ex) { |
1116: | Console::exception($ex, Level::DEBUG, null); |
1117: | Console::log('Heartbeat check failed:', $name); |
1118: | $failed[] = $provider; |
1119: | $this->recordError( |
1120: | SyncError::build() |
1121: | ->errorType(ErrorType::BACKEND_UNREACHABLE) |
1122: | ->message('Heartbeat check failed: %s') |
1123: | ->values([[ |
1124: | 'provider_id' => $id, |
1125: | 'provider_class' => get_class($provider), |
1126: | 'exception' => get_class($ex), |
1127: | 'message' => $ex->getMessage() |
1128: | ]]) |
1129: | ->build() |
1130: | ); |
1131: | } |
1132: | if ($failEarly && $failed) { |
1133: | break; |
1134: | } |
1135: | } |
1136: | |
1137: | if ($failed) { |
1138: | throw new HeartbeatCheckFailedException(...$failed); |
1139: | } |
1140: | |
1141: | return $this; |
1142: | } |
1143: | |
1144: | |
1145: | |
1146: | |
1147: | public function recordError(SyncErrorInterface $error, bool $deduplicate = false) |
1148: | { |
1149: | if ($deduplicate) { |
1150: | $key = $this->Errors->keyOf($error); |
1151: | if ($key !== null) { |
1152: | $this->Errors[$key] = $this->Errors[$key]->count(); |
1153: | return $this; |
1154: | } |
1155: | } |
1156: | |
1157: | $this->Errors[] = $error; |
1158: | |
1159: | return $this; |
1160: | } |
1161: | |
1162: | |
1163: | |
1164: | |
1165: | public function getErrors(): SyncErrorCollectionInterface |
1166: | { |
1167: | return clone $this->Errors; |
1168: | } |
1169: | |
1170: | |
1171: | |
1172: | |
1173: | protected function check() |
1174: | { |
1175: | if ($this->RunId !== null) { |
1176: | return $this; |
1177: | } |
1178: | |
1179: | if (!$this->isCheckRunning()) { |
1180: | return $this->safeCheck(); |
1181: | } |
1182: | |
1183: | $sql = <<<SQL |
1184: | INSERT INTO _sync_run (run_uuid, run_command, run_arguments_json) |
1185: | VALUES ( |
1186: | :run_uuid, |
1187: | :run_command, |
1188: | :run_arguments_json |
1189: | ); |
1190: | SQL; |
1191: | |
1192: | $stmt = $this->prepare($sql); |
1193: | $stmt->bindValue(':run_uuid', $uuid = Get::binaryUuid(), \SQLITE3_BLOB); |
1194: | $stmt->bindValue(':run_command', $this->Command, \SQLITE3_TEXT); |
1195: | $stmt->bindValue(':run_arguments_json', Json::stringify($this->Arguments), \SQLITE3_TEXT); |
1196: | $stmt->execute(); |
1197: | $stmt->close(); |
1198: | |
1199: | $id = $this->db()->lastInsertRowID(); |
1200: | $this->RunId = $id; |
1201: | $this->RunUuid = $uuid; |
1202: | unset($this->Command, $this->Arguments); |
1203: | |
1204: | foreach ($this->DeferredProviders as $provider) { |
1205: | $this->registerProvider($provider); |
1206: | } |
1207: | unset($this->DeferredProviders); |
1208: | |
1209: | foreach ($this->DeferredEntityTypes as $entity) { |
1210: | $this->registerEntityType($entity); |
1211: | } |
1212: | unset($this->DeferredEntityTypes); |
1213: | |
1214: | foreach ($this->DeferredNamespaces as $prefix => [$uri, $namespace, $helper]) { |
1215: | $this->registerNamespace($prefix, $uri, $namespace, $helper); |
1216: | } |
1217: | unset($this->DeferredNamespaces); |
1218: | |
1219: | return $this->reload(); |
1220: | } |
1221: | |
1222: | |
1223: | |
1224: | |
1225: | private function reload() |
1226: | { |
1227: | $db = $this->db(); |
1228: | $sql = <<<SQL |
1229: | SELECT |
1230: | entity_namespace_prefix, |
1231: | base_uri, |
1232: | php_namespace |
1233: | FROM |
1234: | _sync_entity_namespace |
1235: | ORDER BY |
1236: | LENGTH(php_namespace) DESC; |
1237: | SQL; |
1238: | $stmt = $this->prepare($sql); |
1239: | $result = $this->execute($stmt); |
1240: | $this->NamespacesByPrefix = []; |
1241: | $this->NamespaceUrisByPrefix = []; |
1242: | while (($row = $result->fetchArray(\SQLITE3_NUM)) !== false) { |
1243: | |
1244: | $this->NamespacesByPrefix[$row[0]] = $row[2]; |
1245: | $this->NamespaceUrisByPrefix[$row[0]] = $row[1]; |
1246: | } |
1247: | $result->finalize(); |
1248: | $stmt->close(); |
1249: | |
1250: | return $this; |
1251: | } |
1252: | |
1253: | |
1254: | |
1255: | |
1256: | public function __destruct() |
1257: | { |
1258: | $exitStatus = -1; |
1259: | if (Err::isLoaded() && Err::isShuttingDown()) { |
1260: | $exitStatus = Err::getExitStatus(); |
1261: | } |
1262: | $this->close($exitStatus); |
1263: | } |
1264: | } |
1265: | |