1: <?php declare(strict_types=1);
2:
3: namespace Salient\Core;
4:
5: use Salient\Contract\HasFileDescriptor;
6: use Salient\Core\Exception\ProcessDidNotTerminateException;
7: use Salient\Core\Exception\ProcessException;
8: use Salient\Core\Exception\ProcessFailedException;
9: use Salient\Core\Exception\ProcessTerminatedBySignalException;
10: use Salient\Core\Exception\ProcessTimedOutException;
11: use Salient\Utility\Arr;
12: use Salient\Utility\File;
13: use Salient\Utility\Str;
14: use Salient\Utility\Sys;
15: use Closure;
16: use InvalidArgumentException;
17: use LogicException;
18: use Throwable;
19:
20: /**
21: * @api
22: */
23: final class Process implements HasFileDescriptor
24: {
25: private const READY = 0;
26: private const RUNNING = 1;
27: private const TERMINATED = 2;
28:
29: /**
30: * Microseconds to wait between process status checks
31: */
32: private const POLL_INTERVAL = 10000;
33:
34: /**
35: * Microseconds to wait for stream activity (upper limit)
36: */
37: private const READ_INTERVAL = 200000;
38:
39: private const DEFAULT_OPTIONS = [
40: 'suppress_errors' => true,
41: 'bypass_shell' => true,
42: ];
43:
44: /**
45: * @var array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int}
46: */
47: private const DEFAULT_STATS = [
48: 'start_time' => 0.0,
49: 'spawn_interval' => 0.0,
50: 'poll_time' => 0.0,
51: 'poll_count' => 0,
52: 'read_time' => 0.0,
53: 'read_count' => 0,
54: 'stop_time' => 0.0,
55: 'stop_count' => 0,
56: ];
57:
58: private const SIGTERM = 15;
59: private const SIGKILL = 9;
60:
61: /** @var list<string>|string */
62: private $Command;
63: /** @var resource */
64: private $Input;
65: private bool $RewindOnStart;
66: /** @var (Closure(self::STDOUT|self::STDERR, string): mixed)|null */
67: private ?Closure $Callback;
68: private ?string $Cwd;
69: /** @var array<string,string>|null */
70: private ?array $Env;
71: private ?float $Timeout;
72: private ?int $Sec;
73: private int $Usec;
74: private bool $CollectOutput;
75: /** @readonly */
76: private bool $UseOutputFiles;
77: /** @var array<string,bool>|null */
78: private ?array $Options = null;
79:
80: // --
81:
82: private int $State = self::READY;
83: /** @var (Closure(self::STDOUT|self::STDERR, string): mixed)|null */
84: private ?Closure $CurrentCallback = null;
85: private ?string $OutputDir = null;
86: /** @var array<self::STDOUT|self::STDERR,resource> */
87: private array $OutputFiles;
88: /** @var array<self::STDOUT|self::STDERR,int<0,max>> */
89: private array $OutputFilePos;
90: /** @var int|float|null */
91: private $StartTime = null;
92: /** @var resource|null */
93: private $Process = null;
94: private bool $Stopped = false;
95: /** @var array<self::STDOUT|self::STDERR,resource> */
96: private array $Pipes;
97: /** @var array{command:string,pid:int,running:bool,signaled:bool,stopped:bool,exitcode:int,termsig:int,stopsig:int} */
98: private array $ProcessStatus;
99: private int $Pid;
100: private int $ExitStatus;
101: /** @var int|float|null */
102: private $LastPollTime = null;
103: /** @var int|float|null */
104: private $LastReadTime = null;
105: /** @var array<self::STDOUT|self::STDERR,resource> */
106: private array $Output = [];
107: /** @var array<self::STDOUT|self::STDERR,int<0,max>> */
108: private array $OutputPos = [];
109: /** @var array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int} */
110: private array $Stats = self::DEFAULT_STATS;
111:
112: /**
113: * @api
114: *
115: * @param list<string> $command
116: * @param resource|string|null $input Copied to a seekable stream if not
117: * already seekable, then rewound before each run.
118: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
119: * @param array<string,string>|null $env
120: */
121: public function __construct(
122: array $command,
123: $input = null,
124: ?Closure $callback = null,
125: ?string $cwd = null,
126: ?array $env = null,
127: ?float $timeout = null,
128: bool $collectOutput = true,
129: bool $useOutputFiles = false
130: ) {
131: $this->Command = $command;
132: $this->Callback = $callback;
133: $this->Cwd = $cwd;
134: $this->Env = $env;
135: $this->CollectOutput = $collectOutput;
136: $this->UseOutputFiles = $useOutputFiles || Sys::isWindows();
137: $this->applyInput($input);
138: $this->applyTimeout($timeout);
139: }
140:
141: /**
142: * Get a new process for a shell command
143: *
144: * @param resource|string|null $input Copied to a seekable stream if not
145: * already seekable, then rewound before each run.
146: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
147: * @param array<string,string>|null $env
148: */
149: public static function withShellCommand(
150: string $command,
151: $input = null,
152: ?Closure $callback = null,
153: ?string $cwd = null,
154: ?array $env = null,
155: ?float $timeout = null,
156: bool $collectOutput = true,
157: bool $useOutputFiles = false
158: ): self {
159: $process = new self([], $input, $callback, $cwd, $env, $timeout, $collectOutput, $useOutputFiles);
160: $process->Command = $command;
161: $process->Options = Arr::unset(self::DEFAULT_OPTIONS, 'bypass_shell');
162: return $process;
163: }
164:
165: /**
166: * @internal
167: */
168: public function __destruct()
169: {
170: if ($this->updateStatus()->isRunning()) {
171: $this->stop();
172: }
173: if ($this->UseOutputFiles) {
174: if ($this->OutputFiles ?? null) {
175: $this->closeStreams($this->OutputFiles);
176: }
177: if ($this->OutputDir !== null && is_dir($this->OutputDir)) {
178: File::pruneDir($this->OutputDir, true);
179: }
180: }
181: }
182:
183: /**
184: * Set the input passed to the process
185: *
186: * @param resource|string|null $input Copied to a seekable stream if not
187: * already seekable, then rewound before each run.
188: * @return $this
189: * @throws LogicException if the process is running.
190: */
191: public function setInput($input): self
192: {
193: $this->assertIsNotRunning();
194: $this->applyInput($input);
195: return $this;
196: }
197:
198: /**
199: * @param resource|string|null $input
200: */
201: private function applyInput($input): void
202: {
203: $this->Input = $input === null || is_string($input)
204: ? Str::toStream((string) $input)
205: : File::getSeekableStream($input);
206: $this->RewindOnStart = true;
207: }
208:
209: /**
210: * Pass input directly to the process
211: *
212: * @param resource $input
213: * @return $this
214: * @throws LogicException if the process is running.
215: */
216: public function pipeInput($input): self
217: {
218: $this->assertIsNotRunning();
219: $this->Input = $input;
220: $this->RewindOnStart = false;
221: return $this;
222: }
223:
224: /**
225: * Set the callback that receives output from the process
226: *
227: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
228: * @return $this
229: * @throws LogicException if the process is running.
230: */
231: public function setCallback(?Closure $callback): self
232: {
233: $this->assertIsNotRunning();
234: $this->Callback = $callback;
235: return $this;
236: }
237:
238: /**
239: * Set the initial working directory of the process
240: *
241: * @return $this
242: * @throws LogicException if the process is running.
243: */
244: public function setCwd(?string $cwd): self
245: {
246: $this->assertIsNotRunning();
247: $this->Cwd = $cwd;
248: return $this;
249: }
250:
251: /**
252: * Set the environment of the process
253: *
254: * @param array<string,string>|null $env
255: * @return $this
256: * @throws LogicException if the process is running.
257: */
258: public function setEnv(?array $env): self
259: {
260: $this->assertIsNotRunning();
261: $this->Env = $env;
262: return $this;
263: }
264:
265: /**
266: * Set the maximum number of seconds to allow the process to run
267: *
268: * @return $this
269: * @throws LogicException if the process is running.
270: */
271: public function setTimeout(?float $timeout): self
272: {
273: $this->assertIsNotRunning();
274: $this->applyTimeout($timeout);
275: return $this;
276: }
277:
278: private function applyTimeout(?float $timeout): void
279: {
280: if ($timeout !== null && $timeout <= 0) {
281: throw new InvalidArgumentException(sprintf(
282: 'Invalid timeout: %.3fs',
283: $timeout,
284: ));
285: }
286:
287: $this->Timeout = $timeout;
288: [$this->Sec, $this->Usec] = $timeout === null
289: ? [null, 0]
290: : [0, min((int) ($timeout * 1000000), self::READ_INTERVAL)];
291: }
292:
293: /**
294: * Disable collection of output written to STDOUT and STDERR by the process
295: *
296: * @return $this
297: * @throws LogicException if the process is running.
298: */
299: public function disableOutputCollection(): self
300: {
301: $this->assertIsNotRunning();
302: $this->CollectOutput = false;
303: return $this;
304: }
305:
306: /**
307: * Enable collection of output written to STDOUT and STDERR by the process
308: *
309: * @return $this
310: * @throws LogicException if the process is running.
311: */
312: public function enableOutputCollection(): self
313: {
314: $this->assertIsNotRunning();
315: $this->CollectOutput = true;
316: return $this;
317: }
318:
319: /**
320: * Run the process and throw an exception if its exit status is non-zero
321: *
322: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
323: * @return $this
324: * @throws LogicException if the process is running.
325: * @throws ProcessTimedOutException if the process times out.
326: * @throws ProcessTerminatedBySignalException if the process is terminated
327: * by an uncaught signal.
328: * @throws ProcessFailedException if the process returns a non-zero exit
329: * status.
330: */
331: public function runWithoutFail(?Closure $callback = null): self
332: {
333: if ($this->run($callback) !== 0) {
334: throw new ProcessFailedException(
335: 'Process failed with exit status %d: %s',
336: [$this->ExitStatus, $this],
337: );
338: }
339: return $this;
340: }
341:
342: /**
343: * Run the process and return its exit status
344: *
345: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
346: * @throws LogicException if the process is running.
347: * @throws ProcessTimedOutException if the process times out.
348: * @throws ProcessTerminatedBySignalException if the process is terminated
349: * by an uncaught signal.
350: */
351: public function run(?Closure $callback = null): int
352: {
353: return $this->start($callback)->wait();
354: }
355:
356: /**
357: * Start the process in the background
358: *
359: * @param (Closure(Process::STDOUT|Process::STDERR $fd, string $output): mixed)|null $callback
360: * @return $this
361: * @throws LogicException if the process is running.
362: * @throws ProcessTerminatedBySignalException if the process is terminated
363: * by an uncaught signal.
364: */
365: public function start(?Closure $callback = null): self
366: {
367: $this->assertIsNotRunning();
368:
369: $this->reset();
370: $this->CurrentCallback = $callback ?? $this->Callback;
371:
372: if ($this->RewindOnStart) {
373: File::rewind($this->Input);
374: }
375: $descriptors = [self::STDIN => $this->Input];
376: $handles = [];
377:
378: if ($this->UseOutputFiles) {
379: // Use files in a temporary directory to collect output (necessary
380: // on Windows, where `proc_open()` blocks until the process exits if
381: // standard output pipes are used, and useful in scenarios where
382: // polling for output would be inefficient)
383: $this->OutputDir ??= File::createTempDir();
384: foreach ([self::STDOUT, self::STDERR] as $fd) {
385: $file = $this->OutputDir . '/' . $fd;
386: $descriptors[$fd] = ['file', $file, 'w'];
387: $stream = $this->OutputFiles[$fd] ?? null;
388: // Create output files before the first run, and truncate them
389: // before subsequent runs
390: if (!$stream) {
391: $stream = File::open($file, 'w+');
392: $this->OutputFiles[$fd] = $stream;
393: } else {
394: File::truncate($stream, 0, $file);
395: }
396: if ($this->CollectOutput) {
397: $this->Output[$fd] = $stream;
398: $this->OutputPos[$fd] = 0;
399: $this->OutputFilePos[$fd] = 0;
400: }
401: // Tail output files separately
402: $handles[$fd] = File::open($file, 'r');
403: }
404: } else {
405: $descriptors += [
406: self::STDOUT => ['pipe', 'w'],
407: self::STDERR => ['pipe', 'w'],
408: ];
409: if ($this->CollectOutput) {
410: $this->Output = [
411: self::STDOUT => File::open('php://temp', 'a+'),
412: self::STDERR => File::open('php://temp', 'a+'),
413: ];
414: $this->OutputPos = [
415: self::STDOUT => 0,
416: self::STDERR => 0,
417: ];
418: }
419: }
420:
421: $this->StartTime = hrtime(true);
422: $this->Process = $this->check(
423: @proc_open(
424: $this->Command,
425: $descriptors,
426: $pipes,
427: $this->Cwd,
428: $this->Env,
429: $this->Options ?? self::DEFAULT_OPTIONS,
430: ),
431: 'Error starting process: %s',
432: );
433:
434: $now = hrtime(true);
435: $this->Stats['start_time'] = $this->StartTime / 1000;
436: $this->Stats['spawn_interval'] = ($now - $this->StartTime) / 1000;
437:
438: $pipes += $handles;
439: foreach ($pipes as $pipe) {
440: @stream_set_blocking($pipe, false);
441: }
442: $this->Pipes = $pipes;
443: $this->State = self::RUNNING;
444: $this->updateStatus();
445: $this->Pid = $this->ProcessStatus['pid'];
446: return $this;
447: }
448:
449: /**
450: * Wait for the process to exit and return its exit status
451: *
452: * @throws LogicException if the process has not run.
453: * @throws ProcessTimedOutException if the process times out.
454: * @throws ProcessTerminatedBySignalException if the process is terminated
455: * by an uncaught signal.
456: */
457: public function wait(): int
458: {
459: $this->assertHasRun();
460:
461: while ($this->Pipes) {
462: $this->checkTimeout();
463: $this->read();
464: $this->updateStatus(false);
465: }
466:
467: while ($this->isRunning()) {
468: $this->checkTimeout();
469: usleep(self::POLL_INTERVAL);
470: }
471:
472: return $this->ExitStatus;
473: }
474:
475: /**
476: * Check for output written by the process and update its status
477: *
478: * If fewer than {@see Process::POLL_INTERVAL} microseconds have passed
479: * since the process was last polled, a delay is inserted to minimise CPU
480: * usage.
481: *
482: * @return $this
483: * @throws LogicException if the process has not run.
484: * @throws ProcessTimedOutException if the process times out.
485: * @throws ProcessTerminatedBySignalException if the process is terminated
486: * by an uncaught signal.
487: */
488: public function poll(bool $now = false): self
489: {
490: $this->assertHasRun();
491:
492: $this->checkTimeout();
493: if (!$now) {
494: $this->awaitInterval($this->LastPollTime, self::POLL_INTERVAL);
495: }
496: $this->updateStatus();
497:
498: return $this;
499: }
500:
501: /**
502: * Terminate the process if it is still running
503: *
504: * @return $this
505: * @throws LogicException if the process has not run.
506: * @throws ProcessTerminatedBySignalException if the process is terminated
507: * by an uncaught signal.
508: * @throws ProcessDidNotTerminateException if the process does not
509: * terminate.
510: */
511: public function stop(float $timeout = 10): self
512: {
513: $this->assertHasRun();
514:
515: // Work around issue where processes don't receive signals immediately
516: // after launch
517: $this->awaitInterval($this->StartTime, self::POLL_INTERVAL);
518:
519: if (!$this->updateStatus()->isRunning()) {
520: return $this;
521: }
522:
523: try {
524: // Send SIGTERM first, then SIGKILL if the process is still running
525: // after `$timeout` seconds
526: if (
527: $this->doStop(self::SIGTERM, $timeout)
528: || $this->doStop(self::SIGKILL, 1)
529: ) {
530: return $this;
531: }
532: } catch (ProcessException $ex) {
533: // Ignore the exception if the process is no longer running
534: // @phpstan-ignore booleanNot.alwaysFalse
535: if (!$this->updateStatus()->isRunning()) {
536: return $this;
537: }
538: throw $ex;
539: }
540:
541: throw new ProcessDidNotTerminateException(
542: 'Process did not terminate: %s',
543: [$this],
544: );
545: }
546:
547: private function doStop(int $signal, float $timeout): bool
548: {
549: if (!$this->Process) {
550: return true;
551: }
552:
553: $now = hrtime(true);
554: $this->check(
555: @proc_terminate($this->Process, $signal),
556: 'Error terminating process: %s',
557: );
558: $this->Stats['stop_count']++;
559: if (!$this->Stopped) {
560: $this->Stats['stop_time'] = $now / 1000;
561: $this->Stopped = true;
562: }
563:
564: $until = $now + $timeout * 1000000000;
565: do {
566: usleep(self::POLL_INTERVAL);
567: if (!$this->isRunning()) {
568: return true;
569: }
570: } while (hrtime(true) < $until);
571:
572: return false;
573: }
574:
575: /**
576: * Check if the process is running
577: */
578: public function isRunning(): bool
579: {
580: return $this->State === self::RUNNING
581: && $this->maybeUpdateStatus()->State === self::RUNNING;
582: }
583:
584: /**
585: * Check if the process ran and terminated
586: */
587: public function isTerminated(): bool
588: {
589: return $this->State === self::TERMINATED
590: || $this->maybeUpdateStatus()->State === self::TERMINATED;
591: }
592:
593: /**
594: * Get the command spawned by the process
595: *
596: * @return list<string>|string
597: */
598: public function getCommand()
599: {
600: return $this->Command;
601: }
602:
603: /**
604: * Get the process ID of the command spawned by the process
605: *
606: * @throws LogicException if the process has not run.
607: */
608: public function getPid(): int
609: {
610: $this->assertHasRun();
611: return $this->Pid;
612: }
613:
614: /**
615: * Get output written to STDOUT or STDERR by the process
616: *
617: * @param Process::STDOUT|Process::STDERR $fd
618: * @throws LogicException if the process has not run or if output collection
619: * is disabled.
620: */
621: public function getOutput(int $fd = Process::STDOUT): string
622: {
623: return $this->doGetOutput($fd, false, false);
624: }
625:
626: /**
627: * Get output written to STDOUT or STDERR by the process since it was last
628: * read
629: *
630: * @param Process::STDOUT|Process::STDERR $fd
631: * @throws LogicException if the process has not run or if output collection
632: * is disabled.
633: */
634: public function getNewOutput(int $fd = Process::STDOUT): string
635: {
636: return $this->doGetOutput($fd, false, true);
637: }
638:
639: /**
640: * Get text written to STDOUT or STDERR by the process
641: *
642: * @param Process::STDOUT|Process::STDERR $fd
643: * @throws LogicException if the process has not run or if output collection
644: * is disabled.
645: */
646: public function getOutputAsText(int $fd = Process::STDOUT): string
647: {
648: return $this->doGetOutput($fd, true, false);
649: }
650:
651: /**
652: * Get text written to STDOUT or STDERR by the process since it was last
653: * read
654: *
655: * @param Process::STDOUT|Process::STDERR $fd
656: * @throws LogicException if the process has not run or if output collection
657: * is disabled.
658: */
659: public function getNewOutputAsText(int $fd = Process::STDOUT): string
660: {
661: return $this->doGetOutput($fd, true, true);
662: }
663:
664: /**
665: * @param self::STDOUT|self::STDERR $fd
666: */
667: private function doGetOutput(int $fd, bool $text, bool $new): string
668: {
669: $this->assertHasRun();
670:
671: if (!$this->Output) {
672: throw new LogicException('Output collection disabled');
673: }
674:
675: $stream = $this->updateStatus()->Output[$fd];
676: $offset = $new
677: ? $this->OutputPos[$fd]
678: : ($this->UseOutputFiles
679: ? $this->OutputFilePos[$fd]
680: : 0);
681: $output = File::getContents($stream, $offset);
682: /** @var int<0,max> */
683: $pos = File::tell($stream);
684: $this->OutputPos[$fd] = $pos;
685: return $text
686: ? Str::trimNativeEol($output)
687: : $output;
688: }
689:
690: /**
691: * Forget output written to STDOUT and STDERR by the process
692: *
693: * @return $this
694: */
695: public function clearOutput(): self
696: {
697: if (!$this->Output) {
698: return $this;
699: }
700:
701: foreach ([self::STDOUT, self::STDERR] as $fd) {
702: $stream = $this->Output[$fd];
703: if ($this->UseOutputFiles) {
704: /** @var int<0,max> */
705: $pos = File::tell($stream);
706: $this->OutputFilePos[$fd] = $pos;
707: } else {
708: File::truncate($stream);
709: }
710: $this->OutputPos[$fd] = 0;
711: }
712:
713: return $this;
714: }
715:
716: /**
717: * Get the exit status of the process
718: *
719: * @throws ProcessException if the process has not terminated.
720: */
721: public function getExitStatus(): int
722: {
723: $this->assertHasTerminated();
724: return $this->ExitStatus;
725: }
726:
727: /**
728: * Get process statistics
729: *
730: * @return array{start_time:float,spawn_interval:float,poll_time:float,poll_count:int,read_time:float,read_count:int,stop_time:float,stop_count:int}
731: * @throws LogicException if the process has not run.
732: */
733: public function getStats(): array
734: {
735: $this->assertHasRun();
736: return $this->Stats;
737: }
738:
739: /**
740: * @return $this
741: */
742: private function checkTimeout(): self
743: {
744: if (
745: $this->State !== self::RUNNING
746: || $this->Timeout === null
747: || $this->Timeout > (hrtime(true) - $this->StartTime) / 1000000000
748: ) {
749: return $this;
750: }
751:
752: try {
753: $this->stop();
754: } catch (Throwable $ex) {
755: throw new ProcessException(
756: 'Error terminating process that timed out after %.3fs: %s',
757: [$this->Timeout, $this],
758: $ex,
759: );
760: }
761:
762: throw new ProcessTimedOutException(
763: 'Process timed out after %.3fs: %s',
764: [$this->Timeout, $this],
765: );
766: }
767:
768: /**
769: * @return $this
770: */
771: private function maybeUpdateStatus(): self
772: {
773: return $this->checkInterval($this->LastPollTime, self::POLL_INTERVAL)
774: ? $this->updateStatus()
775: : $this;
776: }
777:
778: /**
779: * @return $this
780: */
781: private function updateStatus(bool $read = true): self
782: {
783: if (!$this->Process) {
784: return $this;
785: }
786:
787: $now = hrtime(true);
788: $process = $this->Process;
789: $this->ProcessStatus = $this->check(
790: @proc_get_status($process),
791: 'Error getting process status: %s',
792: );
793: $this->LastPollTime = $now;
794: $this->Stats['poll_time'] = $now / 1000;
795: $this->Stats['poll_count']++;
796:
797: $running = $this->ProcessStatus['running'];
798: if ($read || !$running) {
799: $this->read(false, !$running);
800: }
801: if (!$running) {
802: // Close any pipes left open by `$this->read()`
803: if ($this->Pipes) {
804: // @codeCoverageIgnoreStart
805: $this->closeStreams($this->Pipes);
806: // @codeCoverageIgnoreEnd
807: }
808:
809: // The return value of `proc_close()` is not reliable, so ignore it
810: // and use `error_get_last()` to check for errors
811: error_clear_last();
812: @proc_close($process);
813: if ($error = error_get_last()) {
814: // @codeCoverageIgnoreStart
815: $this->throw('Error closing process: %s', $error);
816: // @codeCoverageIgnoreEnd
817: }
818:
819: $this->ExitStatus = $this->ProcessStatus['exitcode'];
820: $this->State = self::TERMINATED;
821: $this->Process = null;
822: if (
823: $this->ExitStatus === -1
824: && $this->ProcessStatus['signaled']
825: && ($signal = $this->ProcessStatus['termsig']) > 0
826: ) {
827: $this->ExitStatus = 128 + $signal;
828: if (!$this->Stopped || ([
829: self::SIGTERM => false,
830: self::SIGKILL => false,
831: ][$signal] ?? true)) {
832: throw new ProcessTerminatedBySignalException(
833: 'Process terminated by signal %d: %s',
834: [$signal, $this],
835: );
836: }
837: }
838: }
839:
840: return $this;
841: }
842:
843: private function read(bool $wait = true, bool $closeAtEof = false): void
844: {
845: if (!$this->Pipes) {
846: return;
847: }
848:
849: $now = hrtime(true);
850: $read = $this->Pipes;
851: if ($this->UseOutputFiles) {
852: if ($wait) {
853: $usec = $this->Usec === 0
854: ? self::READ_INTERVAL
855: : $this->Usec;
856: $this->awaitInterval($this->LastReadTime, $usec);
857: }
858: } else {
859: $write = null;
860: $except = null;
861: $sec = $wait ? $this->Sec : 0;
862: $usec = $wait ? $this->Usec : 0;
863: File::select($read, $write, $except, $sec, $usec);
864: }
865: foreach ($read as $i => $pipe) {
866: $data = File::getContents($pipe);
867: if ($data !== '') {
868: if ($this->CollectOutput && !$this->UseOutputFiles) {
869: File::writeAll($this->Output[$i], $data);
870: }
871: if ($this->CurrentCallback) {
872: ($this->CurrentCallback)($i, $data);
873: }
874: }
875: if ((!$this->UseOutputFiles || $closeAtEof) && File::eof($pipe)) {
876: File::close($pipe);
877: unset($this->Pipes[$i]);
878: }
879: }
880:
881: $this->LastReadTime = $now;
882: $this->Stats['read_time'] = $now / 1000;
883: $this->Stats['read_count']++;
884: }
885:
886: /**
887: * @param resource[] $streams
888: * @param-out array{} $streams
889: */
890: private function closeStreams(array &$streams): void
891: {
892: foreach ($streams as $stream) {
893: File::close($stream);
894: }
895: $streams = [];
896: }
897:
898: private function reset(): void
899: {
900: $this->CurrentCallback = null;
901: unset($this->OutputFilePos);
902: $this->StartTime = null;
903: $this->Process = null;
904: $this->Stopped = false;
905: unset($this->Pipes);
906: unset($this->ProcessStatus);
907: unset($this->Pid);
908: unset($this->ExitStatus);
909: $this->LastPollTime = null;
910: $this->LastReadTime = null;
911: $this->Output = [];
912: $this->OutputPos = [];
913: $this->Stats = self::DEFAULT_STATS;
914: }
915:
916: /**
917: * Wait until at least $interval microseconds have passed since a given time
918: *
919: * @param int|float|null $time
920: * @return $this
921: */
922: private function awaitInterval($time, int $interval): self
923: {
924: if ($time === null) {
925: return $this;
926: }
927: $now = hrtime(true);
928: $usec = (int) ($interval - ($now - $time) / 1000);
929: if ($usec > 0) {
930: usleep($usec);
931: }
932: return $this;
933: }
934:
935: /**
936: * Check if at least $interval microseconds have passed since a given time
937: *
938: * @param int|float|null $time
939: */
940: private function checkInterval($time, int $interval): bool
941: {
942: if ($time === null) {
943: return true;
944: }
945: $now = hrtime(true);
946: return (int) ($interval - ($now - $time) / 1000) <= 0;
947: }
948:
949: /**
950: * @template T
951: *
952: * @param T|false $result
953: * @return ($result is false ? never : T)
954: */
955: private function check($result, string $message)
956: {
957: if ($result === false) {
958: $this->throw($message);
959: }
960: return $result;
961: }
962:
963: /**
964: * @param array{message:string,...}|null $error
965: * @return never
966: */
967: private function throw(string $message, ?array $error = null): void
968: {
969: $error ??= error_get_last();
970: if ($error) {
971: throw new ProcessException($error['message']);
972: }
973: // @codeCoverageIgnoreStart
974: throw new ProcessException(
975: $message,
976: [$this],
977: );
978: // @codeCoverageIgnoreEnd
979: }
980:
981: private function assertIsNotRunning(): void
982: {
983: if ($this->State === self::RUNNING) {
984: throw new LogicException('Process is running');
985: }
986: }
987:
988: private function assertHasRun(): void
989: {
990: if ($this->State === self::READY) {
991: throw new LogicException('Process has not run');
992: }
993: }
994:
995: private function assertHasTerminated(): void
996: {
997: if ($this->State !== self::TERMINATED) {
998: throw new LogicException('Process has not terminated');
999: }
1000: }
1001: }
1002: