1: <?php declare(strict_types=1);
2:
3: namespace Salient\Core;
4:
5: use Salient\Contract\Core\Pipeline\EntityPipelineInterface;
6: use Salient\Contract\Core\Pipeline\PipelineInterface;
7: use Salient\Contract\Core\Pipeline\StreamPipelineInterface;
8: use Salient\Core\Concern\ChainableTrait;
9: use Salient\Core\Concern\ImmutableTrait;
10: use Salient\Utility\Get;
11: use Closure;
12: use LogicException;
13:
/**
 * Sends a payload, or a stream of payloads, through a series of pipes
 *
 * Configuration methods return the instance produced by with(), so a pipeline
 * can be built up by chaining calls. A payload is attached with send() or
 * stream() and processed by run() or start() respectively.
 *
 * @api
 *
 * @template TInput
 * @template TOutput
 * @template TArgument
 *
 * @implements PipelineInterface<TInput,TOutput,TArgument>
 * @implements EntityPipelineInterface<TInput,TOutput,TArgument>
 * @implements StreamPipelineInterface<TInput,TOutput,TArgument>
 */
final class Pipeline implements
    PipelineInterface,
    EntityPipelineInterface,
    StreamPipelineInterface
{
    use ChainableTrait;
    use ImmutableTrait;

    /** True once send() or stream() has been applied */
    private bool $HasPayload = false;
    /** True if the payload was given to stream(), false if given to send() */
    private bool $PayloadIsStream;
    /** @var iterable<TInput>|TInput */
    private $Payload;
    /** @var TArgument */
    private $Arg;
    /** @var self::* */
    private int $Conformity = self::CONFORMITY_NONE;
    /** @var (Closure(TInput $payload, static $pipeline, TArgument $arg): (TInput|TOutput))|null */
    private ?Closure $After = null;
    /** @var array<(Closure(TInput $payload, Closure $next, static $pipeline, TArgument $arg): (TInput|TOutput))|(Closure(TOutput $payload, Closure $next, static $pipeline, TArgument $arg): TOutput)> */
    private array $Pipes = [];
    /** @var array<array{array<array-key,array-key|array-key[]>,int-mask-of<ArrayMapper::REMOVE_NULL|ArrayMapper::ADD_UNMAPPED|ArrayMapper::ADD_MISSING|ArrayMapper::REQUIRE_MAPPED>}> */
    private array $KeyMaps = [];
    /**
     * Built from $KeyMaps by getClosure() and discarded when the pipeline is
     * cloned
     *
     * @var ArrayMapper[]
     */
    private array $ArrayMappers;
    /** @var (Closure(TInput $result, static $pipeline, TArgument $arg): TOutput)|(Closure(TOutput $result, static $pipeline, TArgument $arg): TOutput)|null */
    private ?Closure $Then = null;
    /** @var (Closure(array<TInput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|(Closure(array<TOutput> $results, static $pipeline, TArgument $arg): iterable<TOutput>)|null */
    private ?Closure $CollectThen = null;
    /** @var array<Closure(TOutput $result, static $pipeline, TArgument $arg): mixed> */
    private array $Cc = [];
    /** @var (Closure(TOutput, static, TArgument): bool)|null */
    private ?Closure $Unless = null;

    /**
     * Get a new pipeline
     *
     * @return PipelineInterface<mixed,mixed,mixed>
     */
    public static function create(): PipelineInterface
    {
        return new self();
    }

    /**
     * Discard array mappers so a clone rebuilds them from its own key maps
     *
     * @internal
     */
    public function __clone()
    {
        unset($this->ArrayMappers);
    }

    /**
     * Attach a single payload and an optional argument to the pipeline
     *
     * @inheritDoc
     */
    public function send($payload, $arg = null)
    {
        return $this->withPayload($payload, $arg, false);
    }

    /**
     * Attach a stream of payloads and an optional argument to the pipeline
     *
     * @inheritDoc
     */
    public function stream(iterable $payload, $arg = null)
    {
        return $this->withPayload($payload, $arg, true);
    }

    /**
     * Get a pipeline with the given payload and argument
     *
     * @template T0
     * @template T1
     *
     * @param iterable<T0>|T0 $payload
     * @param T1 $arg
     * @param bool $stream Whether `$payload` is a stream of payloads.
     * @return self<TInput&T0,TOutput,TArgument&T1>
     * @throws LogicException if the pipeline already has a payload.
     */
    private function withPayload($payload, $arg, bool $stream)
    {
        if ($this->HasPayload) {
            throw new LogicException('Payload already set');
        }

        /** @var self<T0,TOutput,T1> */
        // @phpstan-ignore varTag.nativeType
        $pipeline = $this;
        $pipeline = $pipeline
            ->with('HasPayload', true)
            ->with('PayloadIsStream', $stream)
            ->with('Payload', $payload)
            ->with('Arg', $arg);
        /** @var self<TInput&T0,TOutput,TArgument&T1> */
        return $pipeline;
    }

    /**
     * Set the conformity level passed to array mappers created for the
     * pipeline's key maps
     *
     * @inheritDoc
     */
    public function withConformity(int $conformity)
    {
        $this->assertHasPayload();

        return $this->with('Conformity', $conformity);
    }

    /**
     * Get the pipeline's conformity level
     *
     * @inheritDoc
     */
    public function getConformity(): int
    {
        $this->assertHasPayload();

        return $this->Conformity;
    }

    /**
     * Set the closure applied to each payload before it reaches the first pipe
     *
     * Throws a LogicException if a closure has already been set.
     *
     * @inheritDoc
     */
    public function after(Closure $closure)
    {
        if ($this->After) {
            throw new LogicException(static::class . '::after() already applied');
        }

        return $this->with('After', $closure);
    }

    /**
     * Same as after(), but the pipeline is returned as-is if a closure has
     * already been set
     *
     * @inheritDoc
     */
    public function afterIf(Closure $closure)
    {
        return $this->After
            ? $this
            : $this->with('After', $closure);
    }

    /**
     * Add a pipe to the end of the pipeline
     *
     * @inheritDoc
     */
    public function through(Closure $pipe)
    {
        $pipes = $this->Pipes;
        $pipes[] = $pipe;

        return $this->with('Pipes', $pipes);
    }

    /**
     * Add a pipe that passes the closure's return value to the next pipe
     *
     * @inheritDoc
     */
    public function throughClosure(Closure $closure)
    {
        return $this->through(
            static fn($payload, Closure $next, self $pipeline, $arg) =>
                $next($closure($payload, $pipeline, $arg))
        );
    }

    /**
     * Add a pipe that maps each payload with an array mapper created from the
     * given key map and flags
     *
     * @inheritDoc
     */
    public function throughKeyMap(array $keyMap, int $flags = ArrayMapper::ADD_UNMAPPED)
    {
        // The key map added below will have this index in $KeyMaps, and
        // getClosure() creates array mappers in the same order
        $keyMapKey = count($this->KeyMaps);
        $pipeline = $this->through(
            static function ($payload, Closure $next, self $pipeline) use ($keyMapKey) {
                /** @var mixed[] $payload */
                // @phpstan-ignore varTag.nativeType
                return $next($pipeline->ArrayMappers[$keyMapKey]->map($payload));
            }
        );
        // NOTE(review): relies on through() returning a new instance, so the
        // key map is not added to $this -- confirm against ImmutableTrait::with()
        $pipeline->KeyMaps[] = [$keyMap, $flags];
        return $pipeline;
    }

    /**
     * Set the closure applied to each result after it leaves the last pipe
     *
     * Throws a LogicException if then() or collectThen() has already been
     * applied.
     *
     * @inheritDoc
     */
    public function then(Closure $closure)
    {
        if ($this->Then || $this->CollectThen) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('Then', $closure);
    }

    /**
     * Same as then(), but the pipeline is returned as-is if then() or
     * collectThen() has already been applied
     *
     * @inheritDoc
     */
    public function thenIf(Closure $closure)
    {
        return $this->Then || $this->CollectThen
            ? $this
            : $this->with('Then', $closure);
    }

    /**
     * Set the closure that receives every result collected from a stream
     *
     * Throws a LogicException if the pipeline has no stream payload, or if
     * then() or collectThen() has already been applied.
     *
     * @inheritDoc
     */
    public function collectThen(Closure $closure)
    {
        $this->assertHasStream();

        if ($this->Then || $this->CollectThen) {
            throw new LogicException(static::class . '::then() already applied');
        }

        return $this->with('CollectThen', $closure);
    }

    /**
     * Same as collectThen(), but the pipeline is returned as-is if then() or
     * collectThen() has already been applied
     *
     * @inheritDoc
     */
    public function collectThenIf(Closure $closure)
    {
        $this->assertHasStream();

        return $this->Then || $this->CollectThen
            ? $this
            : $this->with('CollectThen', $closure);
    }

    /**
     * Add a closure that is called with each result before it is returned or
     * yielded
     *
     * @inheritDoc
     */
    public function cc(Closure $closure)
    {
        $cc = $this->Cc;
        $cc[] = $closure;

        return $this->with('Cc', $cc);
    }

    /**
     * Set the filter used to discard results from a stream
     *
     * Throws a LogicException if the pipeline has no stream payload, or if a
     * filter has already been set.
     *
     * @inheritDoc
     */
    public function unless(Closure $filter)
    {
        $this->assertHasStream();

        if ($this->Unless) {
            throw new LogicException(static::class . '::unless() already applied');
        }

        return $this->with('Unless', $filter);
    }

    /**
     * Same as unless(), but the pipeline is returned as-is if a filter has
     * already been set
     *
     * @inheritDoc
     */
    public function unlessIf(Closure $filter)
    {
        $this->assertHasStream();

        return $this->Unless
            ? $this
            : $this->with('Unless', $filter);
    }

    /**
     * Process the payload given to send() and return the result
     *
     * Throws a LogicException if the pipeline has no payload, or if its
     * payload was given to stream().
     *
     * @inheritDoc
     */
    public function run()
    {
        $this->assertHasOnePayload();

        $result = $this->getClosure()(
            $this->After
                ? ($this->After)($this->Payload, $this, $this->Arg)
                : $this->Payload
        );

        if ($this->Cc) {
            $this->ccResult($result);
        }

        return $result;
    }

    /**
     * Pass the result of run() and the pipeline's argument to another
     * pipeline's send() method
     *
     * @inheritDoc
     */
    public function runInto(PipelineInterface $next)
    {
        return $next->send($this->run(), $this->Arg);
    }

    /**
     * Process the payloads given to stream() and yield each result
     *
     * Throws a LogicException immediately if the pipeline has no payload, or
     * if its payload was given to send(). Payloads are not processed until the
     * returned iterator is traversed.
     *
     * @inheritDoc
     */
    public function start(): iterable
    {
        // Validate here, not in the generator, so an invalid pipeline is
        // reported when start() or startInto() is called instead of when the
        // first result is requested
        $this->assertHasStream();

        return $this->generateResults();
    }

    /**
     * Pass the iterator returned by start() and the pipeline's argument to
     * another pipeline's stream() method
     *
     * @inheritDoc
     */
    public function startInto(PipelineInterface $next)
    {
        return $next->stream($this->start(), $this->Arg);
    }

    /**
     * Lazily process each payload in the stream and yield the results
     *
     * Callers must check the pipeline has a stream payload first.
     *
     * @return iterable<TOutput>
     * @throws LogicException if results are being collected and a payload has
     * a key that is not an integer or string.
     */
    private function generateResults(): iterable
    {
        /** @var iterable<TInput> */
        $stream = $this->Payload;

        $closure = $this->getClosure();
        $results = [];
        foreach ($stream as $key => $payload) {
            $result = $closure(
                $this->After
                    ? ($this->After)($payload, $this, $this->Arg)
                    : $payload
            );

            // Discard the result unless the filter returns exactly `false`
            if (
                $this->Unless
                && ($this->Unless)($result, $this, $this->Arg) !== false
            ) {
                continue;
            }

            // Hold the result until the stream is exhausted if results are
            // being collected
            if ($this->CollectThen) {
                if (!is_int($key) && !is_string($key)) {
                    throw new LogicException(sprintf(
                        'Key must be of type int|string, %s given',
                        Get::type($key),
                    ));
                }
                $results[$key] = $result;
                continue;
            }

            if ($this->Cc) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }

        // The collector is not called if there is nothing to give it
        if (!$this->CollectThen || !$results) {
            return;
        }

        $results = ($this->CollectThen)($results, $this, $this->Arg);
        foreach ($results as $key => $result) {
            if ($this->Cc) {
                $this->ccResult($result);
            }

            yield $key => $result;
        }
    }

    /**
     * Pass a result to every closure added via cc()
     *
     * @param mixed $result
     */
    private function ccResult($result): void
    {
        foreach ($this->Cc as $closure) {
            $closure($result, $this, $this->Arg);
        }
    }

    /**
     * Get a closure that sends a payload through every pipe, in the order
     * they were added, and then through the Then closure, if set
     *
     * Array mappers for the pipeline's key maps are (re)created as a side
     * effect.
     */
    private function getClosure(): Closure
    {
        $this->ArrayMappers = [];
        foreach ($this->KeyMaps as [$keyMap, $flags]) {
            $this->ArrayMappers[] = new ArrayMapper($keyMap, $this->Conformity, $flags);
        }

        // The innermost closure is what the last pipe receives as $next
        $closure = $this->Then
            ? fn($result) => ($this->Then)($result, $this, $this->Arg)
            : fn($result) => $result;

        // Wrap from the last pipe to the first so the first pipe ends up
        // outermost and each pipe's $next is the pipe added after it
        foreach (array_reverse($this->Pipes) as $pipe) {
            $closure = fn($payload) =>
                $pipe($payload, $closure, $this, $this->Arg);
        }

        return $closure;
    }

    /**
     * Throw an exception if the pipeline does not have a stream payload
     *
     * @phpstan-assert iterable<TInput> $this->Payload
     */
    private function assertHasStream(): void
    {
        $this->assertHasPayload(true);
    }

    /**
     * Throw an exception if the pipeline does not have a single payload
     *
     * @phpstan-assert TInput $this->Payload
     */
    private function assertHasOnePayload(): void
    {
        $this->assertHasPayload(false);
    }

    /**
     * Throw an exception if the pipeline has no payload, or if its payload is
     * not of the given kind
     *
     * @param bool|null $stream If `true`, the payload must be a stream. If
     * `false`, it must not be a stream. If `null`, any payload is accepted.
     * @throws LogicException
     */
    private function assertHasPayload(?bool $stream = null): void
    {
        if (!$this->HasPayload) {
            throw new LogicException('No payload');
        }
        if ($stream !== null && $this->PayloadIsStream !== $stream) {
            throw new LogicException('Invalid payload');
        }
    }
}
430: