1: <?php declare(strict_types=1);
2:
3: namespace Salient\Core;
4:
5: use Salient\Contract\HasFileDescriptor;
6: use Salient\Core\Exception\ProcessDidNotTerminateException;
7: use Salient\Core\Exception\ProcessException;
8: use Salient\Core\Exception\ProcessFailedException;
9: use Salient\Core\Exception\ProcessTerminatedBySignalException;
10: use Salient\Core\Exception\ProcessTimedOutException;
11: use Salient\Utility\Arr;
12: use Salient\Utility\File;
13: use Salient\Utility\Str;
14: use Salient\Utility\Sys;
15: use Closure;
16: use InvalidArgumentException;
17: use LogicException;
18: use Throwable;
19:
20: /**
21: * @api
22: */
23: final class Process implements HasFileDescriptor
24: {
25: private const READY = 0;
26: private const RUNNING = 1;
27: private const TERMINATED = 2;
28:
29: /**
30: * Microseconds to wait between process status checks
31: */
32: private const POLL_INTERVAL = 10000;
33:
34: /**
35: * Microseconds to wait for stream activity (upper limit)
36: */
37: private const READ_INTERVAL = 200000;
38:
39: private const DEFAULT_OPTIONS = [
40: 'suppress_errors' => true,
41: 'bypass_shell' => true,
42: ];
43:
44: /**
45: * @var array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int}
46: */
47: private const DEFAULT_STATS = [
48: 'start_time' => 0.0,
49: 'spawn_interval' => 0.0,
50: 'poll_time' => 0.0,
51: 'poll_count' => 0,
52: 'read_time' => 0.0,
53: 'read_count' => 0,
54: 'stop_time' => 0.0,
55: 'stop_count' => 0,
56: ];
57:
58: private const SIGTERM = 15;
59: private const SIGKILL = 9;
60:
61: /** @var list<string>|string */
62: private $Command;
63: /** @var resource */
64: private $Input;
65: private bool $RewindOnStart;
66: /** @var (Closure(self::STDOUT|self::STDERR, string): mixed)|null */
67: private ?Closure $Callback;
68: private ?string $Cwd;
69: /** @var array<string,string>|null */
70: private ?array $Env;
71: private ?float $Timeout;
72: private ?int $Sec;
73: private int $Usec;
74: private bool $CollectOutput;
75: /** @readonly */
76: private bool $UseOutputFiles;
77: /** @var array<string,bool>|null */
78: private ?array $Options = null;
79:
80: // --
81:
82: private int $State = self::READY;
83: /** @var (Closure(self::STDOUT|self::STDERR, string): mixed)|null */
84: private ?Closure $CurrentCallback = null;
85: private ?string $OutputDir = null;
86: /** @var array<self::STDOUT|self::STDERR,resource> */
87: private array $OutputFiles;
88: /** @var array<self::STDOUT|self::STDERR,int<0,max>> */
89: private array $OutputFilePos;
90: /** @var int|float|null */
91: private $StartTime = null;
92: /** @var resource|null */
93: private $Process = null;
94: private bool $Stopped = false;
95: /** @var array<self::STDOUT|self::STDERR,resource> */
96: private array $Pipes;
97: /** @var array{command:string,pid:int,running:bool,signaled:bool,stopped:bool,exitcode:int,termsig:int,stopsig:int} */
98: private array $ProcessStatus;
99: private int $Pid;
100: private int $ExitStatus;
101: /** @var int|float|null */
102: private $LastPollTime = null;
103: /** @var int|float|null */
104: private $LastReadTime = null;
105: /** @var array<self::STDOUT|self::STDERR,resource> */
106: private array $Output = [];
107: /** @var array<self::STDOUT|self::STDERR,int<0,max>> */
108: private array $OutputPos = [];
109: /** @var array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int} */
110: private array $Stats = self::DEFAULT_STATS;
111:
112: /**
113: * @api
114: *
115: * @param list<string> $command
116: * @param resource|string|null $input Copied to a seekable stream if not
117: * already seekable, then rewound before each run.
118: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
119: * @param array<string,string>|null $env
120: */
121: public function __construct(
122: array $command,
123: $input = null,
124: ?Closure $callback = null,
125: ?string $cwd = null,
126: ?array $env = null,
127: ?float $timeout = null,
128: bool $collectOutput = true,
129: bool $useOutputFiles = false
130: ) {
131: $this->Command = $command;
132: $this->Callback = $callback;
133: $this->Cwd = $cwd;
134: $this->Env = $env;
135: $this->CollectOutput = $collectOutput;
136: $this->UseOutputFiles = $useOutputFiles || Sys::isWindows();
137: $this->applyInput($input);
138: $this->applyTimeout($timeout);
139: }
140:
141: /**
142: * Get a new process for a shell command
143: *
144: * @param resource|string|null $input Copied to a seekable stream if not
145: * already seekable, then rewound before each run.
146: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
147: * @param array<string,string>|null $env
148: */
149: public static function withShellCommand(
150: string $command,
151: $input = null,
152: ?Closure $callback = null,
153: ?string $cwd = null,
154: ?array $env = null,
155: ?float $timeout = null,
156: bool $collectOutput = true,
157: bool $useOutputFiles = false
158: ): self {
159: $process = new self([], $input, $callback, $cwd, $env, $timeout, $collectOutput, $useOutputFiles);
160: $process->Command = $command;
161: $process->Options = Arr::unset(self::DEFAULT_OPTIONS, 'bypass_shell');
162: return $process;
163: }
164:
165: /**
166: * @internal
167: */
168: public function __destruct()
169: {
170: if ($this->updateStatus()->isRunning()) {
171: $this->stop();
172: }
173: if ($this->UseOutputFiles) {
174: if ($this->OutputFiles ?? null) {
175: $this->closeStreams($this->OutputFiles);
176: }
177: if ($this->OutputDir !== null && is_dir($this->OutputDir)) {
178: File::pruneDir($this->OutputDir, true);
179: }
180: }
181: }
182:
183: /**
184: * Set the input passed to the process
185: *
186: * @param resource|string|null $input Copied to a seekable stream if not
187: * already seekable, then rewound before each run.
188: * @return $this
189: * @throws LogicException if the process is running.
190: */
191: public function setInput($input): self
192: {
193: $this->assertIsNotRunning();
194: $this->applyInput($input);
195: return $this;
196: }
197:
198: /**
199: * @param resource|string|null $input
200: */
201: private function applyInput($input): void
202: {
203: $this->Input = $input === null || is_string($input)
204: ? Str::toStream((string) $input)
205: : File::getSeekableStream($input);
206: $this->RewindOnStart = true;
207: }
208:
209: /**
210: * Pass input directly to the process
211: *
212: * @param resource $input
213: * @return $this
214: * @throws LogicException if the process is running.
215: */
216: public function pipeInput($input): self
217: {
218: $this->assertIsNotRunning();
219: $this->Input = $input;
220: $this->RewindOnStart = false;
221: return $this;
222: }
223:
224: /**
225: * Set the callback that receives output from the process
226: *
227: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
228: * @return $this
229: * @throws LogicException if the process is running.
230: */
231: public function setCallback(?Closure $callback): self
232: {
233: $this->assertIsNotRunning();
234: $this->Callback = $callback;
235: return $this;
236: }
237:
238: /**
239: * Set the initial working directory of the process
240: *
241: * @return $this
242: * @throws LogicException if the process is running.
243: */
244: public function setCwd(?string $cwd): self
245: {
246: $this->assertIsNotRunning();
247: $this->Cwd = $cwd;
248: return $this;
249: }
250:
251: /**
252: * Set the environment of the process
253: *
254: * @param array<string,string>|null $env
255: * @return $this
256: * @throws LogicException if the process is running.
257: */
258: public function setEnv(?array $env): self
259: {
260: $this->assertIsNotRunning();
261: $this->Env = $env;
262: return $this;
263: }
264:
265: /**
266: * Set the maximum number of seconds to allow the process to run
267: *
268: * @return $this
269: * @throws LogicException if the process is running.
270: */
271: public function setTimeout(?float $timeout): self
272: {
273: $this->assertIsNotRunning();
274: $this->applyTimeout($timeout);
275: return $this;
276: }
277:
278: private function applyTimeout(?float $timeout): void
279: {
280: if ($timeout !== null && $timeout <= 0) {
281: throw new InvalidArgumentException(sprintf(
282: 'Invalid timeout: %.3fs',
283: $timeout,
284: ));
285: }
286:
287: $this->Timeout = $timeout;
288: [$this->Sec, $this->Usec] = $timeout === null
289: ? [null, 0]
290: : [0, min((int) ($timeout * 1000000), self::READ_INTERVAL)];
291: }
292:
293: /**
294: * Disable collection of output written to STDOUT and STDERR by the process
295: *
296: * @return $this
297: * @throws LogicException if the process is running.
298: */
299: public function disableOutputCollection(): self
300: {
301: $this->assertIsNotRunning();
302: $this->CollectOutput = false;
303: return $this;
304: }
305:
306: /**
307: * Enable collection of output written to STDOUT and STDERR by the process
308: *
309: * @return $this
310: * @throws LogicException if the process is running.
311: */
312: public function enableOutputCollection(): self
313: {
314: $this->assertIsNotRunning();
315: $this->CollectOutput = true;
316: return $this;
317: }
318:
319: /**
320: * Run the process and throw an exception if its exit status is non-zero
321: *
322: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
323: * @return $this
324: * @throws LogicException if the process is running.
325: * @throws ProcessTimedOutException if the process times out.
326: * @throws ProcessTerminatedBySignalException if the process is terminated
327: * by an uncaught signal.
328: * @throws ProcessFailedException if the process returns a non-zero exit
329: * status.
330: */
331: public function runWithoutFail(?Closure $callback = null): self
332: {
333: if ($this->run($callback) !== 0) {
334: throw new ProcessFailedException(
335: 'Process failed with exit status %d: %s',
336: [$this->ExitStatus, $this],
337: );
338: }
339: return $this;
340: }
341:
342: /**
343: * Run the process and return its exit status
344: *
345: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
346: * @throws LogicException if the process is running.
347: * @throws ProcessTimedOutException if the process times out.
348: * @throws ProcessTerminatedBySignalException if the process is terminated
349: * by an uncaught signal.
350: */
351: public function run(?Closure $callback = null): int
352: {
353: return $this->start($callback)->wait();
354: }
355:
356: /**
357: * Start the process in the background
358: *
359: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
360: * @return $this
361: * @throws LogicException if the process is running.
362: * @throws ProcessTerminatedBySignalException if the process is terminated
363: * by an uncaught signal.
364: */
365: public function start(?Closure $callback = null): self
366: {
367: $this->assertIsNotRunning();
368:
369: $this->reset();
370: $this->CurrentCallback = $callback ?? $this->Callback;
371:
372: if ($this->RewindOnStart) {
373: File::rewind($this->Input);
374: }
375: $descriptors = [self::STDIN => $this->Input];
376: $handles = [];
377:
378: if ($this->UseOutputFiles) {
379: // Use files in a temporary directory to collect output (necessary
380: // on Windows, where `proc_open()` blocks until the process exits if
381: // standard output pipes are used, and useful in scenarios where
382: // polling for output would be inefficient)
383: $this->OutputDir ??= File::createTempDir();
384: foreach ([self::STDOUT, self::STDERR] as $fd) {
385: $file = $this->OutputDir . '/' . $fd;
386: $descriptors[$fd] = ['file', $file, 'w'];
387: $stream = $this->OutputFiles[$fd] ?? null;
388: // Create output files before the first run, and truncate them
389: // before subsequent runs
390: if (!$stream) {
391: $stream = File::open($file, 'w+');
392: $this->OutputFiles[$fd] = $stream;
393: } else {
394: File::truncate($stream, 0, $file);
395: }
396: if ($this->CollectOutput) {
397: $this->Output[$fd] = $stream;
398: $this->OutputPos[$fd] = 0;
399: $this->OutputFilePos[$fd] = 0;
400: }
401: // Tail output files separately
402: $handles[$fd] = File::open($file, 'r');
403: }
404: } else {
405: $descriptors += [
406: self::STDOUT => ['pipe', 'w'],
407: self::STDERR => ['pipe', 'w'],
408: ];
409: if ($this->CollectOutput) {
410: $this->Output = [
411: self::STDOUT => File::open('php://temp', 'a+'),
412: self::STDERR => File::open('php://temp', 'a+'),
413: ];
414: $this->OutputPos = [
415: self::STDOUT => 0,
416: self::STDERR => 0,
417: ];
418: }
419: }
420:
421: $this->StartTime = hrtime(true);
422: $this->Process = $this->check(
423: @proc_open(
424: $this->Command,
425: $descriptors,
426: $pipes,
427: $this->Cwd,
428: $this->Env,
429: $this->Options ?? self::DEFAULT_OPTIONS,
430: ),
431: 'Error starting process: %s',
432: );
433:
434: /** @var array<self::STDOUT|self::STDERR,resource> $pipes */
435: $now = hrtime(true);
436: $this->Stats['start_time'] = $this->StartTime / 1000;
437: $this->Stats['spawn_interval'] = ($now - $this->StartTime) / 1000;
438:
439: $pipes += $handles;
440: foreach ($pipes as $pipe) {
441: @stream_set_blocking($pipe, false);
442: }
443: $this->Pipes = $pipes;
444: $this->State = self::RUNNING;
445: $this->updateStatus();
446: $this->Pid = $this->ProcessStatus['pid'];
447: return $this;
448: }
449:
450: /**
451: * Wait for the process to exit and return its exit status
452: *
453: * @throws LogicException if the process has not run.
454: * @throws ProcessTimedOutException if the process times out.
455: * @throws ProcessTerminatedBySignalException if the process is terminated
456: * by an uncaught signal.
457: */
458: public function wait(): int
459: {
460: $this->assertHasRun();
461:
462: while ($this->Pipes) {
463: $this->checkTimeout();
464: $this->read();
465: $this->updateStatus(false);
466: }
467:
468: while ($this->isRunning()) {
469: $this->checkTimeout();
470: usleep(self::POLL_INTERVAL);
471: }
472:
473: return $this->ExitStatus;
474: }
475:
476: /**
477: * Check for output written by the process and update its status
478: *
479: * If fewer than {@see Process::POLL_INTERVAL} microseconds have passed
480: * since the process was last polled, a delay is inserted to minimise CPU
481: * usage.
482: *
483: * @return $this
484: * @throws LogicException if the process has not run.
485: * @throws ProcessTimedOutException if the process times out.
486: * @throws ProcessTerminatedBySignalException if the process is terminated
487: * by an uncaught signal.
488: */
489: public function poll(bool $now = false): self
490: {
491: $this->assertHasRun();
492:
493: $this->checkTimeout();
494: if (!$now) {
495: $this->awaitInterval($this->LastPollTime, self::POLL_INTERVAL);
496: }
497: $this->updateStatus();
498:
499: return $this;
500: }
501:
502: /**
503: * Terminate the process if it is still running
504: *
505: * @return $this
506: * @throws LogicException if the process has not run.
507: * @throws ProcessTerminatedBySignalException if the process is terminated
508: * by an uncaught signal.
509: * @throws ProcessDidNotTerminateException if the process does not
510: * terminate.
511: */
512: public function stop(float $timeout = 10): self
513: {
514: $this->assertHasRun();
515:
516: // Work around issue where processes don't receive signals immediately
517: // after launch
518: $this->awaitInterval($this->StartTime, self::POLL_INTERVAL);
519:
520: if (!$this->updateStatus()->isRunning()) {
521: return $this;
522: }
523:
524: try {
525: // Send SIGTERM first, then SIGKILL if the process is still running
526: // after `$timeout` seconds
527: if (
528: $this->doStop(self::SIGTERM, $timeout)
529: || $this->doStop(self::SIGKILL, 1)
530: ) {
531: return $this;
532: }
533: } catch (ProcessException $ex) {
534: // Ignore the exception if the process is no longer running
535: // @phpstan-ignore booleanNot.alwaysFalse
536: if (!$this->updateStatus()->isRunning()) {
537: return $this;
538: }
539: throw $ex;
540: }
541:
542: throw new ProcessDidNotTerminateException(
543: 'Process did not terminate: %s',
544: [$this],
545: );
546: }
547:
548: private function doStop(int $signal, float $timeout): bool
549: {
550: if (!$this->Process) {
551: return true;
552: }
553:
554: $now = hrtime(true);
555: $this->check(
556: @proc_terminate($this->Process, $signal),
557: 'Error terminating process: %s',
558: );
559: $this->Stats['stop_count']++;
560: if (!$this->Stopped) {
561: $this->Stats['stop_time'] = $now / 1000;
562: $this->Stopped = true;
563: }
564:
565: $until = $now + $timeout * 1000000000;
566: do {
567: usleep(self::POLL_INTERVAL);
568: if (!$this->isRunning()) {
569: return true;
570: }
571: } while (hrtime(true) < $until);
572:
573: return false;
574: }
575:
576: /**
577: * Check if the process is running
578: */
579: public function isRunning(): bool
580: {
581: return $this->State === self::RUNNING
582: && $this->maybeUpdateStatus()->State === self::RUNNING;
583: }
584:
585: /**
586: * Check if the process ran and terminated
587: */
588: public function isTerminated(): bool
589: {
590: return $this->State === self::TERMINATED
591: || $this->maybeUpdateStatus()->State === self::TERMINATED;
592: }
593:
594: /**
595: * Get the command spawned by the process
596: *
597: * @return list<string>|string
598: */
599: public function getCommand()
600: {
601: return $this->Command;
602: }
603:
604: /**
605: * Get the process ID of the command spawned by the process
606: *
607: * @throws LogicException if the process has not run.
608: */
609: public function getPid(): int
610: {
611: $this->assertHasRun();
612: return $this->Pid;
613: }
614:
615: /**
616: * Get output written to STDOUT or STDERR by the process
617: *
618: * @param Process::STDOUT|Process::STDERR $fd
619: * @throws LogicException if the process has not run or if output collection
620: * is disabled.
621: */
622: public function getOutput(int $fd = Process::STDOUT): string
623: {
624: return $this->doGetOutput($fd, false, false);
625: }
626:
627: /**
628: * Get output written to STDOUT or STDERR by the process since it was last
629: * read
630: *
631: * @param Process::STDOUT|Process::STDERR $fd
632: * @throws LogicException if the process has not run or if output collection
633: * is disabled.
634: */
635: public function getNewOutput(int $fd = Process::STDOUT): string
636: {
637: return $this->doGetOutput($fd, false, true);
638: }
639:
640: /**
641: * Get text written to STDOUT or STDERR by the process
642: *
643: * @param Process::STDOUT|Process::STDERR $fd
644: * @throws LogicException if the process has not run or if output collection
645: * is disabled.
646: */
647: public function getOutputAsText(int $fd = Process::STDOUT): string
648: {
649: return $this->doGetOutput($fd, true, false);
650: }
651:
652: /**
653: * Get text written to STDOUT or STDERR by the process since it was last
654: * read
655: *
656: * @param Process::STDOUT|Process::STDERR $fd
657: * @throws LogicException if the process has not run or if output collection
658: * is disabled.
659: */
660: public function getNewOutputAsText(int $fd = Process::STDOUT): string
661: {
662: return $this->doGetOutput($fd, true, true);
663: }
664:
665: /**
666: * @param self::STDOUT|self::STDERR $fd
667: */
668: private function doGetOutput(int $fd, bool $text, bool $new): string
669: {
670: $this->assertHasRun();
671:
672: if (!$this->Output) {
673: throw new LogicException('Output collection disabled');
674: }
675:
676: $stream = $this->updateStatus()->Output[$fd];
677: $offset = $new
678: ? $this->OutputPos[$fd]
679: : ($this->UseOutputFiles
680: ? $this->OutputFilePos[$fd]
681: : 0);
682: $output = File::getContents($stream, $offset);
683: /** @var int<0,max> */
684: $pos = File::tell($stream);
685: $this->OutputPos[$fd] = $pos;
686: return $text
687: ? Str::trimNativeEol($output)
688: : $output;
689: }
690:
691: /**
692: * Forget output written to STDOUT and STDERR by the process
693: *
694: * @return $this
695: */
696: public function clearOutput(): self
697: {
698: if (!$this->Output) {
699: return $this;
700: }
701:
702: foreach ([self::STDOUT, self::STDERR] as $fd) {
703: $stream = $this->Output[$fd];
704: if ($this->UseOutputFiles) {
705: /** @var int<0,max> */
706: $pos = File::tell($stream);
707: $this->OutputFilePos[$fd] = $pos;
708: } else {
709: File::truncate($stream);
710: }
711: $this->OutputPos[$fd] = 0;
712: }
713:
714: return $this;
715: }
716:
717: /**
718: * Get the exit status of the process
719: *
720: * @throws ProcessException if the process has not terminated.
721: */
722: public function getExitStatus(): int
723: {
724: $this->assertHasTerminated();
725: return $this->ExitStatus;
726: }
727:
728: /**
729: * Get process statistics
730: *
731: * @return array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int}
732: * @throws LogicException if the process has not run.
733: */
734: public function getStats(): array
735: {
736: $this->assertHasRun();
737: return $this->Stats;
738: }
739:
740: /**
741: * @return $this
742: */
743: private function checkTimeout(): self
744: {
745: if (
746: $this->State !== self::RUNNING
747: || $this->Timeout === null
748: || $this->Timeout > (hrtime(true) - $this->StartTime) / 1000000000
749: ) {
750: return $this;
751: }
752:
753: try {
754: $this->stop();
755: } catch (Throwable $ex) {
756: throw new ProcessException(
757: 'Error terminating process that timed out after %.3fs: %s',
758: [$this->Timeout, $this],
759: $ex,
760: );
761: }
762:
763: throw new ProcessTimedOutException(
764: 'Process timed out after %.3fs: %s',
765: [$this->Timeout, $this],
766: );
767: }
768:
769: /**
770: * @return $this
771: */
772: private function maybeUpdateStatus(): self
773: {
774: return $this->checkInterval($this->LastPollTime, self::POLL_INTERVAL)
775: ? $this->updateStatus()
776: : $this;
777: }
778:
779: /**
780: * @return $this
781: */
782: private function updateStatus(bool $read = true): self
783: {
784: if (!$this->Process) {
785: return $this;
786: }
787:
788: $now = hrtime(true);
789: $process = $this->Process;
790: $this->ProcessStatus = $this->check(
791: @proc_get_status($process),
792: 'Error getting process status: %s',
793: );
794: $this->LastPollTime = $now;
795: $this->Stats['poll_time'] = $now / 1000;
796: $this->Stats['poll_count']++;
797:
798: $running = $this->ProcessStatus['running'];
799: if ($read || !$running) {
800: $this->read(false, !$running);
801: }
802: if (!$running) {
803: // Close any pipes left open by `$this->read()`
804: if ($this->Pipes) {
805: // @codeCoverageIgnoreStart
806: $this->closeStreams($this->Pipes);
807: // @codeCoverageIgnoreEnd
808: }
809:
810: // The return value of `proc_close()` is not reliable, so ignore it
811: // and use `error_get_last()` to check for errors
812: error_clear_last();
813: @proc_close($process);
814: if ($error = error_get_last()) {
815: // @codeCoverageIgnoreStart
816: $this->throw('Error closing process: %s', $error);
817: // @codeCoverageIgnoreEnd
818: }
819:
820: $this->ExitStatus = $this->ProcessStatus['exitcode'];
821: $this->State = self::TERMINATED;
822: $this->Process = null;
823: if (
824: $this->ExitStatus === -1
825: && $this->ProcessStatus['signaled']
826: && ($signal = $this->ProcessStatus['termsig']) > 0
827: ) {
828: $this->ExitStatus = 128 + $signal;
829: if (!$this->Stopped || ([
830: self::SIGTERM => false,
831: self::SIGKILL => false,
832: ][$signal] ?? true)) {
833: throw new ProcessTerminatedBySignalException(
834: 'Process terminated by signal %d: %s',
835: [$signal, $this],
836: );
837: }
838: }
839: }
840:
841: return $this;
842: }
843:
844: private function read(bool $wait = true, bool $closeAtEof = false): void
845: {
846: if (!$this->Pipes) {
847: return;
848: }
849:
850: $now = hrtime(true);
851: $read = $this->Pipes;
852: if ($this->UseOutputFiles) {
853: if ($wait) {
854: $usec = $this->Usec === 0
855: ? self::READ_INTERVAL
856: : $this->Usec;
857: $this->awaitInterval($this->LastReadTime, $usec);
858: }
859: } else {
860: $write = null;
861: $except = null;
862: $sec = $wait ? $this->Sec : 0;
863: $usec = $wait ? $this->Usec : 0;
864: File::select($read, $write, $except, $sec, $usec);
865: }
866: foreach ($read as $i => $pipe) {
867: $data = File::getContents($pipe);
868: if ($data !== '') {
869: if ($this->CollectOutput && !$this->UseOutputFiles) {
870: File::writeAll($this->Output[$i], $data);
871: }
872: if ($this->CurrentCallback) {
873: ($this->CurrentCallback)($i, $data);
874: }
875: }
876: if ((!$this->UseOutputFiles || $closeAtEof) && File::eof($pipe)) {
877: File::close($pipe);
878: unset($this->Pipes[$i]);
879: }
880: }
881:
882: $this->LastReadTime = $now;
883: $this->Stats['read_time'] = $now / 1000;
884: $this->Stats['read_count']++;
885: }
886:
887: /**
888: * @param resource[] $streams
889: * @param-out array{} $streams
890: */
891: private function closeStreams(array &$streams): void
892: {
893: foreach ($streams as $stream) {
894: File::close($stream);
895: }
896: $streams = [];
897: }
898:
899: private function reset(): void
900: {
901: $this->CurrentCallback = null;
902: unset($this->OutputFilePos);
903: $this->StartTime = null;
904: $this->Process = null;
905: $this->Stopped = false;
906: unset($this->Pipes);
907: unset($this->ProcessStatus);
908: unset($this->Pid);
909: unset($this->ExitStatus);
910: $this->LastPollTime = null;
911: $this->LastReadTime = null;
912: $this->Output = [];
913: $this->OutputPos = [];
914: $this->Stats = self::DEFAULT_STATS;
915: }
916:
917: /**
918: * Wait until at least $interval microseconds have passed since a given time
919: *
920: * @param int|float|null $time
921: * @return $this
922: */
923: private function awaitInterval($time, int $interval): self
924: {
925: if ($time === null) {
926: return $this;
927: }
928: $now = hrtime(true);
929: $usec = (int) ($interval - ($now - $time) / 1000);
930: if ($usec > 0) {
931: usleep($usec);
932: }
933: return $this;
934: }
935:
936: /**
937: * Check if at least $interval microseconds have passed since a given time
938: *
939: * @param int|float|null $time
940: */
941: private function checkInterval($time, int $interval): bool
942: {
943: if ($time === null) {
944: return true;
945: }
946: $now = hrtime(true);
947: return (int) ($interval - ($now - $time) / 1000) <= 0;
948: }
949:
950: /**
951: * @template T
952: *
953: * @param T|false $result
954: * @return ($result is false ? never : T)
955: */
956: private function check($result, string $message)
957: {
958: if ($result === false) {
959: $this->throw($message);
960: }
961: return $result;
962: }
963:
964: /**
965: * @param array{message:string,...}|null $error
966: * @return never
967: */
968: private function throw(string $message, ?array $error = null): void
969: {
970: $error ??= error_get_last();
971: if ($error) {
972: throw new ProcessException($error['message']);
973: }
974: // @codeCoverageIgnoreStart
975: throw new ProcessException(
976: $message,
977: [$this],
978: );
979: // @codeCoverageIgnoreEnd
980: }
981:
982: private function assertIsNotRunning(): void
983: {
984: if ($this->State === self::RUNNING) {
985: throw new LogicException('Process is running');
986: }
987: }
988:
989: private function assertHasRun(): void
990: {
991: if ($this->State === self::READY) {
992: throw new LogicException('Process has not run');
993: }
994: }
995:
996: private function assertHasTerminated(): void
997: {
998: if ($this->State !== self::TERMINATED) {
999: throw new LogicException('Process has not terminated');
1000: }
1001: }
1002: }
1003: