1: | <?php declare(strict_types=1); |
2: | |
3: | namespace Salient\Core; |
4: | |
5: | use Salient\Contract\Container\ContainerInterface; |
6: | use Salient\Contract\Core\Pipeline\EntityPipelineInterface; |
7: | use Salient\Contract\Core\Pipeline\PipeInterface; |
8: | use Salient\Contract\Core\Pipeline\PipelineInterface; |
9: | use Salient\Contract\Core\Pipeline\StreamPipelineInterface; |
10: | use Salient\Contract\Core\ListConformity; |
11: | use Salient\Core\Concern\HasChainableMethods; |
12: | use Salient\Core\Concern\HasMutator; |
13: | use Salient\Core\Facade\App; |
14: | use Salient\Utility\Arr; |
15: | use Closure; |
16: | use Generator; |
17: | use LogicException; |
18: | |
19: | |
20: | |
21: | |
22: | |
23: | |
24: | |
25: | |
26: | |
27: | |
28: | |
29: | |
30: | final class Pipeline implements |
31: | PipelineInterface, |
32: | EntityPipelineInterface, |
33: | StreamPipelineInterface |
34: | { |
35: | use HasChainableMethods; |
36: | use HasMutator; |
37: | |
38: | private bool $HasPayload = false; |
39: | private bool $HasStream; |
40: | |
41: | private $Payload; |
42: | |
43: | private $Arg; |
44: | |
45: | private int $PayloadConformity = ListConformity::NONE; |
46: | |
47: | private ?Closure $After = null; |
48: | |
49: | private array $Pipes = []; |
50: | |
51: | private array $KeyMaps = []; |
52: | |
53: | private array $ArrayMappers; |
54: | |
55: | private ?Closure $Then = null; |
56: | |
57: | private ?Closure $CollectThen = null; |
58: | |
59: | private array $Cc = []; |
60: | |
61: | private ?Closure $Unless = null; |
62: | private ?ContainerInterface $Container; |
63: | |
64: | |
65: | |
66: | |
67: | public function __construct(?ContainerInterface $container = null) |
68: | { |
69: | $this->Container = $container; |
70: | } |
71: | |
72: | |
73: | |
74: | |
75: | |
76: | |
77: | public static function create(?ContainerInterface $container = null): PipelineInterface |
78: | { |
79: | return new self($container); |
80: | } |
81: | |
82: | |
83: | |
84: | |
85: | public function __clone() |
86: | { |
87: | unset($this->ArrayMappers); |
88: | } |
89: | |
90: | |
91: | |
92: | |
93: | public function send($payload, $arg = null) |
94: | { |
95: | return $this->withPayload($payload, $arg, false); |
96: | } |
97: | |
98: | |
99: | |
100: | |
101: | public function stream(iterable $payload, $arg = null) |
102: | { |
103: | return $this->withPayload($payload, $arg, true); |
104: | } |
105: | |
106: | |
107: | |
108: | |
109: | |
110: | |
111: | |
112: | |
113: | |
114: | private function withPayload($payload, $arg, bool $stream) |
115: | { |
116: | if ($this->HasPayload) { |
117: | |
118: | throw new LogicException('Payload already set'); |
119: | |
120: | } |
121: | |
122: | |
123: | $pipeline = $this; |
124: | $pipeline = $pipeline |
125: | ->with('HasPayload', true) |
126: | ->with('HasStream', $stream) |
127: | ->with('Payload', $payload) |
128: | ->with('Arg', $arg); |
129: | |
130: | return $pipeline; |
131: | } |
132: | |
133: | |
134: | |
135: | |
136: | public function withConformity($conformity) |
137: | { |
138: | $this->assertHasPayload(); |
139: | |
140: | return $this->with('PayloadConformity', $conformity); |
141: | } |
142: | |
143: | |
144: | |
145: | |
146: | public function getConformity() |
147: | { |
148: | $this->assertHasPayload(); |
149: | |
150: | return $this->PayloadConformity; |
151: | } |
152: | |
153: | |
154: | |
155: | |
156: | public function after(Closure $closure) |
157: | { |
158: | if ($this->After) { |
159: | |
160: | throw new LogicException(static::class . '::after() already applied'); |
161: | |
162: | } |
163: | |
164: | return $this->with('After', $closure); |
165: | } |
166: | |
167: | |
168: | |
169: | |
170: | public function afterIf(Closure $closure) |
171: | { |
172: | return $this->After |
173: | ? $this |
174: | : $this->after($closure); |
175: | } |
176: | |
177: | |
178: | |
179: | |
180: | public function through($pipe) |
181: | { |
182: | return $this->with('Pipes', Arr::push($this->Pipes, $pipe)); |
183: | } |
184: | |
185: | |
186: | |
187: | |
188: | public function throughClosure(Closure $closure) |
189: | { |
190: | return $this->through( |
191: | fn($payload, Closure $next, self $pipeline, $arg) => |
192: | $next($closure($payload, $pipeline, $arg)) |
193: | ); |
194: | } |
195: | |
196: | |
197: | |
198: | |
199: | public function throughKeyMap(array $keyMap, int $flags = ArrayMapper::ADD_UNMAPPED) |
200: | { |
201: | $keyMapKey = count($this->KeyMaps); |
202: | $clone = $this->through( |
203: | function ($payload, Closure $next, self $pipeline) use ($keyMapKey) { |
204: | |
205: | return $next($pipeline->ArrayMappers[$keyMapKey]->map($payload)); |
206: | } |
207: | ); |
208: | $clone->KeyMaps[] = [$keyMap, $flags]; |
209: | return $clone; |
210: | } |
211: | |
212: | |
213: | |
214: | |
215: | public function then(Closure $closure) |
216: | { |
217: | if ($this->Then || $this->CollectThen) { |
218: | throw new LogicException(static::class . '::then() already applied'); |
219: | } |
220: | |
221: | return $this->with('Then', $closure); |
222: | } |
223: | |
224: | |
225: | |
226: | |
227: | public function thenIf(Closure $closure) |
228: | { |
229: | return $this->Then || $this->CollectThen |
230: | ? $this |
231: | : $this->then($closure); |
232: | } |
233: | |
234: | |
235: | |
236: | |
237: | public function collectThen(Closure $closure) |
238: | { |
239: | $this->assertHasStream(); |
240: | |
241: | if ($this->Then || $this->CollectThen) { |
242: | throw new LogicException(static::class . '::then() already applied'); |
243: | } |
244: | |
245: | return $this->with('CollectThen', $closure); |
246: | } |
247: | |
248: | |
249: | |
250: | |
251: | public function collectThenIf(Closure $closure) |
252: | { |
253: | $this->assertHasStream(); |
254: | |
255: | return $this->Then || $this->CollectThen |
256: | ? $this |
257: | : $this->collectThen($closure); |
258: | } |
259: | |
260: | |
261: | |
262: | |
263: | public function cc(Closure $closure) |
264: | { |
265: | return $this->with('Cc', Arr::push($this->Cc, $closure)); |
266: | } |
267: | |
268: | |
269: | |
270: | |
271: | public function unless(Closure $filter) |
272: | { |
273: | $this->assertHasStream(); |
274: | |
275: | if ($this->Unless) { |
276: | throw new LogicException(static::class . '::unless() already applied'); |
277: | } |
278: | |
279: | return $this->with('Unless', $filter); |
280: | } |
281: | |
282: | |
283: | |
284: | |
285: | public function unlessIf(Closure $filter) |
286: | { |
287: | $this->assertHasStream(); |
288: | |
289: | return $this->Unless |
290: | ? $this |
291: | : $this->unless($filter); |
292: | } |
293: | |
294: | |
295: | |
296: | |
297: | public function run() |
298: | { |
299: | $this->assertHasOnePayload(); |
300: | |
301: | $result = $this->getClosure()( |
302: | $this->After |
303: | ? ($this->After)($this->Payload, $this, $this->Arg) |
304: | : $this->Payload |
305: | ); |
306: | |
307: | if ($this->Cc) { |
308: | $this->ccResult($result); |
309: | } |
310: | |
311: | return $result; |
312: | } |
313: | |
314: | |
315: | |
316: | |
317: | public function runInto(PipelineInterface $next) |
318: | { |
319: | return $next->send($this->run(), $this->Arg); |
320: | } |
321: | |
322: | |
323: | |
324: | |
325: | public function start(): Generator |
326: | { |
327: | $this->assertHasStream(); |
328: | |
329: | $closure = $this->getClosure(); |
330: | $results = []; |
331: | foreach ($this->Payload as $key => $payload) { |
332: | $result = $closure( |
333: | $this->After |
334: | ? ($this->After)($payload, $this, $this->Arg) |
335: | : $payload |
336: | ); |
337: | |
338: | if ( |
339: | $this->Unless |
340: | && ($this->Unless)($result, $this, $this->Arg) !== false |
341: | ) { |
342: | continue; |
343: | } |
344: | |
345: | if ($this->CollectThen) { |
346: | $results[$key] = $result; |
347: | continue; |
348: | } |
349: | |
350: | if ($this->Cc) { |
351: | $this->ccResult($result); |
352: | } |
353: | |
354: | yield $key => $result; |
355: | } |
356: | |
357: | if (!$this->CollectThen || !$results) { |
358: | return; |
359: | } |
360: | |
361: | $results = ($this->CollectThen)($results, $this, $this->Arg); |
362: | foreach ($results as $key => $result) { |
363: | if ($this->Cc) { |
364: | $this->ccResult($result); |
365: | } |
366: | |
367: | yield $key => $result; |
368: | } |
369: | } |
370: | |
371: | |
372: | |
373: | |
374: | public function startInto(PipelineInterface $next) |
375: | { |
376: | return $next->stream($this->start(), $this->Arg); |
377: | } |
378: | |
379: | |
380: | |
381: | |
382: | private function ccResult($result): void |
383: | { |
384: | foreach ($this->Cc as $closure) { |
385: | $closure($result, $this, $this->Arg); |
386: | } |
387: | } |
388: | |
389: | private function getClosure(): Closure |
390: | { |
391: | $this->ArrayMappers = []; |
392: | foreach ($this->KeyMaps as [$keyMap, $flags]) { |
393: | $this->ArrayMappers[] = new ArrayMapper($keyMap, $this->PayloadConformity, $flags); |
394: | } |
395: | |
396: | $closure = $this->Then |
397: | ? fn($result) => ($this->Then)($result, $this, $this->Arg) |
398: | : fn($result) => $result; |
399: | |
400: | foreach (array_reverse($this->Pipes) as $pipe) { |
401: | if (is_string($pipe)) { |
402: | $pipe = $this->Container |
403: | ? $this->Container->get($pipe) |
404: | : App::get($pipe); |
405: | |
406: | if (!$pipe instanceof PipeInterface) { |
407: | throw new LogicException(sprintf( |
408: | '%s does not implement %s', |
409: | get_class($pipe), |
410: | PipeInterface::class, |
411: | )); |
412: | } |
413: | } |
414: | |
415: | $closure = fn($payload) => |
416: | $pipe($payload, $closure, $this, $this->Arg); |
417: | } |
418: | |
419: | return $closure; |
420: | } |
421: | |
422: | |
423: | |
424: | |
425: | private function assertHasStream(): void |
426: | { |
427: | $this->assertHasPayload(true); |
428: | } |
429: | |
430: | |
431: | |
432: | |
433: | private function assertHasOnePayload(): void |
434: | { |
435: | $this->assertHasPayload(false); |
436: | } |
437: | |
438: | private function assertHasPayload(?bool $stream = null): void |
439: | { |
440: | if (!$this->HasPayload) { |
441: | throw new LogicException('No payload'); |
442: | } |
443: | if ($stream !== null && $this->HasStream !== $stream) { |
444: | throw new LogicException('Invalid payload'); |
445: | } |
446: | } |
447: | } |
448: | |