1: <?php declare(strict_types=1);
2:
3: namespace Salient\Core;
4:
5: use Salient\Contract\Core\Pipeline\EntityPipelineInterface;
6: use Salient\Contract\Core\Pipeline\PipelineInterface;
7: use Salient\Contract\Core\Pipeline\StreamPipelineInterface;
8: use Salient\Core\Concern\ChainableTrait;
9: use Salient\Core\Concern\ImmutableTrait;
10: use Closure;
11: use LogicException;
12:
/**
 * Sends a payload, or a stream of payloads, through a series of pipes
 *
 * Instances are configured via fluent methods that return a pipeline with the
 * given setting applied (see {@see ImmutableTrait}), then executed with
 * {@see Pipeline::run()} (single payload) or {@see Pipeline::start()} (stream).
 *
 * @api
 *
 * @template TInput
 * @template TOutput
 * @template TArgument
 *
 * @implements PipelineInterface<TInput,TOutput,TArgument>
 * @implements EntityPipelineInterface<TInput,TOutput,TArgument>
 * @implements StreamPipelineInterface<TInput,TOutput,TArgument>
 */
final class Pipeline implements
    PipelineInterface,
    EntityPipelineInterface,
    StreamPipelineInterface
{
    use ChainableTrait;
    use ImmutableTrait;

    /** True once send() or stream() has provided a payload */
    private bool $HasPayload = false;
    /** True if the payload came from stream(), false if it came from send(); uninitialised until a payload is set */
    private bool $PayloadIsStream;
    /** @var iterable<TInput>|TInput */
    private $Payload;
    /** @var TArgument */
    private $Arg;
    /** @var self::* */
    private int $Conformity = self::CONFORMITY_NONE;
    /** @var (Closure(TInput $payload, static $pipeline, TArgument $arg): (TInput|TOutput))|null */
    private ?Closure $After = null;
    /** @var array<(Closure(TInput $payload, Closure $next, static $pipeline, TArgument $arg): (TInput|TOutput))|(Closure(TOutput $payload, Closure $next, static $pipeline, TArgument $arg): TOutput)> */
    private array $Pipes = [];
    /** @var array<array{array<array-key,array-key|array-key[]>,int-mask-of<ArrayMapper::REMOVE_NULL|ArrayMapper::ADD_UNMAPPED|ArrayMapper::ADD_MISSING|ArrayMapper::REQUIRE_MAPPED>}> */
    private array $KeyMaps = [];
    /**
     * Built from $KeyMaps by getClosure(), discarded on clone
     *
     * @var ArrayMapper[]
     */
    private array $ArrayMappers;
    /** @var (Closure(TInput $result, static $pipeline, TArgument $arg): TOutput)|(Closure(TOutput $result, static $pipeline, TArgument $arg): TOutput)|null */
    private ?Closure $Then = null;
    /** @var (Closure(array<TInput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|(Closure(array<TOutput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|null */
    private ?Closure $CollectThen = null;
    /** @var array<Closure(TOutput $result, static $pipeline, TArgument $arg): mixed> */
    private array $Cc = [];
    /** @var (Closure(TOutput, static, TArgument): bool)|null */
    private ?Closure $Unless = null;

    /**
     * Get a new pipeline
     *
     * @return PipelineInterface<mixed,mixed,mixed>
     */
    public static function create(): PipelineInterface
    {
        return new self();
    }

    /**
     * Discard array mappers built by getClosure() so they are not shared with
     * the clone; they are rebuilt whenever a pipeline is run
     *
     * @internal
     */
    public function __clone()
    {
        unset($this->ArrayMappers);
    }

    /**
     * Set a single payload, and an optional argument, to process with run()
     *
     * @inheritDoc
     */
    public function send($payload, $arg = null)
    {
        return $this->withPayload($payload, $arg, false);
    }

    /**
     * Set a stream of payloads, and an optional argument, to process with
     * start()
     *
     * @inheritDoc
     */
    public function stream(iterable $payload, $arg = null)
    {
        return $this->withPayload($payload, $arg, true);
    }

    /**
     * Apply a payload and argument to the pipeline
     *
     * @template T0
     * @template T1
     *
     * @param iterable<T0>|T0 $payload
     * @param T1 $arg
     * @param bool $stream Whether `$payload` is a stream of payloads.
     * @return self<TInput&T0,TOutput,TArgument&T1>
     * @throws LogicException if the pipeline already has a payload.
     */
    private function withPayload($payload, $arg, bool $stream)
    {
        if ($this->HasPayload) {
            throw new LogicException('Payload already set');
        }

        /** @var self<T0,TOutput,T1> */
        // @phpstan-ignore varTag.nativeType
        $pipeline = $this;
        $pipeline = $pipeline
            ->with('HasPayload', true)
            ->with('PayloadIsStream', $stream)
            ->with('Payload', $payload)
            ->with('Arg', $arg);
        /** @var self<TInput&T0,TOutput,TArgument&T1> */
        return $pipeline;
    }

    /**
     * Set the conformity level passed to array mappers created for key maps
     *
     * Requires a payload.
     *
     * @inheritDoc
     */
    public function withConformity(int $conformity)
    {
        $this->assertHasPayload();

        return $this->with('Conformity', $conformity);
    }

    /**
     * Get the conformity level passed to array mappers created for key maps
     *
     * Requires a payload.
     *
     * @inheritDoc
     */
    public function getConformity(): int
    {
        $this->assertHasPayload();

        return $this->Conformity;
    }

    /**
     * Set the closure applied to each payload before it is passed to the
     * first pipe
     *
     * Throws a LogicException if a closure has already been set.
     *
     * @inheritDoc
     */
    public function after(Closure $closure)
    {
        if ($this->After) {
            throw new LogicException(static::class . '::after() already applied');
        }

        return $this->with('After', $closure);
    }

    /**
     * Same as after(), but the pipeline is returned unchanged if a closure
     * has already been set
     *
     * @inheritDoc
     */
    public function afterIf(Closure $closure)
    {
        return $this->After
            ? $this
            : $this->with('After', $closure);
    }

    /**
     * Add a pipe to the end of the pipeline
     *
     * @inheritDoc
     */
    public function through(Closure $pipe)
    {
        $pipes = $this->Pipes;
        $pipes[] = $pipe;

        return $this->with('Pipes', $pipes);
    }

    /**
     * Add a pipe that passes the return value of a closure to the next pipe
     *
     * The closure receives the payload, the pipeline and the argument, so it
     * does not need to call `$next` itself.
     *
     * @inheritDoc
     */
    public function throughClosure(Closure $closure)
    {
        return $this->through(
            static fn($payload, Closure $next, self $pipeline, $arg) =>
                $next($closure($payload, $pipeline, $arg))
        );
    }

    /**
     * Add a pipe that maps each payload with an array mapper created from a
     * key map when the pipeline runs
     *
     * @inheritDoc
     */
    public function throughKeyMap(array $keyMap, int $flags = ArrayMapper::ADD_UNMAPPED)
    {
        // The pipe locates its mapper by position: getClosure() builds
        // ArrayMappers in the same order as KeyMaps
        $keyMapKey = count($this->KeyMaps);
        $pipeline = $this->through(
            static function ($payload, Closure $next, self $pipeline) use ($keyMapKey) {
                /** @var mixed[] $payload */
                // @phpstan-ignore varTag.nativeType
                return $next($pipeline->ArrayMappers[$keyMapKey]->map($payload));
            }
        );
        // NOTE(review): assumes through() returned a copy of $this (via
        // ImmutableTrait::with()), so this append does not modify $this --
        // confirm against the trait
        $pipeline->KeyMaps[] = [$keyMap, $flags];
        return $pipeline;
    }

    /**
     * Set the closure applied to each result after it has passed through
     * every pipe
     *
     * Throws a LogicException if then() or collectThen() has already been
     * applied.
     *
     * @inheritDoc
     */
    public function then(Closure $closure)
    {
        if ($this->Then || $this->CollectThen) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('Then', $closure);
    }

    /**
     * Same as then(), but the pipeline is returned unchanged if then() or
     * collectThen() has already been applied
     *
     * @inheritDoc
     */
    public function thenIf(Closure $closure)
    {
        return $this->Then || $this->CollectThen
            ? $this
            : $this->with('Then', $closure);
    }

    /**
     * Set the closure that receives every result of a stream, collected into
     * an array, after all payloads have been processed
     *
     * Requires a stream payload. Throws a LogicException if then() or
     * collectThen() has already been applied.
     *
     * @inheritDoc
     */
    public function collectThen(Closure $closure)
    {
        $this->assertHasStream();

        if ($this->Then || $this->CollectThen) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('CollectThen', $closure);
    }

    /**
     * Same as collectThen(), but the pipeline is returned unchanged if then()
     * or collectThen() has already been applied
     *
     * @inheritDoc
     */
    public function collectThenIf(Closure $closure)
    {
        $this->assertHasStream();

        return $this->Then || $this->CollectThen
            ? $this
            : $this->with('CollectThen', $closure);
    }

    /**
     * Add a closure that is called with each result before it is returned or
     * yielded
     *
     * Values returned by the closure are ignored.
     *
     * @inheritDoc
     */
    public function cc(Closure $closure)
    {
        $cc = $this->Cc;
        $cc[] = $closure;

        return $this->with('Cc', $cc);
    }

    /**
     * Set the filter applied to each result of a stream
     *
     * A result is discarded unless the filter returns `false` for it.
     *
     * Requires a stream payload. Throws a LogicException if a filter has
     * already been set.
     *
     * @inheritDoc
     */
    public function unless(Closure $filter)
    {
        $this->assertHasStream();

        if ($this->Unless) {
            throw new LogicException(static::class . '::unless() already applied');
        }

        return $this->with('Unless', $filter);
    }

    /**
     * Same as unless(), but the pipeline is returned unchanged if a filter
     * has already been set
     *
     * @inheritDoc
     */
    public function unlessIf(Closure $filter)
    {
        $this->assertHasStream();

        return $this->Unless
            ? $this
            : $this->with('Unless', $filter);
    }

    /**
     * Process the payload given to send() and return the result
     *
     * The payload is passed to the after() closure (if any), through each
     * pipe, and to the then() closure (if any). The result is then passed to
     * any cc() closures before it is returned.
     *
     * @inheritDoc
     */
    public function run()
    {
        $this->assertHasOnePayload();

        $result = $this->getClosure()(
            $this->After
                ? ($this->After)($this->Payload, $this, $this->Arg)
                : $this->Payload
        );

        if ($this->Cc) {
            $this->ccResult($result);
        }

        return $result;
    }

    /**
     * Run the pipeline and send its result, with the same argument, to
     * another pipeline
     *
     * @inheritDoc
     */
    public function runInto(PipelineInterface $next)
    {
        return $next->send($this->run(), $this->Arg);
    }

    /**
     * Get a generator that processes each payload given to stream()
     *
     * Payloads are processed lazily, as the returned value is iterated, but
     * the pipeline's state is checked immediately, so a LogicException is
     * thrown here, not during iteration, if the pipeline has no stream
     * payload.
     *
     * @inheritDoc
     */
    public function start(): iterable
    {
        // This method must not contain `yield`: if it did, PHP would defer
        // the assertion below until the first iteration of its return value
        $this->assertHasStream();

        return $this->getResults($this->Payload);
    }

    /**
     * Start the pipeline and stream its results, with the same argument, to
     * another pipeline
     *
     * @inheritDoc
     */
    public function startInto(PipelineInterface $next)
    {
        return $next->stream($this->start(), $this->Arg);
    }

    /**
     * Lazily process each payload in a stream
     *
     * Each payload is passed to the after() closure (if any), through each
     * pipe, and to the then() closure (if any). Results rejected by the
     * unless() filter are discarded. Remaining results are either yielded
     * with their original key, or, if collectThen() has been applied,
     * collected and passed to its closure once the stream is exhausted, in
     * which case the values it returns are yielded instead.
     *
     * @param iterable<TInput> $stream
     * @return iterable<TOutput>
     */
    private function getResults(iterable $stream): iterable
    {
        $closure = $this->getClosure();
        $results = [];
        foreach ($stream as $key => $payload) {
            $result = $closure(
                $this->After
                    ? ($this->After)($payload, $this, $this->Arg)
                    : $payload
            );

            // Only an explicit `false` from the filter keeps the result
            if (
                $this->Unless
                && ($this->Unless)($result, $this, $this->Arg) !== false
            ) {
                continue;
            }

            // Collected results are passed to cc() closures after the
            // collectThen() closure has processed them, not before
            if ($this->CollectThen) {
                $results[$key] = $result;
                continue;
            }

            if ($this->Cc) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }

        // The collectThen() closure is not called if nothing was collected
        if (!$this->CollectThen || !$results) {
            return;
        }

        $results = ($this->CollectThen)($results, $this, $this->Arg);
        foreach ($results as $key => $result) {
            if ($this->Cc) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }
    }

    /**
     * Pass a result to every cc() closure, in the order they were added
     *
     * @param mixed $result
     */
    private function ccResult($result): void
    {
        foreach ($this->Cc as $closure) {
            $closure($result, $this, $this->Arg);
        }
    }

    /**
     * Get a closure that passes a payload through every pipe and returns the
     * result
     *
     * Also (re)builds the array mappers used by pipes added via
     * throughKeyMap(), using the pipeline's current conformity level.
     */
    private function getClosure(): Closure
    {
        $this->ArrayMappers = [];
        foreach ($this->KeyMaps as [$keyMap, $flags]) {
            $this->ArrayMappers[] = new ArrayMapper($keyMap, $this->Conformity, $flags);
        }

        // The innermost closure is what the last pipe receives as `$next`
        $closure = $this->Then
            ? fn($result) => ($this->Then)($result, $this, $this->Arg)
            : fn($result) => $result;

        // Wrap from the last pipe to the first, so the returned closure calls
        // the first pipe, and each pipe's `$next` calls the pipe after it
        foreach (array_reverse($this->Pipes) as $pipe) {
            $closure = fn($payload) =>
                $pipe($payload, $closure, $this, $this->Arg);
        }

        return $closure;
    }

    /**
     * Throw an exception if the pipeline does not have a stream payload
     *
     * @throws LogicException
     *
     * @phpstan-assert iterable<TInput> $this->Payload
     */
    private function assertHasStream(): void
    {
        $this->assertHasPayload(true);
    }

    /**
     * Throw an exception if the pipeline does not have a single payload
     *
     * @throws LogicException
     *
     * @phpstan-assert TInput $this->Payload
     */
    private function assertHasOnePayload(): void
    {
        $this->assertHasPayload(false);
    }

    /**
     * Throw an exception if the pipeline has no payload, or if its payload is
     * not of the given kind
     *
     * @param bool|null $stream If `true`, the payload must be a stream. If
     * `false`, it must be a single payload. If `null`, either is accepted.
     * @throws LogicException
     */
    private function assertHasPayload(?bool $stream = null): void
    {
        if (!$this->HasPayload) {
            throw new LogicException('No payload');
        }
        // PayloadIsStream is only initialised once a payload is set, so it
        // must not be read before the check above
        if ($stream !== null && $this->PayloadIsStream !== $stream) {
            throw new LogicException('Invalid payload');
        }
    }
}
423: