1: <?php declare(strict_types=1);
2:
3: namespace Salient\Core;
4:
5: use Salient\Contract\Container\ContainerInterface;
6: use Salient\Contract\Core\Pipeline\EntityPipelineInterface;
7: use Salient\Contract\Core\Pipeline\PipeInterface;
8: use Salient\Contract\Core\Pipeline\PipelineInterface;
9: use Salient\Contract\Core\Pipeline\StreamPipelineInterface;
10: use Salient\Contract\Core\ListConformity;
11: use Salient\Core\Concern\HasChainableMethods;
12: use Salient\Core\Concern\HasMutator;
13: use Salient\Core\Facade\App;
14: use Salient\Utility\Arr;
15: use Closure;
16: use Generator;
17: use LogicException;
18:
/**
 * Passes a payload, or each item in a stream of payloads, through a chain of
 * pipes
 *
 * A payload is assigned with send() or stream(), pipes and callbacks are
 * registered with the fluent methods, and the chain is executed by run() (one
 * payload) or start() (stream).
 *
 * @template TInput
 * @template TOutput
 * @template TArgument
 *
 * @implements PipelineInterface<TInput,TOutput,TArgument>
 * @implements EntityPipelineInterface<TInput,TOutput,TArgument>
 * @implements StreamPipelineInterface<TInput,TOutput,TArgument>
 */
final class Pipeline implements
    PipelineInterface,
    EntityPipelineInterface,
    StreamPipelineInterface
{
    use HasChainableMethods;
    use HasMutator;

    /**
     * True once a payload has been assigned via send() or stream()
     */
    private bool $HasPayload = false;

    /**
     * True if the payload was given to stream(), false if it was given to
     * send()
     */
    private bool $HasStream;

    /**
     * The payload given to send(), or the iterable given to stream()
     *
     * @var iterable<TInput>|TInput
     */
    private $Payload;

    /**
     * The argument passed to every closure and pipe alongside the payload
     *
     * @var TArgument
     */
    private $Arg;

    /**
     * Conformity handed to each ArrayMapper created for a key map
     *
     * @var ListConformity::*
     */
    private int $PayloadConformity = ListConformity::NONE;

    /**
     * Applied to each payload before it enters the first pipe
     *
     * @var (Closure(TInput $payload, static $pipeline, TArgument $arg): (TInput|TOutput))|null
     */
    private ?Closure $After = null;

    /**
     * Pipes in the order they were registered
     *
     * @var array<(Closure(TInput $payload, Closure $next, static $pipeline, TArgument $arg): (TInput|TOutput))|(Closure(TOutput $payload, Closure $next, static $pipeline, TArgument $arg): TOutput)|PipeInterface<TInput,TOutput,TArgument>|class-string<PipeInterface<TInput,TOutput,TArgument>>>
     */
    private array $Pipes = [];

    /**
     * Key map and flags pairs registered by throughKeyMap()
     *
     * @var array<array{array<array-key,array-key|array-key[]>,int-mask-of<ArrayMapper::*>}>
     */
    private array $KeyMaps = [];

    /**
     * One mapper per entry in $KeyMaps, rebuilt by getClosure() and unset when
     * the pipeline is cloned
     *
     * @var ArrayMapper[]
     */
    private array $ArrayMappers;

    /**
     * Applied to the value that comes out of the last pipe
     *
     * @var (Closure(TInput $result, static $pipeline, TArgument $arg): TOutput)|(Closure(TOutput $result, static $pipeline, TArgument $arg): TOutput)|null
     */
    private ?Closure $Then = null;

    /**
     * Applied by start() to the array of results collected from the stream
     *
     * @var (Closure(array<TInput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|(Closure(array<TOutput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|null
     */
    private ?Closure $CollectThen = null;

    /**
     * Closures called with each result before it is returned or yielded
     *
     * @var array<Closure(TOutput $result, static $pipeline, TArgument $arg): mixed>
     */
    private array $Cc = [];

    /**
     * Stream filter: start() skips a result unless this returns false for it
     *
     * @var (Closure(TOutput, static, TArgument): bool)|null
     */
    private ?Closure $Unless = null;

    /**
     * Resolves pipes given as class names; the App facade is used when null
     */
    private ?ContainerInterface $Container;

    /**
     * Creates a new Pipeline object
     *
     * @param ContainerInterface|null $container Used by getClosure() to
     * resolve pipes given as class names.
     */
    public function __construct(?ContainerInterface $container = null)
    {
        $this->Container = $container;
    }

    /**
     * Creates a new Pipeline object
     *
     * Static equivalent of the constructor.
     *
     * @return PipelineInterface<mixed,mixed,mixed>
     */
    public static function create(?ContainerInterface $container = null): PipelineInterface
    {
        $pipeline = new self($container);

        return $pipeline;
    }

    /**
     * Discards the array mappers built for the instance being cloned
     *
     * getClosure() rebuilds them from $KeyMaps before they are next used.
     *
     * @internal
     */
    public function __clone()
    {
        unset($this->ArrayMappers);
    }

    /**
     * Assigns a single payload and its argument
     *
     * @inheritDoc
     */
    public function send($payload, $arg = null)
    {
        $isStream = false;

        return $this->withPayload($payload, $arg, $isStream);
    }

    /**
     * Assigns an iterable of payloads and their shared argument
     *
     * @inheritDoc
     */
    public function stream(iterable $payload, $arg = null)
    {
        $isStream = true;

        return $this->withPayload($payload, $arg, $isStream);
    }

    /**
     * Applies a payload, its argument and its kind (single or stream) via
     * with()
     *
     * @template T0
     * @template T1
     *
     * @param iterable<T0>|T0 $payload
     * @param T1 $arg
     * @param bool $stream True if $payload is a stream of payloads.
     * @return static<TInput&T0,TOutput,TArgument&T1>
     * @throws LogicException if a payload has already been assigned.
     */
    private function withPayload($payload, $arg, bool $stream)
    {
        if ($this->HasPayload) {
            // @codeCoverageIgnoreStart
            throw new LogicException('Payload already set');
            // @codeCoverageIgnoreEnd
        }

        /** @var static<T0,TOutput,T1> */
        $pipeline = $this;
        $pipeline = $pipeline->with('HasPayload', true);
        $pipeline = $pipeline->with('HasStream', $stream);
        $pipeline = $pipeline->with('Payload', $payload);
        $pipeline = $pipeline->with('Arg', $arg);

        /** @var static<TInput&T0,TOutput,TArgument&T1> */
        return $pipeline;
    }

    /**
     * Applies the conformity passed to array mappers created for key maps
     *
     * Requires a payload.
     *
     * @inheritDoc
     */
    public function withConformity($conformity)
    {
        $this->assertHasPayload();

        $pipeline = $this->with('PayloadConformity', $conformity);

        return $pipeline;
    }

    /**
     * Gets the conformity passed to array mappers created for key maps
     *
     * Requires a payload.
     *
     * @inheritDoc
     */
    public function getConformity()
    {
        $this->assertHasPayload();

        $conformity = $this->PayloadConformity;

        return $conformity;
    }

    /**
     * Registers the closure applied to each payload before the first pipe
     *
     * Throws a LogicException if one is already registered.
     *
     * @inheritDoc
     */
    public function after(Closure $closure)
    {
        if (!$this->After) {
            return $this->with('After', $closure);
        }

        // @codeCoverageIgnoreStart
        throw new LogicException(static::class . '::after() already applied');
        // @codeCoverageIgnoreEnd
    }

    /**
     * Same as after(), but returns the pipeline as-is if an "after" closure
     * is already registered
     *
     * @inheritDoc
     */
    public function afterIf(Closure $closure)
    {
        if ($this->After) {
            return $this;
        }

        return $this->after($closure);
    }

    /**
     * Appends a pipe to the end of the chain
     *
     * @inheritDoc
     */
    public function through($pipe)
    {
        $pipes = Arr::push($this->Pipes, $pipe);

        return $this->with('Pipes', $pipes);
    }

    /**
     * Appends a pipe that passes the return value of a closure to the next
     * pipe
     *
     * @inheritDoc
     */
    public function throughClosure(Closure $closure)
    {
        $pipe = function ($payload, Closure $next, self $pipeline, $arg) use ($closure) {
            return $next($closure($payload, $pipeline, $arg));
        };

        return $this->through($pipe);
    }

    /**
     * Appends a pipe that maps each payload with an ArrayMapper built from a
     * key map
     *
     * @inheritDoc
     */
    public function throughKeyMap(array $keyMap, int $flags = ArrayMapper::ADD_UNMAPPED)
    {
        // Position the new key map will occupy in $KeyMaps, and therefore the
        // index of its mapper in $ArrayMappers once getClosure() has run
        $index = count($this->KeyMaps);

        /** @param mixed[] $payload */
        $pipe = fn($payload, Closure $next, self $pipeline) =>
            $next($pipeline->ArrayMappers[$index]->map($payload));

        $clone = $this->through($pipe);
        $clone->KeyMaps[] = [$keyMap, $flags];

        return $clone;
    }

    /**
     * Registers the closure applied to each result after the last pipe
     *
     * Throws a LogicException if then() or collectThen() has already been
     * applied.
     *
     * @inheritDoc
     */
    public function then(Closure $closure)
    {
        if ($this->isThenApplied()) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('Then', $closure);
    }

    /**
     * Same as then(), but returns the pipeline as-is if then() or
     * collectThen() has already been applied
     *
     * @inheritDoc
     */
    public function thenIf(Closure $closure)
    {
        if ($this->isThenApplied()) {
            return $this;
        }

        return $this->then($closure);
    }

    /**
     * Registers the closure start() applies to the collected array of results
     *
     * Requires a stream payload. Throws a LogicException if then() or
     * collectThen() has already been applied.
     *
     * @inheritDoc
     */
    public function collectThen(Closure $closure)
    {
        $this->assertHasStream();

        if ($this->isThenApplied()) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('CollectThen', $closure);
    }

    /**
     * Same as collectThen(), but returns the pipeline as-is if then() or
     * collectThen() has already been applied
     *
     * Requires a stream payload.
     *
     * @inheritDoc
     */
    public function collectThenIf(Closure $closure)
    {
        $this->assertHasStream();

        if ($this->isThenApplied()) {
            return $this;
        }

        return $this->collectThen($closure);
    }

    /**
     * Adds a closure to call with each result before it is returned or
     * yielded
     *
     * @inheritDoc
     */
    public function cc(Closure $closure)
    {
        $closures = Arr::push($this->Cc, $closure);

        return $this->with('Cc', $closures);
    }

    /**
     * Registers the filter start() uses to skip results
     *
     * Requires a stream payload. Throws a LogicException if a filter is
     * already registered.
     *
     * @inheritDoc
     */
    public function unless(Closure $filter)
    {
        $this->assertHasStream();

        if (!$this->Unless) {
            return $this->with('Unless', $filter);
        }

        throw new LogicException(static::class . '::unless() already applied');
    }

    /**
     * Same as unless(), but returns the pipeline as-is if a filter is already
     * registered
     *
     * Requires a stream payload.
     *
     * @inheritDoc
     */
    public function unlessIf(Closure $filter)
    {
        $this->assertHasStream();

        if ($this->Unless) {
            return $this;
        }

        return $this->unless($filter);
    }

    /**
     * Sends the single payload through the chain and returns the result
     *
     * @inheritDoc
     */
    public function run()
    {
        $this->assertHasOnePayload();

        // Build the chain before the "after" closure is called
        $chain = $this->getClosure();

        $payload = $this->Payload;
        if ($this->After) {
            $payload = ($this->After)($payload, $this, $this->Arg);
        }

        $result = $chain($payload);

        if ($this->Cc) {
            $this->ccResult($result);
        }

        return $result;
    }

    /**
     * Runs the pipeline and sends the result, with the same argument, to
     * another pipeline
     *
     * @inheritDoc
     */
    public function runInto(PipelineInterface $next)
    {
        $result = $this->run();

        return $next->send($result, $this->Arg);
    }

    /**
     * Sends each payload in the stream through the chain, yielding results
     * with their original keys
     *
     * Results rejected by the unless() filter are skipped. If collectThen()
     * has been applied, nothing is yielded until the stream is exhausted:
     * surviving results are collected by key, passed to the closure as an
     * array if there are any, and whatever it returns is yielded instead.
     *
     * Because this method is a generator, its checks are not performed until
     * iteration begins.
     *
     * @inheritDoc
     */
    public function start(): Generator
    {
        $this->assertHasStream();

        $chain = $this->getClosure();
        $collected = [];
        foreach ($this->Payload as $key => $item) {
            if ($this->After) {
                $item = ($this->After)($item, $this, $this->Arg);
            }

            $result = $chain($item);

            // Anything other than false from the filter rejects the result
            $rejected = $this->Unless
                && ($this->Unless)($result, $this, $this->Arg) !== false;
            if ($rejected) {
                continue;
            }

            if ($this->CollectThen) {
                $collected[$key] = $result;
                continue;
            }

            if ($this->Cc) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }

        if ($this->CollectThen && $collected) {
            $final = ($this->CollectThen)($collected, $this, $this->Arg);
            foreach ($final as $key => $result) {
                if ($this->Cc) {
                    $this->ccResult($result);
                }

                yield $key => $result;
            }
        }
    }

    /**
     * Streams the results of start(), with the same argument, into another
     * pipeline
     *
     * @inheritDoc
     */
    public function startInto(PipelineInterface $next)
    {
        $results = $this->start();

        return $next->stream($results, $this->Arg);
    }

    /**
     * Calls every closure registered with cc(), in order, with a result
     *
     * @param mixed $result
     */
    private function ccResult($result): void
    {
        foreach ($this->Cc as $callback) {
            $callback($result, $this, $this->Arg);
        }
    }

    /**
     * Checks if then() or collectThen() has been applied
     */
    private function isThenApplied(): bool
    {
        return $this->Then || $this->CollectThen;
    }

    /**
     * Builds the closure that sends one payload through every pipe and then
     * through the then() closure, if any
     *
     * Also rebuilds $ArrayMappers from $KeyMaps using the current payload
     * conformity.
     *
     * @throws LogicException if a pipe given as a class name does not resolve
     * to a PipeInterface.
     */
    private function getClosure(): Closure
    {
        $this->ArrayMappers = [];
        foreach ($this->KeyMaps as $entry) {
            [$keyMap, $flags] = $entry;
            $this->ArrayMappers[] = new ArrayMapper($keyMap, $this->PayloadConformity, $flags);
        }

        // Innermost link: the then() closure, or a pass-through
        if ($this->Then) {
            $closure = fn($result) => ($this->Then)($result, $this, $this->Arg);
        } else {
            $closure = fn($result) => $result;
        }

        // Wrap from the last pipe to the first so the first pipe registered
        // ends up outermost and runs first
        foreach (array_reverse($this->Pipes) as $pipe) {
            $pipe = $this->resolvePipe($pipe);
            $next = $closure;
            $closure = function ($payload) use ($pipe, $next) {
                return $pipe($payload, $next, $this, $this->Arg);
            };
        }

        return $closure;
    }

    /**
     * Returns a pipe as-is unless it is a string, in which case it is
     * resolved via the container, or via the App facade if there is no
     * container
     *
     * @param Closure|PipeInterface<TInput,TOutput,TArgument>|class-string<PipeInterface<TInput,TOutput,TArgument>> $pipe
     * @return Closure|PipeInterface<TInput,TOutput,TArgument>
     * @throws LogicException if the resolved object is not a PipeInterface.
     */
    private function resolvePipe($pipe)
    {
        if (!is_string($pipe)) {
            return $pipe;
        }

        if ($this->Container) {
            $instance = $this->Container->get($pipe);
        } else {
            $instance = App::get($pipe);
        }

        if ($instance instanceof PipeInterface) {
            return $instance;
        }

        throw new LogicException(sprintf(
            '%s does not implement %s',
            get_class($instance),
            PipeInterface::class,
        ));
    }

    /**
     * Throws a LogicException unless the payload was assigned via stream()
     *
     * @phpstan-assert iterable<TInput> $this->Payload
     */
    private function assertHasStream(): void
    {
        $this->assertHasPayload(true);
    }

    /**
     * Throws a LogicException unless the payload was assigned via send()
     *
     * @phpstan-assert TInput $this->Payload
     */
    private function assertHasOnePayload(): void
    {
        $this->assertHasPayload(false);
    }

    /**
     * Throws a LogicException if no payload has been assigned, or if its kind
     * does not match the one required
     *
     * @param bool|null $stream True to require a stream, false to require a
     * single payload, or null to accept either.
     */
    private function assertHasPayload(?bool $stream = null): void
    {
        if (!$this->HasPayload) {
            throw new LogicException('No payload');
        }

        if ($stream === null) {
            return;
        }

        if ($this->HasStream !== $stream) {
            throw new LogicException('Invalid payload');
        }
    }
}
448: