1: <?php declare(strict_types=1);
2:
3: namespace Salient\Core;
4:
5: use Salient\Contract\Container\ContainerInterface;
6: use Salient\Contract\Core\Pipeline\EntityPipelineInterface;
7: use Salient\Contract\Core\Pipeline\PipeInterface;
8: use Salient\Contract\Core\Pipeline\PipelineInterface;
9: use Salient\Contract\Core\Pipeline\StreamPipelineInterface;
10: use Salient\Contract\Core\ArrayMapperFlag;
11: use Salient\Contract\Core\ListConformity;
12: use Salient\Core\Concern\HasChainableMethods;
13: use Salient\Core\Concern\HasMutator;
14: use Salient\Core\Facade\App;
15: use Salient\Utility\Arr;
16: use Closure;
17: use Generator;
18: use LogicException;
19:
/**
 * Sends a payload through a series of pipes
 *
 * A payload is attached with send() (a single value) or stream() (an iterable
 * of values), pipes are queued with through() and its variants, and the
 * payload is then processed by run() or start() respectively.
 *
 * @template TInput
 * @template TOutput
 * @template TArgument
 *
 * @implements PipelineInterface<TInput,TOutput,TArgument>
 * @implements EntityPipelineInterface<TInput,TOutput,TArgument>
 * @implements StreamPipelineInterface<TInput,TOutput,TArgument>
 */
final class Pipeline implements
    PipelineInterface,
    EntityPipelineInterface,
    StreamPipelineInterface
{
    use HasChainableMethods;
    use HasMutator;

    /**
     * True once send() or stream() has been applied
     */
    private bool $HasPayload = false;

    /**
     * True if the payload was given to stream(), false if it was given to
     * send(); not initialised until a payload is applied
     */
    private bool $HasStream;

    /**
     * The value passed to send(), or the iterable passed to stream()
     *
     * @var iterable<TInput>|TInput
     */
    private $Payload;

    /**
     * The argument passed to send() or stream() and forwarded to every closure
     * and pipe
     *
     * @var TArgument
     */
    private $Arg;

    /**
     * Passed to each ArrayMapper created for a key map
     *
     * @var ListConformity::*
     */
    private int $PayloadConformity = ListConformity::NONE;

    /**
     * Applied to each payload before it reaches the first pipe
     *
     * @var (Closure(TInput $payload, static $pipeline, TArgument $arg): (TInput|TOutput))|null
     */
    private ?Closure $After = null;

    /**
     * Pipes in the order they were added; class names are resolved to
     * instances when the pipeline is run
     *
     * @var array<(Closure(TInput $payload, Closure $next, static $pipeline, TArgument $arg): (TInput|TOutput))|(Closure(TOutput $payload, Closure $next, static $pipeline, TArgument $arg): TOutput)|PipeInterface<TInput,TOutput,TArgument>|class-string<PipeInterface<TInput,TOutput,TArgument>>>
     */
    private array $Pipes = [];

    /**
     * One [key map, flags] pair for each call to throughKeyMap()
     *
     * @var array<array{array<array-key,array-key|array-key[]>,int-mask-of<ArrayMapperFlag::*>}>
     */
    private array $KeyMaps = [];

    /**
     * One mapper per entry in $KeyMaps, rebuilt by getClosure() and discarded
     * when the pipeline is cloned
     *
     * @var ArrayMapper[]
     */
    private array $ArrayMappers;

    /**
     * Innermost closure of the chain, reached when the last pipe calls $next
     *
     * @var (Closure(TInput $result, static $pipeline, TArgument $arg): TOutput)|(Closure(TOutput $result, static $pipeline, TArgument $arg): TOutput)|null
     */
    private ?Closure $Then = null;

    /**
     * Receives every retained result of a stream at once after the stream has
     * been consumed
     *
     * @var (Closure(array<TInput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|(Closure(array<TOutput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|null
     */
    private ?Closure $CollectThen = null;

    /**
     * Closures that receive each result before it is returned or yielded
     *
     * @var array<Closure(TOutput $result, static $pipeline, TArgument $arg): mixed>
     */
    private array $Cc = [];

    /**
     * Stream filter: a result is discarded unless this returns exactly false
     *
     * @var (Closure(TOutput, static, TArgument): bool)|null
     */
    private ?Closure $Unless = null;

    /**
     * Used to resolve pipes given as class names; the App facade is used
     * instead when this is null
     */
    private ?ContainerInterface $Container;

    /**
     * Creates a new Pipeline object
     */
    public function __construct(?ContainerInterface $container = null)
    {
        $this->Container = $container;
    }

    /**
     * Creates a new Pipeline object
     *
     * @return PipelineInterface<mixed,mixed,mixed>
     */
    public static function create(?ContainerInterface $container = null): PipelineInterface
    {
        $pipeline = new self($container);

        return $pipeline;
    }

    /**
     * @internal
     */
    public function __clone()
    {
        // Mappers are derived from $KeyMaps and $PayloadConformity, so a clone
        // must not inherit them; getClosure() rebuilds them on demand
        unset($this->ArrayMappers);
    }

    /**
     * @inheritDoc
     */
    public function send($payload, $arg = null)
    {
        $isStream = false;

        return $this->withPayload($payload, $arg, $isStream);
    }

    /**
     * @inheritDoc
     */
    public function stream(iterable $payload, $arg = null)
    {
        $isStream = true;

        return $this->withPayload($payload, $arg, $isStream);
    }

    /**
     * Get an instance with the given payload and argument applied
     *
     * @template T0
     * @template T1
     *
     * @param iterable<T0>|T0 $payload
     * @param T1 $arg
     * @param bool $stream Whether `$payload` is an iterable of payloads
     * (`true`) or a single payload (`false`).
     * @return static<TInput&T0,TOutput,TArgument&T1>
     * @throws LogicException if a payload has already been applied.
     */
    private function withPayload($payload, $arg, bool $stream)
    {
        if ($this->HasPayload) {
            // @codeCoverageIgnoreStart
            throw new LogicException('Payload already set');
            // @codeCoverageIgnoreEnd
        }

        /** @var static<T0,TOutput,T1> */
        $pipeline = $this;
        $pipeline = $pipeline->with('HasPayload', true);
        $pipeline = $pipeline->with('HasStream', $stream);
        $pipeline = $pipeline->with('Payload', $payload);
        $pipeline = $pipeline->with('Arg', $arg);
        /** @var static<TInput&T0,TOutput,TArgument&T1> */
        return $pipeline;
    }

    /**
     * @inheritDoc
     */
    public function withConformity($conformity)
    {
        $this->assertHasPayload();

        $pipeline = $this->with('PayloadConformity', $conformity);

        return $pipeline;
    }

    /**
     * @inheritDoc
     */
    public function getConformity()
    {
        $this->assertHasPayload();

        return $this->PayloadConformity;
    }

    /**
     * @inheritDoc
     */
    public function after(Closure $closure)
    {
        if ($this->After !== null) {
            // @codeCoverageIgnoreStart
            throw new LogicException(static::class . '::after() already applied');
            // @codeCoverageIgnoreEnd
        }

        return $this->with('After', $closure);
    }

    /**
     * @inheritDoc
     */
    public function afterIf(Closure $closure)
    {
        // An existing after() closure takes precedence
        if ($this->After !== null) {
            return $this;
        }

        return $this->after($closure);
    }

    /**
     * @inheritDoc
     */
    public function through($pipe)
    {
        $pipes = Arr::push($this->Pipes, $pipe);

        return $this->with('Pipes', $pipes);
    }

    /**
     * @inheritDoc
     */
    public function throughClosure(Closure $closure)
    {
        // Wrap the closure in a pipe that always forwards its return value to
        // the next pipe
        $pipe = function ($payload, Closure $next, self $pipeline, $arg) use ($closure) {
            return $next($closure($payload, $pipeline, $arg));
        };

        return $this->through($pipe);
    }

    /**
     * @inheritDoc
     */
    public function throughKeyMap(array $keyMap, int $flags = ArrayMapperFlag::ADD_UNMAPPED)
    {
        // getClosure() creates mappers in $KeyMaps order, so the mapper for
        // this key map will be found at the index the key map is about to take
        $index = count($this->KeyMaps);

        $pipe = function ($payload, Closure $next, self $pipeline) use ($index) {
            /** @var mixed[] $payload */
            $mapped = $pipeline->ArrayMappers[$index]->map($payload);

            return $next($mapped);
        };

        $clone = $this->through($pipe);
        $clone->KeyMaps[] = [$keyMap, $flags];

        return $clone;
    }

    /**
     * @inheritDoc
     */
    public function then(Closure $closure)
    {
        // then() and collectThen() are mutually exclusive
        if ($this->Then !== null || $this->CollectThen !== null) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('Then', $closure);
    }

    /**
     * @inheritDoc
     */
    public function thenIf(Closure $closure)
    {
        if ($this->Then !== null || $this->CollectThen !== null) {
            return $this;
        }

        return $this->then($closure);
    }

    /**
     * @inheritDoc
     */
    public function collectThen(Closure $closure)
    {
        $this->assertHasStream();

        // then() and collectThen() are mutually exclusive
        if ($this->Then !== null || $this->CollectThen !== null) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('CollectThen', $closure);
    }

    /**
     * @inheritDoc
     */
    public function collectThenIf(Closure $closure)
    {
        $this->assertHasStream();

        if ($this->Then !== null || $this->CollectThen !== null) {
            return $this;
        }

        return $this->collectThen($closure);
    }

    /**
     * @inheritDoc
     */
    public function cc(Closure $closure)
    {
        $closures = Arr::push($this->Cc, $closure);

        return $this->with('Cc', $closures);
    }

    /**
     * @inheritDoc
     */
    public function unless(Closure $filter)
    {
        $this->assertHasStream();

        if ($this->Unless !== null) {
            throw new LogicException(static::class . '::unless() already applied');
        }

        return $this->with('Unless', $filter);
    }

    /**
     * @inheritDoc
     */
    public function unlessIf(Closure $filter)
    {
        $this->assertHasStream();

        if ($this->Unless !== null) {
            return $this;
        }

        return $this->unless($filter);
    }

    /**
     * @inheritDoc
     */
    public function run()
    {
        $this->assertHasOnePayload();

        // The closure chain is built before the after() closure is called
        $closure = $this->getClosure();

        $payload = $this->Payload;
        if ($this->After !== null) {
            $payload = ($this->After)($payload, $this, $this->Arg);
        }

        $result = $closure($payload);

        if ($this->Cc !== []) {
            $this->ccResult($result);
        }

        return $result;
    }

    /**
     * @inheritDoc
     */
    public function runInto(PipelineInterface $next)
    {
        $result = $this->run();

        return $next->send($result, $this->Arg);
    }

    /**
     * @inheritDoc
     */
    public function start(): Generator
    {
        $this->assertHasStream();

        $closure = $this->getClosure();
        $collected = [];
        foreach ($this->Payload as $key => $payload) {
            if ($this->After !== null) {
                $payload = ($this->After)($payload, $this, $this->Arg);
            }

            $result = $closure($payload);

            // Discard the result unless the filter returns exactly false
            if ($this->Unless !== null) {
                $verdict = ($this->Unless)($result, $this, $this->Arg);
                if ($verdict !== false) {
                    continue;
                }
            }

            // With collectThen(), results are held back, keyed as they were in
            // the stream, until the stream has been consumed
            if ($this->CollectThen !== null) {
                $collected[$key] = $result;
                continue;
            }

            if ($this->Cc !== []) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }

        // The collectThen() closure is not called when there is nothing to
        // give it
        if ($this->CollectThen === null || $collected === []) {
            return;
        }

        $final = ($this->CollectThen)($collected, $this, $this->Arg);
        foreach ($final as $key => $result) {
            if ($this->Cc !== []) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }
    }

    /**
     * @inheritDoc
     */
    public function startInto(PipelineInterface $next)
    {
        $results = $this->start();

        return $next->stream($results, $this->Arg);
    }

    /**
     * Pass a result to every closure registered with cc()
     *
     * @param mixed $result
     */
    private function ccResult($result): void
    {
        foreach ($this->Cc as $callback) {
            $callback($result, $this, $this->Arg);
        }
    }

    /**
     * Get a closure that sends a payload through every pipe, in the order they
     * were added, and finally through the then() closure, if any
     *
     * Also rebuilds the array mappers used by key map pipes.
     *
     * @throws LogicException if a pipe given as a class name resolves to an
     * object that does not implement PipeInterface.
     */
    private function getClosure(): Closure
    {
        $this->ArrayMappers = [];
        foreach ($this->KeyMaps as $entry) {
            [$keyMap, $flags] = $entry;
            $this->ArrayMappers[] = new ArrayMapper($keyMap, $this->PayloadConformity, $flags);
        }

        // Start with the end of the chain: then() if applied, otherwise a
        // closure that returns the result as-is
        if ($this->Then !== null) {
            $closure = function ($result) {
                return ($this->Then)($result, $this, $this->Arg);
            };
        } else {
            $closure = function ($result) {
                return $result;
            };
        }

        // Wrap from the last pipe to the first so the first pipe added ends up
        // outermost, receiving the rest of the chain as its $next
        foreach (array_reverse($this->Pipes) as $pipe) {
            $pipe = $this->resolvePipe($pipe);
            $next = $closure;
            $closure = function ($payload) use ($pipe, $next) {
                return $pipe($payload, $next, $this, $this->Arg);
            };
        }

        return $closure;
    }

    /**
     * Get a callable pipe, resolving it from the container or the App facade
     * if it is given as a class name
     *
     * @param Closure|PipeInterface<TInput,TOutput,TArgument>|class-string<PipeInterface<TInput,TOutput,TArgument>> $pipe
     * @return Closure|PipeInterface<TInput,TOutput,TArgument>
     * @throws LogicException if the resolved object does not implement
     * PipeInterface.
     */
    private function resolvePipe($pipe)
    {
        // Closures and pipe instances are used as-is
        if (!is_string($pipe)) {
            return $pipe;
        }

        if ($this->Container !== null) {
            $instance = $this->Container->get($pipe);
        } else {
            $instance = App::get($pipe);
        }

        if ($instance instanceof PipeInterface) {
            return $instance;
        }

        throw new LogicException(sprintf(
            '%s does not implement %s',
            get_class($instance),
            PipeInterface::class,
        ));
    }

    /**
     * Assert that a payload was applied with stream()
     *
     * @phpstan-assert iterable<TInput> $this->Payload
     */
    private function assertHasStream(): void
    {
        $this->assertHasPayload(true);
    }

    /**
     * Assert that a payload was applied with send()
     *
     * @phpstan-assert TInput $this->Payload
     */
    private function assertHasOnePayload(): void
    {
        $this->assertHasPayload(false);
    }

    /**
     * Assert that a payload has been applied and, optionally, that it is or
     * is not a stream
     *
     * @param bool|null $stream If `true` or `false`, the payload must have
     * been applied with stream() or send() respectively. If `null`, either is
     * accepted.
     * @throws LogicException if there is no payload or it is the wrong kind.
     */
    private function assertHasPayload(?bool $stream = null): void
    {
        if (!$this->HasPayload) {
            throw new LogicException('No payload');
        }

        if ($stream === null) {
            return;
        }

        if ($this->HasStream !== $stream) {
            throw new LogicException('Invalid payload');
        }
    }
}
449: