1: | <?php declare(strict_types=1); |
2: | |
3: | namespace Salient\Core; |
4: | |
5: | use Salient\Contract\Container\ContainerInterface; |
6: | use Salient\Contract\Core\Pipeline\EntityPipelineInterface; |
7: | use Salient\Contract\Core\Pipeline\PipeInterface; |
8: | use Salient\Contract\Core\Pipeline\PipelineInterface; |
9: | use Salient\Contract\Core\Pipeline\StreamPipelineInterface; |
10: | use Salient\Contract\Core\ArrayMapperFlag; |
11: | use Salient\Contract\Core\ListConformity; |
12: | use Salient\Core\Concern\HasChainableMethods; |
13: | use Salient\Core\Concern\HasMutator; |
14: | use Salient\Core\Facade\App; |
15: | use Salient\Utility\Arr; |
16: | use Closure; |
17: | use Generator; |
18: | use LogicException; |
19: | |
20: | |
21: | |
22: | |
23: | |
24: | |
25: | |
26: | |
27: | |
28: | |
29: | |
30: | |
31: | final class Pipeline implements |
32: | PipelineInterface, |
33: | EntityPipelineInterface, |
34: | StreamPipelineInterface |
35: | { |
36: | use HasChainableMethods; |
37: | use HasMutator; |
38: | |
39: | private bool $HasPayload = false; |
40: | private bool $HasStream; |
41: | |
42: | private $Payload; |
43: | |
44: | private $Arg; |
45: | |
46: | private int $PayloadConformity = ListConformity::NONE; |
47: | |
48: | private ?Closure $After = null; |
49: | |
50: | private array $Pipes = []; |
51: | |
52: | private array $KeyMaps = []; |
53: | |
54: | private array $ArrayMappers; |
55: | |
56: | private ?Closure $Then = null; |
57: | |
58: | private ?Closure $CollectThen = null; |
59: | |
60: | private array $Cc = []; |
61: | |
62: | private ?Closure $Unless = null; |
63: | private ?ContainerInterface $Container; |
64: | |
65: | |
66: | |
67: | |
68: | public function __construct(?ContainerInterface $container = null) |
69: | { |
70: | $this->Container = $container; |
71: | } |
72: | |
73: | |
74: | |
75: | |
76: | |
77: | |
78: | public static function create(?ContainerInterface $container = null): PipelineInterface |
79: | { |
80: | return new self($container); |
81: | } |
82: | |
83: | |
84: | |
85: | |
86: | public function __clone() |
87: | { |
88: | unset($this->ArrayMappers); |
89: | } |
90: | |
91: | |
92: | |
93: | |
94: | public function send($payload, $arg = null) |
95: | { |
96: | return $this->withPayload($payload, $arg, false); |
97: | } |
98: | |
99: | |
100: | |
101: | |
102: | public function stream(iterable $payload, $arg = null) |
103: | { |
104: | return $this->withPayload($payload, $arg, true); |
105: | } |
106: | |
107: | |
108: | |
109: | |
110: | |
111: | |
112: | |
113: | |
114: | |
115: | private function withPayload($payload, $arg, bool $stream) |
116: | { |
117: | if ($this->HasPayload) { |
118: | |
119: | throw new LogicException('Payload already set'); |
120: | |
121: | } |
122: | |
123: | |
124: | $pipeline = $this; |
125: | $pipeline = $pipeline |
126: | ->with('HasPayload', true) |
127: | ->with('HasStream', $stream) |
128: | ->with('Payload', $payload) |
129: | ->with('Arg', $arg); |
130: | |
131: | return $pipeline; |
132: | } |
133: | |
134: | |
135: | |
136: | |
137: | public function withConformity($conformity) |
138: | { |
139: | $this->assertHasPayload(); |
140: | |
141: | return $this->with('PayloadConformity', $conformity); |
142: | } |
143: | |
144: | |
145: | |
146: | |
147: | public function getConformity() |
148: | { |
149: | $this->assertHasPayload(); |
150: | |
151: | return $this->PayloadConformity; |
152: | } |
153: | |
154: | |
155: | |
156: | |
157: | public function after(Closure $closure) |
158: | { |
159: | if ($this->After) { |
160: | |
161: | throw new LogicException(static::class . '::after() already applied'); |
162: | |
163: | } |
164: | |
165: | return $this->with('After', $closure); |
166: | } |
167: | |
168: | |
169: | |
170: | |
171: | public function afterIf(Closure $closure) |
172: | { |
173: | return $this->After |
174: | ? $this |
175: | : $this->after($closure); |
176: | } |
177: | |
178: | |
179: | |
180: | |
181: | public function through($pipe) |
182: | { |
183: | return $this->with('Pipes', Arr::push($this->Pipes, $pipe)); |
184: | } |
185: | |
186: | |
187: | |
188: | |
189: | public function throughClosure(Closure $closure) |
190: | { |
191: | return $this->through( |
192: | fn($payload, Closure $next, self $pipeline, $arg) => |
193: | $next($closure($payload, $pipeline, $arg)) |
194: | ); |
195: | } |
196: | |
197: | |
198: | |
199: | |
200: | public function throughKeyMap(array $keyMap, int $flags = ArrayMapperFlag::ADD_UNMAPPED) |
201: | { |
202: | $keyMapKey = count($this->KeyMaps); |
203: | $clone = $this->through( |
204: | function ($payload, Closure $next, self $pipeline) use ($keyMapKey) { |
205: | |
206: | return $next($pipeline->ArrayMappers[$keyMapKey]->map($payload)); |
207: | } |
208: | ); |
209: | $clone->KeyMaps[] = [$keyMap, $flags]; |
210: | return $clone; |
211: | } |
212: | |
213: | |
214: | |
215: | |
216: | public function then(Closure $closure) |
217: | { |
218: | if ($this->Then || $this->CollectThen) { |
219: | throw new LogicException(static::class . '::then() already applied'); |
220: | } |
221: | |
222: | return $this->with('Then', $closure); |
223: | } |
224: | |
225: | |
226: | |
227: | |
228: | public function thenIf(Closure $closure) |
229: | { |
230: | return $this->Then || $this->CollectThen |
231: | ? $this |
232: | : $this->then($closure); |
233: | } |
234: | |
235: | |
236: | |
237: | |
238: | public function collectThen(Closure $closure) |
239: | { |
240: | $this->assertHasStream(); |
241: | |
242: | if ($this->Then || $this->CollectThen) { |
243: | throw new LogicException(static::class . '::then() already applied'); |
244: | } |
245: | |
246: | return $this->with('CollectThen', $closure); |
247: | } |
248: | |
249: | |
250: | |
251: | |
252: | public function collectThenIf(Closure $closure) |
253: | { |
254: | $this->assertHasStream(); |
255: | |
256: | return $this->Then || $this->CollectThen |
257: | ? $this |
258: | : $this->collectThen($closure); |
259: | } |
260: | |
261: | |
262: | |
263: | |
264: | public function cc(Closure $closure) |
265: | { |
266: | return $this->with('Cc', Arr::push($this->Cc, $closure)); |
267: | } |
268: | |
269: | |
270: | |
271: | |
272: | public function unless(Closure $filter) |
273: | { |
274: | $this->assertHasStream(); |
275: | |
276: | if ($this->Unless) { |
277: | throw new LogicException(static::class . '::unless() already applied'); |
278: | } |
279: | |
280: | return $this->with('Unless', $filter); |
281: | } |
282: | |
283: | |
284: | |
285: | |
286: | public function unlessIf(Closure $filter) |
287: | { |
288: | $this->assertHasStream(); |
289: | |
290: | return $this->Unless |
291: | ? $this |
292: | : $this->unless($filter); |
293: | } |
294: | |
295: | |
296: | |
297: | |
298: | public function run() |
299: | { |
300: | $this->assertHasOnePayload(); |
301: | |
302: | $result = $this->getClosure()( |
303: | $this->After |
304: | ? ($this->After)($this->Payload, $this, $this->Arg) |
305: | : $this->Payload |
306: | ); |
307: | |
308: | if ($this->Cc) { |
309: | $this->ccResult($result); |
310: | } |
311: | |
312: | return $result; |
313: | } |
314: | |
315: | |
316: | |
317: | |
318: | public function runInto(PipelineInterface $next) |
319: | { |
320: | return $next->send($this->run(), $this->Arg); |
321: | } |
322: | |
323: | |
324: | |
325: | |
326: | public function start(): Generator |
327: | { |
328: | $this->assertHasStream(); |
329: | |
330: | $closure = $this->getClosure(); |
331: | $results = []; |
332: | foreach ($this->Payload as $key => $payload) { |
333: | $result = $closure( |
334: | $this->After |
335: | ? ($this->After)($payload, $this, $this->Arg) |
336: | : $payload |
337: | ); |
338: | |
339: | if ( |
340: | $this->Unless |
341: | && ($this->Unless)($result, $this, $this->Arg) !== false |
342: | ) { |
343: | continue; |
344: | } |
345: | |
346: | if ($this->CollectThen) { |
347: | $results[$key] = $result; |
348: | continue; |
349: | } |
350: | |
351: | if ($this->Cc) { |
352: | $this->ccResult($result); |
353: | } |
354: | |
355: | yield $key => $result; |
356: | } |
357: | |
358: | if (!$this->CollectThen || !$results) { |
359: | return; |
360: | } |
361: | |
362: | $results = ($this->CollectThen)($results, $this, $this->Arg); |
363: | foreach ($results as $key => $result) { |
364: | if ($this->Cc) { |
365: | $this->ccResult($result); |
366: | } |
367: | |
368: | yield $key => $result; |
369: | } |
370: | } |
371: | |
372: | |
373: | |
374: | |
375: | public function startInto(PipelineInterface $next) |
376: | { |
377: | return $next->stream($this->start(), $this->Arg); |
378: | } |
379: | |
380: | |
381: | |
382: | |
383: | private function ccResult($result): void |
384: | { |
385: | foreach ($this->Cc as $closure) { |
386: | $closure($result, $this, $this->Arg); |
387: | } |
388: | } |
389: | |
390: | private function getClosure(): Closure |
391: | { |
392: | $this->ArrayMappers = []; |
393: | foreach ($this->KeyMaps as [$keyMap, $flags]) { |
394: | $this->ArrayMappers[] = new ArrayMapper($keyMap, $this->PayloadConformity, $flags); |
395: | } |
396: | |
397: | $closure = $this->Then |
398: | ? fn($result) => ($this->Then)($result, $this, $this->Arg) |
399: | : fn($result) => $result; |
400: | |
401: | foreach (array_reverse($this->Pipes) as $pipe) { |
402: | if (is_string($pipe)) { |
403: | $pipe = $this->Container |
404: | ? $this->Container->get($pipe) |
405: | : App::get($pipe); |
406: | |
407: | if (!$pipe instanceof PipeInterface) { |
408: | throw new LogicException(sprintf( |
409: | '%s does not implement %s', |
410: | get_class($pipe), |
411: | PipeInterface::class, |
412: | )); |
413: | } |
414: | } |
415: | |
416: | $closure = fn($payload) => |
417: | $pipe($payload, $closure, $this, $this->Arg); |
418: | } |
419: | |
420: | return $closure; |
421: | } |
422: | |
423: | |
424: | |
425: | |
426: | private function assertHasStream(): void |
427: | { |
428: | $this->assertHasPayload(true); |
429: | } |
430: | |
431: | |
432: | |
433: | |
434: | private function assertHasOnePayload(): void |
435: | { |
436: | $this->assertHasPayload(false); |
437: | } |
438: | |
439: | private function assertHasPayload(?bool $stream = null): void |
440: | { |
441: | if (!$this->HasPayload) { |
442: | throw new LogicException('No payload'); |
443: | } |
444: | if ($stream !== null && $this->HasStream !== $stream) { |
445: | throw new LogicException('Invalid payload'); |
446: | } |
447: | } |
448: | } |
449: | |