1: <?php declare(strict_types=1);
2:
3: namespace Salient\Core;
4:
5: use Salient\Contract\Core\FileDescriptor;
6: use Salient\Core\Exception\ProcessException;
7: use Salient\Core\Exception\ProcessFailedException;
8: use Salient\Core\Exception\ProcessTerminatedBySignalException;
9: use Salient\Core\Exception\ProcessTimedOutException;
10: use Salient\Utility\File;
11: use Salient\Utility\Get;
12: use Salient\Utility\Str;
13: use Salient\Utility\Sys;
14: use Closure;
15: use InvalidArgumentException;
16: use LogicException;
17: use RuntimeException;
18: use Throwable;
19:
20: /**
21: * A proc_open() process wrapper
22: */
23: final class Process
24: {
25: private const READY = 0;
26: private const RUNNING = 1;
27: private const TERMINATED = 2;
28:
29: /**
30: * Microseconds to wait between process status checks
31: */
32: private const POLL_INTERVAL = 10000;
33:
34: /**
35: * Microseconds to wait for stream activity
36: *
37: * When {@see Process::$UseOutputFiles} is `false` (the default on platforms
38: * other than Windows), this is an upper limit because
39: * {@see stream_select()} returns as soon as a status change is detected.
40: */
41: private const READ_INTERVAL = 200000;
42:
43: private const DEFAULT_OPTIONS = [
44: 'suppress_errors' => true,
45: 'bypass_shell' => true,
46: ];
47:
48: /**
49: * @var array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int}
50: */
51: private const DEFAULT_STATS = [
52: 'start_time' => 0.0,
53: 'spawn_interval' => 0.0,
54: 'poll_time' => 0.0,
55: 'poll_count' => 0,
56: 'read_time' => 0.0,
57: 'read_count' => 0,
58: 'stop_time' => 0.0,
59: 'stop_count' => 0,
60: ];
61:
62: private const SIGTERM = 15;
63: private const SIGKILL = 9;
64:
65: /** @var string[]|string */
66: private $Command;
67: /** @var resource */
68: private $Input;
69: private bool $RewindInput;
70: /** @var (Closure(FileDescriptor::OUT|FileDescriptor::ERR, string): mixed)|null */
71: private ?Closure $Callback;
72: private ?string $Cwd;
73: /** @var array<string,string>|null */
74: private ?array $Env;
75: private ?float $Timeout;
76: private ?int $Sec;
77: private int $Usec;
78: private bool $CollectOutput;
79: /** @readonly */
80: private bool $UseOutputFiles;
81: /** @var array<string,bool>|null */
82: private ?array $Options = null;
83: private int $State = self::READY;
84: /** @var (Closure(FileDescriptor::OUT|FileDescriptor::ERR, string): mixed)|null */
85: private ?Closure $OutputCallback = null;
86: private ?string $OutputDir = null;
87: /** @var array<FileDescriptor::OUT|FileDescriptor::ERR,resource> */
88: private array $OutputFiles;
89: /** @var array<FileDescriptor::OUT|FileDescriptor::ERR,int<0,max>> */
90: private array $OutputFilePos;
91: /** @var int|float|null */
92: private $StartTime = null;
93: /** @var resource|null */
94: private $Process = null;
95: private bool $Stopped = false;
96: /** @var array<FileDescriptor::OUT|FileDescriptor::ERR,resource> */
97: private array $Pipes;
98: /** @var array{command:string,pid:int,running:bool,signaled:bool,stopped:bool,exitcode:int,termsig:int,stopsig:int} */
99: private array $ProcessStatus;
100: private int $Pid;
101: private int $ExitStatus;
102: /** @var int|float|null */
103: private $LastPollTime = null;
104: /** @var int|float|null */
105: private $LastReadTime = null;
106: /** @var int|float|null */
107: private $LastStopTime = null;
108: /** @var array<FileDescriptor::OUT|FileDescriptor::ERR,resource> */
109: private array $Output = [];
110: /** @var array<FileDescriptor::OUT|FileDescriptor::ERR,int<0,max>> */
111: private array $OutputPos = [];
112: /** @var array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int} */
113: private array $Stats = self::DEFAULT_STATS;
114:
115: /**
116: * Creates a new Process object
117: *
118: * @param string[] $command
119: * @param resource|string|null $input
120: * @param (Closure(FileDescriptor::OUT|FileDescriptor::ERR $fd, string $output): mixed)|null $callback
121: * @param array<string,string>|null $env
122: */
123: public function __construct(
124: array $command,
125: $input = null,
126: ?Closure $callback = null,
127: ?string $cwd = null,
128: ?array $env = null,
129: ?float $timeout = null,
130: bool $collectOutput = true,
131: bool $useOutputFiles = false
132: ) {
133: $this->Command = $command;
134: $this->Callback = $callback;
135: $this->Cwd = $cwd;
136: $this->Env = $env;
137: $this->UseOutputFiles = $useOutputFiles || Sys::isWindows();
138: $this->CollectOutput = $collectOutput;
139:
140: $this->applyInput($input);
141: $this->applyTimeout($timeout);
142: }
143:
144: /**
145: * Creates a new Process object for a shell command
146: *
147: * @param resource|string|null $input
148: * @param (Closure(FileDescriptor::OUT|FileDescriptor::ERR $fd, string $output): mixed)|null $callback
149: * @param array<string,string>|null $env
150: */
151: public static function withShellCommand(
152: string $command,
153: $input = null,
154: ?Closure $callback = null,
155: ?string $cwd = null,
156: ?array $env = null,
157: ?float $timeout = null,
158: bool $collectOutput = true,
159: bool $useOutputFiles = false
160: ): self {
161: $process = new self([], $input, $callback, $cwd, $env, $timeout, $collectOutput, $useOutputFiles);
162: $process->Command = $command;
163: $process->Options = array_diff_key(self::DEFAULT_OPTIONS, ['bypass_shell' => null]);
164: return $process;
165: }
166:
167: /**
168: * @internal
169: */
170: public function __destruct()
171: {
172: if ($this->updateStatus()->isRunning()) {
173: $this->stop()->assertHasTerminated(true);
174: }
175:
176: if (!$this->UseOutputFiles) {
177: return;
178: }
179:
180: if ($this->OutputFiles ?? null) {
181: $this->closeStreams($this->OutputFiles);
182: }
183:
184: if ($this->OutputDir !== null && is_dir($this->OutputDir)) {
185: File::pruneDir($this->OutputDir, true);
186: }
187: }
188:
189: /**
190: * Pass input to the process, rewinding it before each run and making it
191: * seekable if necessary
192: *
193: * @param resource|string|null $input
194: * @return $this
195: * @throws LogicException if the process is running.
196: */
197: public function setInput($input)
198: {
199: $this->assertIsNotRunning();
200: $this->applyInput($input);
201: return $this;
202: }
203:
204: /**
205: * Pass input to the process without making it seekable or rewinding it
206: * before each run
207: *
208: * @param resource $input
209: * @return $this
210: * @throws LogicException if the process is running.
211: */
212: public function pipeInput($input)
213: {
214: $this->assertIsNotRunning();
215: $this->Input = $input;
216: $this->RewindInput = false;
217: return $this;
218: }
219:
220: /**
221: * Set the callback that receives output from the process
222: *
223: * @param (Closure(FileDescriptor::OUT|FileDescriptor::ERR $fd, string $output): mixed)|null $callback
224: * @return $this
225: * @throws LogicException if the process is running.
226: */
227: public function setCallback(?Closure $callback)
228: {
229: $this->assertIsNotRunning();
230: $this->Callback = $callback;
231: return $this;
232: }
233:
234: /**
235: * Set the initial working directory of the process
236: *
237: * @return $this
238: * @throws LogicException if the process is running.
239: */
240: public function setCwd(?string $cwd)
241: {
242: $this->assertIsNotRunning();
243: $this->Cwd = $cwd;
244: return $this;
245: }
246:
247: /**
248: * Set the environment of the process
249: *
250: * @param array<string,string>|null $env
251: * @return $this
252: * @throws LogicException if the process is running.
253: */
254: public function setEnv(?array $env)
255: {
256: $this->assertIsNotRunning();
257: $this->Env = $env;
258: return $this;
259: }
260:
261: /**
262: * Set the maximum number of seconds to allow the process to run
263: *
264: * @return $this
265: * @throws LogicException if the process is running.
266: */
267: public function setTimeout(?float $timeout)
268: {
269: $this->assertIsNotRunning();
270: $this->Timeout = $timeout;
271: return $this;
272: }
273:
274: /**
275: * Disable collection of output written to STDOUT and STDERR by the process
276: *
277: * @return $this
278: * @throws LogicException if the process is running.
279: */
280: public function disableOutputCollection()
281: {
282: $this->assertIsNotRunning();
283: $this->CollectOutput = false;
284: return $this;
285: }
286:
287: /**
288: * Enable collection of output written to STDOUT and STDERR by the process
289: *
290: * @return $this
291: * @throws LogicException if the process is running.
292: */
293: public function enableOutputCollection()
294: {
295: $this->assertIsNotRunning();
296: $this->CollectOutput = true;
297: return $this;
298: }
299:
300: /**
301: * Run the process, throwing an exception if its exit status is non-zero
302: *
303: * @param (Closure(FileDescriptor::OUT|FileDescriptor::ERR $fd, string $output): mixed)|null $callback
304: * @return $this
305: */
306: public function runWithoutFail(?Closure $callback = null)
307: {
308: if ($this->run($callback) !== 0) {
309: throw new ProcessFailedException(sprintf(
310: 'Process failed with exit status %d: %s',
311: $this->ExitStatus,
312: Get::code($this->Command),
313: ));
314: }
315:
316: return $this;
317: }
318:
319: /**
320: * Run the process and return its exit status
321: *
322: * @param (Closure(FileDescriptor::OUT|FileDescriptor::ERR $fd, string $output): mixed)|null $callback
323: */
324: public function run(?Closure $callback = null): int
325: {
326: return $this->start($callback)->wait();
327: }
328:
329: /**
330: * Run the process without waiting for it to exit
331: *
332: * @param (Closure(FileDescriptor::OUT|FileDescriptor::ERR $fd, string $output): mixed)|null $callback
333: * @return $this
334: */
335: public function start(?Closure $callback = null)
336: {
337: $this->assertIsNotRunning();
338:
339: $this->reset();
340: $this->OutputCallback = $callback ?? $this->Callback;
341:
342: if ($this->RewindInput) {
343: File::rewind($this->Input);
344: }
345:
346: $descriptors = [FileDescriptor::IN => $this->Input];
347: $handles = [];
348:
349: if ($this->UseOutputFiles) {
350: // Use files in a temporary directory to collect output. This is
351: // necessary on Windows, where proc_open() blocks until the process
352: // exits if standard output pipes are used, but is also useful in
353: // scenarios where polling for output would be inefficient.
354: $this->OutputDir ??= File::createTempDir();
355: foreach ([FileDescriptor::OUT, FileDescriptor::ERR] as $fd) {
356: $file = $this->OutputDir . '/' . $fd;
357: $descriptors[$fd] = ['file', $file, 'w'];
358:
359: // Use streams in $this->OutputFiles to:
360: //
361: // - create output files before the first run
362: // - truncate output files before subsequent runs
363: // - service $this->getOutput() etc. during and after each run
364: // (instead of writing the same output to php://temp streams)
365: if ($stream = $this->OutputFiles[$fd] ?? null) {
366: File::truncate($stream, 0, $file);
367: } else {
368: $stream = File::open($file, 'w+');
369: $this->OutputFiles[$fd] = $stream;
370: }
371: if ($this->CollectOutput) {
372: $this->Output[$fd] = $stream;
373: $this->OutputPos[$fd] = 0;
374: $this->OutputFilePos[$fd] = 0;
375: }
376:
377: // Create additional streams to tail output files for this run
378: $handles[$fd] = File::open($file, 'r');
379: }
380: } else {
381: $descriptors += [
382: FileDescriptor::OUT => ['pipe', 'w'],
383: FileDescriptor::ERR => ['pipe', 'w'],
384: ];
385: if ($this->CollectOutput) {
386: $this->Output = [
387: FileDescriptor::OUT => File::open('php://temp', 'a+'),
388: FileDescriptor::ERR => File::open('php://temp', 'a+'),
389: ];
390: $this->OutputPos = [
391: FileDescriptor::OUT => 0,
392: FileDescriptor::ERR => 0,
393: ];
394: }
395: }
396:
397: $this->StartTime = hrtime(true);
398:
399: $process = $this->throwOnFailure(
400: @proc_open(
401: $this->Command,
402: $descriptors,
403: $pipes,
404: $this->Cwd,
405: $this->Env,
406: $this->Options ?? self::DEFAULT_OPTIONS,
407: ),
408: 'Error running process: %s',
409: );
410:
411: $now = hrtime(true);
412: $this->Stats['start_time'] = $this->StartTime / 1000;
413: $this->Stats['spawn_interval'] = ($now - $this->StartTime) / 1000;
414:
415: $pipes += $handles;
416:
417: foreach ($pipes as $pipe) {
418: @stream_set_blocking($pipe, false);
419: }
420:
421: $this->Process = $process;
422: $this->Pipes = $pipes;
423: $this->State = self::RUNNING;
424:
425: $this->updateStatus();
426: $this->Pid = $this->ProcessStatus['pid'];
427:
428: return $this;
429: }
430:
431: /**
432: * Check the process for output and update its status
433: *
434: * If fewer than {@see Process::POLL_INTERVAL} microseconds have passed
435: * since the process was last polled, a delay is inserted to minimise CPU
436: * usage.
437: *
438: * @return $this
439: */
440: public function poll(bool $now = false)
441: {
442: $this->assertHasRun();
443:
444: $this->checkTimeout();
445: if (!$now) {
446: $this->awaitInterval($this->LastPollTime, self::POLL_INTERVAL);
447: }
448: $this->updateStatus();
449:
450: return $this;
451: }
452:
453: /**
454: * Wait for the process to exit and return its exit status
455: */
456: public function wait(): int
457: {
458: $this->assertHasRun();
459:
460: while ($this->Pipes) {
461: $this->checkTimeout();
462: $this->read();
463: $this->updateStatus(false);
464: }
465:
466: while ($this->isRunning()) {
467: $this->checkTimeout();
468: usleep(self::POLL_INTERVAL);
469: }
470:
471: if (
472: $this->ProcessStatus['signaled'] && (
473: !$this->Stopped || !(
474: $this->ProcessStatus['termsig'] === self::SIGTERM
475: || $this->ProcessStatus['termsig'] === self::SIGKILL
476: )
477: )
478: ) {
479: throw new ProcessTerminatedBySignalException(sprintf(
480: 'Process terminated by signal %d: %s',
481: $this->ProcessStatus['termsig'],
482: Get::code($this->Command),
483: ));
484: }
485:
486: return $this->ExitStatus;
487: }
488:
489: /**
490: * Terminate the process if it is still running
491: *
492: * @return $this
493: */
494: public function stop(float $timeout = 10)
495: {
496: $this->assertHasRun();
497:
498: // Work around issue where processes do not receive signals immediately
499: // after launch on some platforms (e.g. macOS + PHP 8.2.18)
500: $this->awaitInterval($this->StartTime, self::POLL_INTERVAL);
501:
502: if (!$this->updateStatus()->isRunning()) {
503: // @codeCoverageIgnoreStart
504: return $this;
505: // @codeCoverageIgnoreEnd
506: }
507:
508: try {
509: // Send SIGTERM first
510: $this->doStop(self::SIGTERM);
511: if ($this->waitForStop($this->LastStopTime + $timeout * 1000000000)) {
512: return $this;
513: }
514:
515: // If the process doesn't stop, fall back to SIGKILL
516: $this->doStop(self::SIGKILL);
517: if ($this->waitForStop($this->LastStopTime + 1000000000)) {
518: return $this;
519: }
520: } catch (ProcessException $ex) {
521: // Ignore the exception if the process is no longer running
522: if (!$this->updateStatus()->isRunning()) {
523: return $this;
524: }
525: throw $ex;
526: }
527:
528: throw new ProcessException(sprintf(
529: 'Process could not be stopped: %s',
530: Get::code($this->Command),
531: ));
532: }
533:
534: /**
535: * Check if the process is running
536: *
537: * @phpstan-impure
538: *
539: * @phpstan-assert-if-true !null $this->Process
540: */
541: public function isRunning(): bool
542: {
543: return
544: $this->State === self::RUNNING
545: && $this->maybeUpdateStatus()->State === self::RUNNING;
546: }
547:
548: /**
549: * Check if the process ran and terminated
550: */
551: public function isTerminated(): bool
552: {
553: return
554: $this->State === self::TERMINATED
555: || $this->maybeUpdateStatus()->State === self::TERMINATED;
556: }
557:
558: /**
559: * Check if the process ran and was terminated by a signal
560: */
561: public function isTerminatedBySignal(): bool
562: {
563: return $this->isTerminated()
564: && $this->ProcessStatus['signaled'];
565: }
566:
567: /**
568: * Get the command passed to proc_open() to spawn the process
569: *
570: * @return string[]|string
571: */
572: public function getCommand()
573: {
574: return $this->Command;
575: }
576:
577: /**
578: * Get the process ID of the spawned process
579: *
580: * @throws LogicException if the process has not run.
581: */
582: public function getPid(): int
583: {
584: $this->assertHasRun();
585:
586: return $this->Pid;
587: }
588:
589: /**
590: * Get output written to STDOUT or STDERR by the process since it started
591: *
592: * @param FileDescriptor::OUT|FileDescriptor::ERR $fd
593: * @throws LogicException if the process has not run or if output
594: * collection is disabled.
595: */
596: public function getOutput(int $fd = FileDescriptor::OUT): string
597: {
598: return $this->doGetOutput($fd, false, false);
599: }
600:
601: /**
602: * Get output written to STDOUT or STDERR by the process since it was last
603: * read
604: *
605: * @param FileDescriptor::OUT|FileDescriptor::ERR $fd
606: * @throws LogicException if the process has not run or if output
607: * collection is disabled.
608: */
609: public function getNewOutput(int $fd = FileDescriptor::OUT): string
610: {
611: return $this->doGetOutput($fd, false, true);
612: }
613:
614: /**
615: * Get text written to STDOUT or STDERR by the process since it started
616: *
617: * @param FileDescriptor::OUT|FileDescriptor::ERR $fd
618: * @throws LogicException if the process has not run or if output
619: * collection is disabled.
620: */
621: public function getText(int $fd = FileDescriptor::OUT): string
622: {
623: return $this->doGetOutput($fd, true, false);
624: }
625:
626: /**
627: * Get text written to STDOUT or STDERR by the process since it was last
628: * read
629: *
630: * @param FileDescriptor::OUT|FileDescriptor::ERR $fd
631: * @throws LogicException if the process has not run or if output
632: * collection is disabled.
633: */
634: public function getNewText(int $fd = FileDescriptor::OUT): string
635: {
636: return $this->doGetOutput($fd, true, true);
637: }
638:
639: /**
640: * @param FileDescriptor::OUT|FileDescriptor::ERR $fd
641: */
642: private function doGetOutput(int $fd, bool $text, bool $new): string
643: {
644: $this->assertHasRun();
645:
646: if (!$this->Output) {
647: throw new LogicException('Output not collected');
648: }
649:
650: $stream = $this->updateStatus()->Output[$fd];
651: $offset = $new
652: ? $this->OutputPos[$fd]
653: : ($this->UseOutputFiles
654: ? $this->OutputFilePos[$fd]
655: : 0);
656: $output = File::getContents($stream, $offset);
657: /** @var int<0,max> */
658: $pos = File::tell($stream);
659: $this->OutputPos[$fd] = $pos;
660: return $text
661: ? Str::trimNativeEol($output)
662: : $output;
663: }
664:
665: /**
666: * Forget output written by the process
667: *
668: * @return $this
669: */
670: public function clearOutput()
671: {
672: if (!$this->Output) {
673: return $this;
674: }
675:
676: foreach ([FileDescriptor::OUT, FileDescriptor::ERR] as $fd) {
677: $stream = $this->Output[$fd];
678: if ($this->UseOutputFiles) {
679: /** @var int<0,max> */
680: $pos = File::tell($stream);
681: $this->OutputFilePos[$fd] = $pos;
682: } else {
683: File::truncate($stream);
684: }
685: $this->OutputPos[$fd] = 0;
686: }
687:
688: return $this;
689: }
690:
691: /**
692: * Get the exit status of the process
693: *
694: * @throws ProcessException if the process has not terminated.
695: */
696: public function getExitStatus(): int
697: {
698: $this->assertHasTerminated();
699:
700: return $this->ExitStatus;
701: }
702:
703: /**
704: * Get process statistics
705: *
706: * @return array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int}
707: */
708: public function getStats(): array
709: {
710: $this->assertHasRun();
711:
712: return $this->Stats;
713: }
714:
715: private function reset(): void
716: {
717: $this->OutputCallback = null;
718: unset($this->OutputFilePos);
719: $this->StartTime = null;
720: $this->Process = null;
721: $this->Stopped = false;
722: unset($this->Pipes);
723: unset($this->ProcessStatus);
724: unset($this->Pid);
725: unset($this->ExitStatus);
726: $this->LastPollTime = null;
727: $this->LastReadTime = null;
728: $this->LastStopTime = null;
729: $this->Output = [];
730: $this->OutputPos = [];
731: $this->Stats = self::DEFAULT_STATS;
732: }
733:
734: /**
735: * @return $this
736: */
737: private function maybeUpdateStatus()
738: {
739: if (!$this->checkInterval($this->LastPollTime, self::POLL_INTERVAL)) {
740: return $this;
741: }
742: return $this->updateStatus();
743: }
744:
745: /**
746: * @return $this
747: */
748: private function updateStatus(bool $read = true, bool $wait = false)
749: {
750: if ($this->Process === null) {
751: return $this;
752: }
753:
754: $now = hrtime(true);
755:
756: $this->ProcessStatus = $this->throwOnFailure(
757: @proc_get_status($this->Process),
758: 'Error getting process status: %s',
759: );
760:
761: $this->LastPollTime = $now;
762: $this->Stats['poll_time'] = $now / 1000;
763: $this->Stats['poll_count']++;
764:
765: $running = $this->ProcessStatus['running'];
766:
767: if ($read || !$running) {
768: $this->read($running && $wait, !$running);
769: }
770:
771: if (!$running) {
772: // In the unlikely event that any pipes remain open, close them
773: // before closing the process
774: if ($this->Pipes) {
775: // @codeCoverageIgnoreStart
776: $this->closeStreams($this->Pipes);
777: // @codeCoverageIgnoreEnd
778: }
779:
780: if (is_resource($this->Process)) {
781: // The return value of `proc_close()` is not reliable, so ignore
782: // it and use `error_get_last()` to check for errors
783: error_clear_last();
784: @proc_close($this->Process);
785: $error = error_get_last();
786: if ($error !== null) {
787: // @codeCoverageIgnoreStart
788: $this->throw('Error closing process: %s', $error);
789: // @codeCoverageIgnoreEnd
790: }
791: }
792:
793: $this->Process = null;
794: $this->State = self::TERMINATED;
795: $this->ExitStatus = $this->ProcessStatus['exitcode'];
796:
797: if (
798: $this->ExitStatus === -1
799: && $this->ProcessStatus['signaled']
800: && $this->ProcessStatus['termsig'] > 0
801: ) {
802: $this->ExitStatus = 128 + $this->ProcessStatus['termsig'];
803: }
804: }
805:
806: return $this;
807: }
808:
809: private function read(bool $wait = true, bool $close = false): void
810: {
811: if (!$this->Pipes) {
812: return;
813: }
814:
815: $now = hrtime(true);
816: $read = $this->Pipes;
817:
818: if ($this->UseOutputFiles) {
819: if ($wait) {
820: $interval = $this->Usec === 0
821: ? self::READ_INTERVAL
822: : $this->Usec;
823: $this->awaitInterval($this->LastReadTime, $interval);
824: }
825: } else {
826: $write = null;
827: $except = null;
828: $sec = $wait ? $this->Sec : 0;
829: $usec = $wait ? $this->Usec : 0;
830:
831: $this->throwOnFailure(
832: @stream_select($read, $write, $except, $sec, $usec),
833: 'Error checking for process output: %s',
834: );
835: }
836:
837: foreach ($read as $i => $pipe) {
838: $data = $this->throwOnFailure(
839: @stream_get_contents($pipe),
840: 'Error reading process output: %s',
841: );
842:
843: if ($data !== '') {
844: if ($this->CollectOutput && !$this->UseOutputFiles) {
845: File::write($this->Output[$i], $data);
846: }
847: if ($this->OutputCallback) {
848: ($this->OutputCallback)($i, $data);
849: }
850: }
851:
852: error_clear_last();
853: if ((!$this->UseOutputFiles || $close) && @feof($pipe)) {
854: $error = error_get_last();
855: if ($error !== null) {
856: // @codeCoverageIgnoreStart
857: $this->throw('Error reading process output: %s', $error);
858: // @codeCoverageIgnoreEnd
859: }
860: File::close($pipe);
861: unset($this->Pipes[$i]);
862: }
863: }
864:
865: $this->LastReadTime = $now;
866: $this->Stats['read_time'] = $now / 1000;
867: $this->Stats['read_count']++;
868: }
869:
870: /**
871: * @return $this
872: * @throws ProcessTimedOutException if the process timed out.
873: */
874: private function checkTimeout()
875: {
876: if (
877: $this->State !== self::RUNNING
878: || $this->Timeout === null
879: || $this->Timeout > (hrtime(true) - $this->StartTime) / 1000000000
880: ) {
881: return $this;
882: }
883:
884: try {
885: $this->stop();
886: // @codeCoverageIgnoreStart
887: } catch (Throwable $ex) {
888: throw new ProcessException(sprintf(
889: 'Error terminating process that timed out after %.3fs: %s',
890: $this->Timeout,
891: Get::code($this->Command),
892: ), $ex);
893: // @codeCoverageIgnoreEnd
894: }
895:
896: throw new ProcessTimedOutException(sprintf(
897: 'Process timed out after %.3fs: %s',
898: $this->Timeout,
899: Get::code($this->Command),
900: ));
901: }
902:
903: /**
904: * Wait until at least $interval microseconds have passed since the given
905: * time
906: *
907: * @param int|float|null $time
908: * @return $this
909: */
910: private function awaitInterval($time, int $interval)
911: {
912: if ($time === null) {
913: return $this;
914: }
915: $now = hrtime(true);
916: $usec = (int) ($interval - ($now - $time) / 1000);
917: if ($usec > 0) {
918: usleep($usec);
919: }
920: return $this;
921: }
922:
923: /**
924: * Check if at least $interval microseconds have passed since the given time
925: *
926: * @param int|float|null $time
927: */
928: private function checkInterval($time, int $interval): bool
929: {
930: if ($time === null) {
931: return true;
932: }
933: $now = hrtime(true);
934: return (int) ($interval - ($now - $time) / 1000) <= 0;
935: }
936:
937: private function doStop(int $signal): void
938: {
939: if ($this->Process === null) {
940: return;
941: }
942:
943: $now = hrtime(true);
944:
945: $this->throwOnFailure(
946: @proc_terminate($this->Process, $signal),
947: 'Error terminating process: %s',
948: );
949:
950: $this->LastStopTime = $now;
951: $this->Stats['stop_count']++;
952: if (!$this->Stopped) {
953: $this->Stats['stop_time'] = $now / 1000;
954: $this->Stopped = true;
955: }
956: }
957:
958: private function waitForStop(float $until): bool
959: {
960: do {
961: usleep(self::POLL_INTERVAL);
962: if (!$this->isRunning()) {
963: return true;
964: }
965: } while (hrtime(true) < $until);
966:
967: return false;
968: }
969:
970: /**
971: * @param array<FileDescriptor::*,resource> $streams
972: */
973: private function closeStreams(array &$streams): void
974: {
975: foreach ($streams as $stream) {
976: if (is_resource($stream)) {
977: File::close($stream);
978: }
979: }
980: $streams = [];
981: }
982:
983: private function assertIsNotRunning(): void
984: {
985: if ($this->State === self::RUNNING) {
986: throw new LogicException('Process is running');
987: }
988: }
989:
990: private function assertHasRun(): void
991: {
992: if ($this->State === self::READY) {
993: throw new LogicException('Process has not run');
994: }
995: }
996:
997: private function assertHasTerminated(bool $runtime = false): void
998: {
999: if ($this->State !== self::TERMINATED) {
1000: $exception = $runtime
1001: ? RuntimeException::class
1002: : LogicException::class;
1003: throw new $exception('Process has not terminated');
1004: }
1005: }
1006:
1007: /**
1008: * @param resource|string|null $input
1009: */
1010: private function applyInput($input): void
1011: {
1012: $this->Input = $input === null || is_string($input)
1013: ? Str::toStream((string) $input)
1014: : File::getSeekableStream($input);
1015: $this->RewindInput = true;
1016: }
1017:
1018: private function applyTimeout(?float $timeout): void
1019: {
1020: if ($timeout !== null && $timeout <= 0) {
1021: throw new InvalidArgumentException(
1022: sprintf('Invalid timeout: %.3fs', $timeout)
1023: );
1024: }
1025:
1026: $this->Timeout = $timeout;
1027: [$this->Sec, $this->Usec] = $timeout === null
1028: ? [null, 0]
1029: : [0, min((int) ($timeout * 1000000), self::READ_INTERVAL)];
1030: }
1031:
1032: /**
1033: * @template T
1034: *
1035: * @param T $result
1036: * @return (T is false ? never : T)
1037: * @phpstan-param T|false $result
1038: * @phpstan-return ($result is false ? never : T)
1039: */
1040: private function throwOnFailure($result, string $message)
1041: {
1042: if ($result === false) {
1043: $this->throw($message);
1044: }
1045: return $result;
1046: }
1047:
1048: /**
1049: * @param array{message:string,...}|null $error
1050: * @return never
1051: */
1052: private function throw(string $message, ?array $error = null): void
1053: {
1054: $error ??= error_get_last();
1055: if ($error !== null) {
1056: throw new ProcessException($error['message']);
1057: }
1058:
1059: // @codeCoverageIgnoreStart
1060: throw new ProcessException(
1061: sprintf($message, Get::code($this->Command))
1062: );
1063: // @codeCoverageIgnoreEnd
1064: }
1065: }
1066: