1: | <?php declare(strict_types=1); |
2: | |
3: | namespace Salient\Core; |
4: | |
5: | use Salient\Contract\Core\Pipeline\EntityPipelineInterface; |
6: | use Salient\Contract\Core\Pipeline\PipelineInterface; |
7: | use Salient\Contract\Core\Pipeline\StreamPipelineInterface; |
8: | use Salient\Core\Concern\ChainableTrait; |
9: | use Salient\Core\Concern\ImmutableTrait; |
10: | use Closure; |
11: | use LogicException; |
12: | |
13: | |
14: | |
15: | |
16: | |
17: | |
18: | |
19: | |
20: | |
21: | |
22: | |
23: | |
/**
 * Passes a payload through a chain of closures ("pipes") and returns the result
 *
 * A payload is either a single value (see {@see Pipeline::send()} and
 * {@see Pipeline::run()}) or an iterable stream of values (see
 * {@see Pipeline::stream()} and {@see Pipeline::start()}).
 *
 * Configuration methods return the value of `with()`, which is not defined in
 * this file.
 *
 * NOTE(review): `with()` is presumably provided by ImmutableTrait and returns
 * a modified copy of the pipeline, leaving the original untouched - confirm
 * against the trait.
 *
 * `self::CONFORMITY_NONE` is not defined in this file either; it is presumably
 * inherited from one of the implemented interfaces - confirm.
 */
final class Pipeline implements
    PipelineInterface,
    EntityPipelineInterface,
    StreamPipelineInterface
{
    use ChainableTrait;
    use ImmutableTrait;

    /** True once send() or stream() has supplied a payload */
    private bool $HasPayload = false;

    /**
     * True if the payload was supplied via stream(), false if via send()
     *
     * Uninitialised until a payload is set, so it must only be read after
     * checking $HasPayload.
     */
    private bool $PayloadIsStream;

    /**
     * The value given to send(), or the iterable given to stream()
     *
     * @var mixed
     */
    private $Payload;

    /**
     * Extra argument handed to every closure the pipeline calls
     *
     * @var mixed
     */
    private $Arg;

    /** Conformity level passed to each ArrayMapper created by getClosure() */
    private int $Conformity = self::CONFORMITY_NONE;

    /** Closure applied to each payload before it enters the first pipe */
    private ?Closure $After = null;

    /**
     * Pipes, in the order they were added
     *
     * @var Closure[]
     */
    private array $Pipes = [];

    /**
     * One `[$keyMap, $flags]` pair per throughKeyMap() call, in call order
     *
     * @var array<int,array{array,int}>
     */
    private array $KeyMaps = [];

    /**
     * Mappers built from $KeyMaps by getClosure(), with matching indexes
     *
     * Unset by __clone() and rebuilt on every getClosure() call.
     *
     * @var ArrayMapper[]
     */
    private array $ArrayMappers;

    /** Closure applied to each result after it leaves the last pipe */
    private ?Closure $Then = null;

    /** Closure applied once to all results collected from a stream */
    private ?Closure $CollectThen = null;

    /**
     * Closures that receive a copy of each result; their return values are
     * ignored
     *
     * @var Closure[]
     */
    private array $Cc = [];

    /** Filter applied to each result of a stream (see start()) */
    private ?Closure $Unless = null;

    /**
     * Get a new pipeline with no payload and no pipes
     */
    public static function create(): PipelineInterface
    {
        return new self();
    }

    /**
     * Discard array mappers so a clone never holds mappers that were built for
     * the instance it was copied from
     *
     * They are rebuilt by getClosure() before they are used.
     */
    public function __clone()
    {
        unset($this->ArrayMappers);
    }

    /**
     * Set a single value as the payload
     *
     * @param mixed $payload
     * @param mixed $arg Passed to every closure called by the pipeline.
     * @return static
     * @throws LogicException if a payload has already been set.
     */
    public function send($payload, $arg = null)
    {
        return $this->withPayload($payload, $arg, false);
    }

    /**
     * Set an iterable of values as the payload
     *
     * @param iterable<mixed> $payload
     * @param mixed $arg Passed to every closure called by the pipeline.
     * @return static
     * @throws LogicException if a payload has already been set.
     */
    public function stream(iterable $payload, $arg = null)
    {
        return $this->withPayload($payload, $arg, true);
    }

    /**
     * Record the payload, its argument and whether it is a stream
     *
     * @param mixed $payload
     * @param mixed $arg
     * @return static
     * @throws LogicException if a payload has already been set.
     */
    private function withPayload($payload, $arg, bool $stream)
    {
        if ($this->HasPayload) {
            throw new LogicException('Payload already set');
        }

        return $this
            ->with('HasPayload', true)
            ->with('PayloadIsStream', $stream)
            ->with('Payload', $payload)
            ->with('Arg', $arg);
    }

    /**
     * Set the conformity level passed to array mappers created for
     * throughKeyMap()
     *
     * @return static
     * @throws LogicException if no payload has been set.
     */
    public function withConformity(int $conformity)
    {
        $this->assertHasPayload();

        return $this->with('Conformity', $conformity);
    }

    /**
     * Get the conformity level passed to array mappers created for
     * throughKeyMap()
     *
     * @throws LogicException if no payload has been set.
     */
    public function getConformity(): int
    {
        $this->assertHasPayload();

        return $this->Conformity;
    }

    /**
     * Apply a closure to each payload before it enters the first pipe
     *
     * The closure is called with `($payload, $pipeline, $arg)` and its return
     * value replaces the payload.
     *
     * @return static
     * @throws LogicException if an "after" closure has already been applied.
     */
    public function after(Closure $closure)
    {
        if ($this->After) {
            throw new LogicException(self::class . '::after() already applied');
        }

        return $this->with('After', $closure);
    }

    /**
     * Same as after(), but do nothing if an "after" closure has already been
     * applied
     *
     * @return static
     */
    public function afterIf(Closure $closure)
    {
        return $this->After
            ? $this
            : $this->with('After', $closure);
    }

    /**
     * Add a pipe to the end of the pipeline
     *
     * The pipe is called with `($payload, Closure $next, $pipeline, $arg)`.
     * Its return value is used as the result of the previous stage, so it
     * decides whether and how `$next` is called.
     *
     * @return static
     */
    public function through(Closure $pipe)
    {
        $pipes = $this->Pipes;
        $pipes[] = $pipe;

        return $this->with('Pipes', $pipes);
    }

    /**
     * Add a pipe that always forwards the closure's return value to the next
     * pipe
     *
     * The closure is called with `($payload, $pipeline, $arg)`.
     *
     * @return static
     */
    public function throughClosure(Closure $closure)
    {
        return $this->through(
            static fn($payload, Closure $next, self $pipeline, $arg) =>
                $next($closure($payload, $pipeline, $arg))
        );
    }

    /**
     * Add a pipe that passes the payload through an ArrayMapper before
     * forwarding it to the next pipe
     *
     * The mapper itself is created later, by getClosure(), from `$keyMap`,
     * the pipeline's conformity level and `$flags`.
     *
     * @param array<mixed> $keyMap Passed to the ArrayMapper constructor.
     * @param int $flags Passed to the ArrayMapper constructor.
     * @return static
     */
    public function throughKeyMap(array $keyMap, int $flags = ArrayMapper::ADD_UNMAPPED)
    {
        // Index the mapper for this key map will have in $ArrayMappers
        $keyMapKey = count($this->KeyMaps);
        $pipeline = $this->through(
            static function ($payload, Closure $next, self $pipeline) use ($keyMapKey) {
                // $pipeline is the instance being run, not necessarily the
                // one this pipe was added to, so its mappers are used
                return $next($pipeline->ArrayMappers[$keyMapKey]->map($payload));
            }
        );
        // NOTE(review): this writes to the instance returned by through(),
        // so it relies on with() having returned a copy rather than $this -
        // confirm against ImmutableTrait
        $pipeline->KeyMaps[] = [$keyMap, $flags];
        return $pipeline;
    }

    /**
     * Apply a closure to each result after it leaves the last pipe
     *
     * The closure is called with `($result, $pipeline, $arg)` and its return
     * value replaces the result.
     *
     * @return static
     * @throws LogicException if then() or collectThen() has already been
     * applied.
     */
    public function then(Closure $closure)
    {
        if ($this->Then || $this->CollectThen) {
            throw new LogicException(self::class . '::then() already applied');
        }

        return $this->with('Then', $closure);
    }

    /**
     * Same as then(), but do nothing if then() or collectThen() has already
     * been applied
     *
     * @return static
     */
    public function thenIf(Closure $closure)
    {
        return $this->Then || $this->CollectThen
            ? $this
            : $this->with('Then', $closure);
    }

    /**
     * Collect the results of a stream and apply a closure to all of them at
     * once
     *
     * The closure is called with `($results, $pipeline, $arg)`, where
     * `$results` is an array of results indexed by their payload keys, and
     * must return something that can be iterated over. It is not called if no
     * results are collected.
     *
     * @return static
     * @throws LogicException if the payload is not a stream, or if then() or
     * collectThen() has already been applied.
     */
    public function collectThen(Closure $closure)
    {
        $this->assertHasStream();

        if ($this->Then || $this->CollectThen) {
            throw new LogicException(self::class . '::then() already applied');
        }

        return $this->with('CollectThen', $closure);
    }

    /**
     * Same as collectThen(), but do nothing if then() or collectThen() has
     * already been applied
     *
     * @return static
     * @throws LogicException if the payload is not a stream.
     */
    public function collectThenIf(Closure $closure)
    {
        $this->assertHasStream();

        return $this->Then || $this->CollectThen
            ? $this
            : $this->with('CollectThen', $closure);
    }

    /**
     * Add a closure that receives each result the pipeline returns or yields
     *
     * The closure is called with `($result, $pipeline, $arg)`. Its return
     * value is ignored.
     *
     * @return static
     */
    public function cc(Closure $closure)
    {
        $cc = $this->Cc;
        $cc[] = $closure;

        return $this->with('Cc', $cc);
    }

    /**
     * Apply a filter to each result of a stream
     *
     * The filter is called with `($result, $pipeline, $arg)`. The result is
     * discarded unless the filter returns exactly `false`.
     *
     * @return static
     * @throws LogicException if the payload is not a stream, or if a filter
     * has already been applied.
     */
    public function unless(Closure $filter)
    {
        $this->assertHasStream();

        if ($this->Unless) {
            throw new LogicException(self::class . '::unless() already applied');
        }

        return $this->with('Unless', $filter);
    }

    /**
     * Same as unless(), but do nothing if a filter has already been applied
     *
     * @return static
     * @throws LogicException if the payload is not a stream.
     */
    public function unlessIf(Closure $filter)
    {
        $this->assertHasStream();

        return $this->Unless
            ? $this
            : $this->with('Unless', $filter);
    }

    /**
     * Run the pipeline on a single payload and return the result
     *
     * @return mixed
     * @throws LogicException if no payload has been set, or if the payload is
     * a stream.
     */
    public function run()
    {
        $this->assertHasOnePayload();

        $result = $this->getClosure()(
            $this->After
                ? ($this->After)($this->Payload, $this, $this->Arg)
                : $this->Payload
        );

        if ($this->Cc) {
            $this->ccResult($result);
        }

        return $result;
    }

    /**
     * Run the pipeline and send the result to another pipeline as its payload
     *
     * This pipeline's `$arg` is passed on with the payload.
     *
     * @return mixed Whatever `$next->send()` returns.
     */
    public function runInto(PipelineInterface $next)
    {
        return $next->send($this->run(), $this->Arg);
    }

    /**
     * Run the pipeline on each value in a stream and iterate over the results
     *
     * The payload is validated when this method is called. Payloads are not
     * processed until the returned iterable is traversed.
     *
     * @return iterable<mixed>
     * @throws LogicException if no payload has been set, or if the payload is
     * not a stream.
     */
    public function start(): iterable
    {
        // Checked here, not in the generator, because the body of a generator
        // function does not run until iteration starts
        $this->assertHasStream();

        return $this->generateResults();
    }

    /**
     * Run the pipeline on each value in the stream, yielding results with
     * their payload keys
     *
     * If collectThen() has been applied, nothing is yielded until the stream
     * is exhausted; the collected results are then passed to the
     * collectThen() closure and whatever it returns is yielded instead.
     *
     * @return iterable<mixed>
     */
    private function generateResults(): iterable
    {
        $closure = $this->getClosure();
        $results = [];
        foreach ($this->Payload as $key => $payload) {
            $result = $closure(
                $this->After
                    ? ($this->After)($payload, $this, $this->Arg)
                    : $payload
            );

            // Skip the result unless the filter returns exactly false
            if (
                $this->Unless
                && ($this->Unless)($result, $this, $this->Arg) !== false
            ) {
                continue;
            }

            if ($this->CollectThen) {
                $results[$key] = $result;
                continue;
            }

            if ($this->Cc) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }

        if (!$this->CollectThen || !$results) {
            return;
        }

        $results = ($this->CollectThen)($results, $this, $this->Arg);
        foreach ($results as $key => $result) {
            if ($this->Cc) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }
    }

    /**
     * Start the pipeline and send its results to another pipeline as a stream
     *
     * This pipeline's `$arg` is passed on with the stream.
     *
     * @return mixed Whatever `$next->stream()` returns.
     * @throws LogicException if no payload has been set, or if the payload is
     * not a stream.
     */
    public function startInto(PipelineInterface $next)
    {
        return $next->stream($this->start(), $this->Arg);
    }

    /**
     * Pass a result to every closure added via cc()
     *
     * @param mixed $result
     */
    private function ccResult($result): void
    {
        foreach ($this->Cc as $closure) {
            $closure($result, $this, $this->Arg);
        }
    }

    /**
     * Build a closure that takes a payload, passes it through every pipe and
     * the then() closure (if any), and returns the result
     *
     * Also rebuilds $ArrayMappers from $KeyMaps using the current conformity
     * level.
     */
    private function getClosure(): Closure
    {
        $this->ArrayMappers = [];
        foreach ($this->KeyMaps as [$keyMap, $flags]) {
            $this->ArrayMappers[] = new ArrayMapper($keyMap, $this->Conformity, $flags);
        }

        // The innermost stage is the then() closure, or the identity function
        $closure = $this->Then
            ? fn($result) => ($this->Then)($result, $this, $this->Arg)
            : static fn($result) => $result;

        // Wrap from the last pipe to the first, so the first pipe added is
        // outermost and runs first. Each arrow function captures the current
        // $pipe and $closure by value when it is created
        foreach (array_reverse($this->Pipes) as $pipe) {
            $closure = fn($payload) =>
                $pipe($payload, $closure, $this, $this->Arg);
        }

        return $closure;
    }

    /**
     * @throws LogicException if no payload has been set, or if the payload is
     * not a stream.
     */
    private function assertHasStream(): void
    {
        $this->assertHasPayload(true);
    }

    /**
     * @throws LogicException if no payload has been set, or if the payload is
     * a stream.
     */
    private function assertHasOnePayload(): void
    {
        $this->assertHasPayload(false);
    }

    /**
     * Check that a payload has been set and, optionally, that it is of the
     * expected kind
     *
     * @param bool|null $stream If `true`, the payload must be a stream. If
     * `false`, it must not be. If `null`, any payload is accepted.
     * @throws LogicException if the check fails.
     */
    private function assertHasPayload(?bool $stream = null): void
    {
        if (!$this->HasPayload) {
            throw new LogicException('No payload');
        }
        // $PayloadIsStream is only initialised once a payload is set, so it
        // must not be read before the check above
        if ($stream !== null && $this->PayloadIsStream !== $stream) {
            throw new LogicException('Invalid payload');
        }
    }
}
423: | |