Source code for autils.devel.process

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2013-2014
# Author: Lucas Meneghel Rodrigues <lmr@redhat.com>

# pylint: disable=C0302

"""Functions dedicated to find and run external commands."""

import contextlib
import errno
import glob
import logging
import os
import re
import select
import shlex
import signal
import subprocess
import threading
import time
from io import BytesIO, UnsupportedOperation

from autils.devel import astring
from autils.devel.wait import wait_for
from autils.file import path

LOG = logging.getLogger(__name__)

# variable=value bash assignment. Shell variable names may start with a
# letter or an underscore (e.g. "_JAVA_OPTIONS=-Xmx1g"), so both are accepted
# as the first character.
_RE_BASH_SET_VARIABLE = re.compile(r"[a-zA-Z_]\w*=.*")


class CmdError(Exception):
    """Exception raised when a command fails execution.

    :param command: the command line that failed
    :type command: str or None
    :param result: the result of the failed execution; any object exposing
                   ``stdout`` and ``stderr`` attributes, or None when no
                   result is available
    :param additional_text: free-form extra information about the failure
    :type additional_text: str or None
    """

    def __init__(
        self, command=None, result=None, additional_text=None
    ):  # pylint: disable=W0231
        self.command = command
        self.result = result
        self.additional_text = additional_text

    def __str__(self):
        # ``result`` defaults to None, so the streams are looked up
        # defensively: formatting the exception must never raise itself.
        stdout = getattr(self.result, "stdout", None)
        stderr = getattr(self.result, "stderr", None)
        return (
            f"Command '{self.command}' failed.\nstdout: "
            f"{stdout!r}\nstderr: "
            f"{stderr!r}\nadditional_info: "
            f"{self.additional_text}"
        )
class CmdInputError(Exception):
    """Signal that the command given is not valid (for instance, empty)."""
def can_sudo(cmd=None):
    """Tell whether privileged commands can be run (as root or via sudo).

    The checks are done in order: running as UID 0 is enough; otherwise
    the ``sudo`` binary must be found and must actually work.

    :param cmd: Optional command to try under sudo. When given, the answer
                is whether that command exits with status 0 under sudo.
                When omitted, ``id -u`` is run under sudo and must print
                ``0``.
    :type cmd: str or None
    :return: True if running as root or sudo is usable, False otherwise.
    :rtype: bool

    Example::

        >>> can_sudo()
        True
        >>> can_sudo("ls /root")
        True
    """
    if os.getuid() == 0:
        # Already root, sudo is not needed
        return True

    try:
        path.find_command("sudo")
    except path.CmdNotFoundError:
        # No sudo binary available
        return False

    try:
        if cmd:
            exit_status = system(cmd, ignore_status=True, sudo=True)
            return not exit_status
        # NOTE(review): getoutput() is not defined in the visible part of
        # this module -- confirm it is in scope.
        reported_uid = getoutput("id -u", sudo=True).strip()
        return reported_uid == "0"
    except OSError:
        # The sudo binary exists but is broken
        return False
def get_capabilities(pid=None):
    """Return the capabilities of a process as reported by ``getpcaps``.

    An empty list is returned when ``getpcaps`` can not be found or exits
    with a non-zero status.  Two output formats are understood: the one
    written to stderr starting with ``Capabilities `` (split on ``=``) and
    the one written to stdout (split on ``:``).

    :param pid: the process ID (PID); when not given, the current PID
                (from :func:`os.getpid`) is used
    :type pid: int
    :returns: all capabilities
    :rtype: list
    """
    target_pid = os.getpid() if pid is None else pid
    try:
        result = run(f"getpcaps {int(target_pid)}", ignore_status=True)
    except FileNotFoundError:
        return []
    if result.exit_status:
        return []

    stderr_text = result.stderr_text
    if stderr_text.startswith("Capabilities "):
        # legacy format, reported on stderr
        info, separator = stderr_text, "="
    else:
        info, separator = result.stdout_text, ":"

    listing = info.split(separator, 1)[1]
    return listing.strip().split(",")
def has_capability(capability, pid=None):
    """Tell whether a process holds a given Linux capability.

    Thin wrapper around :func:`get_capabilities`: the capability is
    considered available only if it is one of the returned entries, so
    when no capabilities can be obtained the answer is False.

    :param capability: The name of the capability as it appears in the
                       list returned by :func:`get_capabilities`
                       (e.g., "cap_sys_admin", lowercase).
    :type capability: str
    :param pid: The process ID to check. If None, checks the current
                process.
    :type pid: int or None
    :return: True if the capability is available, False otherwise.
    :rtype: bool

    Example::

        >>> has_capability("cap_chown")
        True
        >>> has_capability("cap_sys_admin", pid=1234)
        False
    """
    available = get_capabilities(pid)
    return capability in available
def pid_exists(pid):
    """Tell whether a process with the given PID exists.

    Signal 0 is "sent" with :func:`os.kill`, which performs the existence
    and permission checks without delivering any signal.

    :param pid: The process ID number to check.
    :type pid: int
    :return: False only when the OS reports ``ESRCH`` (no such process);
             True otherwise, including when the check fails for any other
             reason (e.g. lack of permission).
    :rtype: bool

    Example::

        >>> pid_exists(1)
        True
        >>> pid_exists(999999)
        False
    """
    try:
        os.kill(pid, 0)
    except OSError as error:
        # Any error other than "no such process" still means it exists
        return error.errno != errno.ESRCH
    return True
def safe_kill(pid, signal):  # pylint: disable=W0621
    """Try to send a signal to a process that may or may not exist.

    When :func:`get_owner_id` reports a falsy owner for the process
    (presumably UID 0, i.e. root-owned -- confirm against that helper), the
    signal is delivered by running ``kill`` under sudo.  Otherwise
    :func:`os.kill` is used directly.  Failures are reported through the
    return value instead of exceptions.

    :param pid: The process ID to send the signal to.
    :type pid: int
    :param signal: The signal number to send (e.g., signal.SIGTERM).
    :type signal: int
    :return: True if signal was sent successfully, False otherwise.
    :rtype: bool

    Example::

        >>> safe_kill(1234, signal.SIGTERM)
        True
    """
    owner_id = get_owner_id(int(pid))
    if not owner_id:
        try:
            run(f"kill -{int(signal)} {int(pid)}", sudo=True)
        except CmdError:
            return False
        return True

    try:
        os.kill(pid, signal)
    except Exception:  # pylint: disable=W0703
        # Best effort: any failure simply means the signal was not sent
        return False
    return True
def get_parent_pid(pid):
    """Return the parent PID of a process, read from ``/proc/<pid>/stat``.

    .. note:: This is currently Linux specific.

    :param pid: The PID of the child process.
    :type pid: int
    :return: The parent process ID.
    :rtype: int
    :raises IOError: If the /proc entry cannot be read.

    Example::

        >>> get_parent_pid(1234)
        1
    """
    stat_path = f"/proc/{int(pid)}/stat"
    with open(stat_path, "rb") as stat_file:
        fields = stat_file.read().split(b" ")
    # The field is addressed from the end of the record; presumably so
    # that spaces inside the process name do not shift it -- confirm.
    return int(fields[-49])
def _get_pid_from_proc_pid_stat(proc_path):
    """Extract the PID from a ``/proc/<pid>/...`` path.

    :param proc_path: a path such as ``/proc/1234/stat``
    :type proc_path: str
    :return: the numeric PID, or None if the path does not have that form
    :rtype: int or None
    """
    found = re.match(r"\/proc\/([0-9]+)\/.*", proc_path)
    return None if found is None else int(found.group(1))
def get_children_pids(parent_pid, recursive=False):
    """List the PIDs of the children of a given process.

    Every ``/proc/<pid>/stat`` entry is read and the ones whose parent PID
    equals ``parent_pid`` are collected.  Entries that can not be read
    (e.g. the process went away meanwhile) are skipped.

    .. note:: This is currently Linux specific.

    :param parent_pid: The PID of the parent process.
    :type parent_pid: int
    :param recursive: If True, also returns grandchildren and all
                      descendants. If False, only returns direct children.
    :type recursive: bool
    :return: List of child process IDs.
    :rtype: list of int

    Example::

        >>> get_children_pids(1)
        [234, 456, 789]
        >>> get_children_pids(1, recursive=True)
        [234, 456, 789, 1011, 1213]
    """
    children = []
    for stat_path in glob.glob("/proc/[123456789]*/stat"):
        try:
            with open(stat_path, "rb") as stat_file:
                fields = stat_file.read().split(b" ")
                owner_parent = int(fields[-49])
        except IOError:
            continue
        if owner_parent == parent_pid:
            children.append(_get_pid_from_proc_pid_stat(stat_path))

    if recursive:
        # The list grows while it is walked, so the direct children of
        # every newly found descendant are visited as well.
        position = 0
        while position < len(children):
            children.extend(get_children_pids(children[position]))
            position += 1
    return children
def kill_process_tree(pid, sig=None, send_sigcont=True, timeout=0):
    """Signal a process and all of its children.

    The process is first stopped with SIGSTOP; if that can not be
    delivered the function returns right away.  Then each child tree is
    signalled recursively, the process itself is signalled and, optionally,
    SIGCONT is sent to every signalled PID.

    :param pid: The pid of the process to signal.
    :type pid: int
    :param sig: The signal to send to the processes, defaults to
                :data:`signal.SIGKILL`
    :type sig: int or None
    :param send_sigcont: Send SIGCONT to allow killing stopped processes.
    :type send_sigcont: bool
    :param timeout: How long to wait for the pid(s) to die
                    (negative=infinity, 0=don't wait,
                    positive=number_of_seconds).
    :type timeout: int or float
    :return: List of all PIDs we sent signal to.
    :rtype: list
    :raises RuntimeError: If timeout is reached waiting for processes to
                          die.
    """

    def _none_alive(pids):
        return not any(pid_exists(candidate) for candidate in pids)

    signal_to_send = signal.SIGKILL if sig is None else sig
    # The clock starts before any signal is sent so that the time spent
    # signalling counts against the timeout.
    started_at = time.monotonic() if timeout > 0 else None

    if not safe_kill(pid, signal.SIGSTOP):
        return [pid]

    signalled = [pid]
    for child in get_children_pids(pid):
        # Children are handled without SIGCONT here; it is sent below for
        # the complete list when requested.
        signalled += kill_process_tree(int(child), signal_to_send, False)
    safe_kill(pid, signal_to_send)

    if send_sigcont:
        for target in signalled:
            safe_kill(target, signal.SIGCONT)

    if not timeout:
        return signalled

    # Liveness is checked starting from the last signalled PID
    if timeout > 0:
        everyone_died = wait_for(
            _none_alive,
            timeout + started_at - time.monotonic(),
            step=0.01,
            args=(signalled[::-1],),
        )
        if not everyone_died:
            raise RuntimeError(
                f"Timeout reached when waiting for pid {pid} "
                f"and children to die ({timeout})"
            )
    else:
        # Negative timeout: wait for as long as it takes
        while not _none_alive(signalled[::-1]):
            time.sleep(0.01)
    return signalled
def kill_process_by_pattern(pattern):
    """Send SIGTERM signal to processes matching a pattern.

    The ``pkill -f <pattern>`` command is run, so the pattern is matched
    against the full command line of the processes.  The outcome is only
    logged; nothing is returned or raised on failure.

    :param pattern: Pattern to match against process command lines. It may
                    contain spaces or shell metacharacters; it is passed to
                    pkill as one single argument.
    :type pattern: str

    Example::

        >>> kill_process_by_pattern("firefox")
        >>> kill_process_by_pattern("python.*test_script")
    """
    pattern = str(pattern)
    if not pattern.strip():
        # An empty pattern, once quoted, would match *every* process
        LOG.error("Failed to run 'pkill -f': empty pattern given")
        return

    # The command line is later split with shlex rules by run(), so the
    # pattern is quoted to reach pkill as exactly one argument.
    cmd = f"pkill -f {shlex.quote(pattern)}"
    result = run(cmd, ignore_status=True)
    if result.exit_status:
        LOG.error("Failed to run '%s': %s", cmd, result)
    else:
        LOG.info("Succeed to run '%s'.", cmd)
def process_in_ptree_is_defunct(ppid):
    """Verify if any child process of PPID is in the defunct state.

    Each direct child of ``ppid`` (as returned by
    :func:`get_children_pids`) is inspected with ``ps`` and considered
    defunct (zombie) when ``<defunct>`` shows up in its command column.

    This relies on the GNU version of "ps" and is not guaranteed to work in
    MacOS.

    :param ppid: The parent PID of the process to verify.
    :type ppid: int
    :return: True if any inspected process is defunct, False otherwise.
    :rtype: bool
    """
    try:
        pids = get_children_pids(ppid)
    except CmdError:  # Process doesn't exist
        return True

    for pid in pids:
        cmd = f"ps --no-headers -o cmd {int(pid)}"
        proc_name = system_output(cmd, ignore_status=True, verbose=False)
        # system_output() hands back the raw (bytes) stdout, and looking
        # for a str inside bytes raises TypeError; use a marker of the
        # same type as the output.
        marker = b"<defunct>" if isinstance(proc_name, bytes) else "<defunct>"
        if marker in proc_name:
            return True
    return False
def binary_from_shell_cmd(cmd):
    """Return the first binary found in a shell-like command string.

    The command is split with :func:`shlex.split` and the first word that
    does not look like a ``variable=value`` assignment is returned.

    .. note:: This is a naive implementation that only skips leading
              environment variable assignments.

    :param cmd: A shell-like command string to parse.
    :type cmd: str
    :return: The first binary/executable found in the command.
    :rtype: str
    :raises ValueError: If no binary can be extracted from the command.

    Example::

        >>> binary_from_shell_cmd("binary")
        'binary'
        >>> binary_from_shell_cmd("VAR=VAL binary -args")
        'binary'
        >>> binary_from_shell_cmd("FOO=bar ./script.py")
        './script.py'
    """
    candidates = (
        word for word in shlex.split(cmd) if not _RE_BASH_SET_VARIABLE.match(word)
    )
    first_binary = next(candidates, None)
    if first_binary is None:
        raise ValueError(f"Unable to parse first binary from '{cmd}'")
    return first_binary
#: This is kept for compatibility purposes, but is now deprecated and #: will be removed in later versions. Please use :func:`shlex.split` #: instead. cmd_split = shlex.split
class CmdResult:
    """Outcome of running a command.

    :param command: the command line itself
    :type command: str
    :param exit_status: exit code of the process
    :type exit_status: int
    :param stdout: content of the process stdout
    :type stdout: bytes
    :param stderr: content of the process stderr
    :type stderr: bytes
    :param duration: elapsed wall clock time running the process
    :type duration: float
    :param pid: ID of the process
    :type pid: int
    :param encoding: the encoding to use for the text version
                     of stdout and stderr, by default
                     :data:`autils.devel.astring.ENCODING`
    :type encoding: str
    """

    # pylint: disable=R0913, R0902
    def __init__(
        self,
        command="",
        stdout=b"",
        stderr=b"",
        exit_status=None,
        duration=0,
        pid=None,
        encoding=None,
    ):
        self.command = command
        self.exit_status = exit_status
        #: The raw stdout (bytes)
        self.stdout = stdout
        #: The raw stderr (bytes)
        self.stderr = stderr
        self.duration = duration
        # Starts as False; replaced by a description when interrupted
        self.interrupted = False
        self.pid = pid
        self.encoding = astring.ENCODING if encoding is None else encoding

    def __str__(self):
        reported = (
            "command",
            "exit_status",
            "duration",
            "interrupted",
            "pid",
            "encoding",
            "stdout",
            "stderr",
        )
        lines = []
        for name in reported:
            value = getattr(self, name, "MISSING")
            lines.append(f"{name}: {value!r}")
        return "\n".join(lines)

    @property
    def stdout_text(self):
        """The stdout content as text.

        Content that can be decoded is decoded with :attr:`encoding`;
        content that already is a string is returned untouched.

        :return: The stdout content as a string.
        :rtype: str
        :raises TypeError: If stdout cannot be decoded.
        """
        content = self.stdout
        if hasattr(content, "decode"):
            return content.decode(self.encoding)
        if not isinstance(content, str):
            raise TypeError("Unable to decode stdout into a string-like type")
        return content

    @property
    def stderr_text(self):
        """The stderr content as text.

        Content that can be decoded is decoded with :attr:`encoding`;
        content that already is a string is returned untouched.

        :return: The stderr content as a string.
        :rtype: str
        :raises TypeError: If stderr cannot be decoded.
        """
        content = self.stderr
        if hasattr(content, "decode"):
            return content.decode(self.encoding)
        if not isinstance(content, str):
            raise TypeError("Unable to decode stderr into a string-like type")
        return content
class FDDrainer:
    """Drain a file descriptor from a background thread, keeping the data."""

    # pylint: disable=R0913, R0902
    def __init__(
        self,
        fd,
        result,
        name=None,
        logger=None,
        logger_prefix="%s",
        stream_logger=None,
        ignore_bg_processes=False,
        verbose=False,
    ):
        """Set up the drainer; nothing is read until :meth:`start`.

        Everything read is accumulated in the file-like :attr:`data`
        object.

        :param fd: a file descriptor that will be read (drained) from
        :type fd: int
        :param result: a :class:`CmdResult` instance associated with the
                       process; its ``exit_status`` is used to detect that
                       the process finished and its ``encoding`` to decode
                       logged lines.
        :type result: CmdResult
        :param name: a descriptive name that will be passed to the Thread
                     name
        :type name: str
        :param logger: the logger that will be used to (interactively)
                       write the content from the file descriptor
        :type logger: logging.Logger
        :param logger_prefix: the prefix used when logging the data
        :type logger_prefix: str
        :param stream_logger: a logger for streaming output
        :type stream_logger: logging.Logger
        :param ignore_bg_processes: When True the drainer stops once the
                                    main process has finished and no data
                                    is pending, instead of waiting for the
                                    descriptor to be closed by every
                                    process holding it (eg. a forked daemon
                                    which did not close stdout/stderr).
                                    Output produced by such processes
                                    afterwards may be missed.
        :type ignore_bg_processes: bool
        :param verbose: whether the drained content is also logged
        :type verbose: bool
        """
        self.fd = fd
        self.name = name
        self.data = BytesIO()
        self._thread = None
        self._result = result
        self._logger = logger
        self._logger_prefix = logger_prefix
        self._stream_logger = stream_logger
        self._ignore_bg_processes = ignore_bg_processes
        self._verbose = verbose

    def _log_line(self, line, newline_for_stream="\n"):
        text = astring.to_text(line, self._result.encoding, "replace")
        if self._logger is not None:
            self._logger.debug(self._logger_prefix, text)
        if self._stream_logger is not None:
            self._stream_logger.debug(text + newline_for_stream)

    def _drainer(self):
        """Read from fd, storing and optionally logging the output."""
        while True:
            if self._ignore_bg_processes:
                readable = select.select([self.fd], [], [], 1)[0]
                if not readable:
                    if self._result.exit_status is not None:
                        # No new data and the main process has finished
                        break
                    # Nothing to read yet, check again
                    continue
            try:
                chunk = os.read(self.fd, 8192)
            except OSError:
                break
            if not chunk:
                # End of file
                break
            self.data.write(chunk)
            if self._verbose:
                # A non-empty chunk always yields at least one line
                *complete_lines, last_line = chunk.splitlines()
                for line in complete_lines:
                    self._log_line(line)
                # Only terminate the last line on the stream logger when
                # the chunk itself ended with a newline
                terminator = "\n" if chunk.endswith(b"\n") else ""
                self._log_line(last_line, terminator)

    def start(self):
        """Start the (daemon) thread that reads from the file descriptor."""
        worker = threading.Thread(target=self._drainer, name=self.name)
        worker.daemon = True
        self._thread = worker
        worker.start()

    def flush(self):
        """Wait for the drainer thread and flush the stream handlers."""
        self._thread.join()
        if self._stream_logger is None:
            return
        for handler in self._stream_logger.handlers:
            # FileHandler has a close() method, which we expect will
            # flush the file on disk.  SocketHandler, MemoryHandler
            # and other logging handlers (custom ones?) also have
            # the same interface, so let's try to use it if available
            stream = getattr(handler, "stream", None)
            if (
                stream is not None
                and not stream.closed
                and hasattr(stream, "fileno")
            ):
                try:
                    os.fsync(stream.fileno())
                except UnsupportedOperation:
                    pass
            if hasattr(handler, "close"):
                handler.close()
class SubProcess:
    """Run a subprocess in the background, collecting stdout/stderr streams."""

    # pylint: disable=R0913, R0902
    def __init__(
        self,
        cmd,
        verbose=True,
        shell=False,
        env=None,
        sudo=False,
        ignore_bg_processes=False,
        encoding=None,
        logger=None,
    ):
        """Create the subprocess object; the process itself is not started.

        :param cmd: Command line to run.
        :type cmd: str
        :param verbose: Whether to log the command run and stdout/stderr.
        :type verbose: bool
        :param shell: Whether to run the subprocess in a subshell.
        :type shell: bool
        :param env: Use extra environment variables (added on top of a copy
                    of the current environment).
        :type env: dict
        :param sudo: Whether the command requires admin privileges to run,
                     so that sudo will be prepended to the command.
                     The assumption here is that the user running the
                     command has a sudo configuration such that a password
                     won't be prompted. If that's not the case, the command
                     will straight out fail.
        :type sudo: bool
        :param ignore_bg_processes: When True the process does not wait for
                    child processes which keep opened stdout/stderr streams
                    after the main process finishes (eg. forked daemon
                    which did not closed the stdout/stderr). Note this
                    might result in missing output produced by those
                    daemons after the main thread finishes and also it
                    allows those daemons to be running after the process
                    finishes.
        :type ignore_bg_processes: bool
        :param encoding: the encoding to use for the text representation
                         of the command result stdout and stderr, by default
                         :data:`autils.devel.astring.ENCODING`
        :type encoding: str
        :param logger: User's custom logger, which will be logging the
                       subprocess outputs. When this parameter is not set,
                       the `autils.devel.process` logger will be used.
        :type logger: logging.Logger
        """
        if encoding is None:
            encoding = astring.ENCODING
        if sudo:
            self.cmd = self._prepend_sudo(cmd, shell)
        else:
            self.cmd = cmd
        self.verbose = verbose
        self.result = CmdResult(self.cmd, encoding=encoding)
        self.shell = shell
        if env:
            self.env = os.environ.copy()
            self.env.update(env)
        else:
            # None makes the child inherit the current environment
            self.env = None
        self._popen = None

        self.logger = logger or LOG
        self.stdout_logger = self.logger.getChild("stdout")
        self.stderr_logger = self.logger.getChild("stderr")
        self.output_logger = self.logger.getChild("output")
        # Drainers used when reading from the PIPEs and writing to
        # files and logs
        self._stdout_drainer = None
        self._stderr_drainer = None

        self._ignore_bg_processes = ignore_bg_processes

    def __repr__(self):
        if self._popen is None:
            rc = "(not started)"
        elif self.result.exit_status is None:
            rc = "(running)"
        else:
            rc = self.result.exit_status
        return f"{self.__class__.__name__}(cmd={self.cmd!r}, rc={rc!r})"

    def __str__(self):
        if self._popen is None:
            rc = "(not started)"
        elif self.result.exit_status is None:
            rc = "(running)"
        else:
            rc = f"(finished with exit status={int(self.result.exit_status)})"
        return f"{self.cmd} {rc}"

    @staticmethod
    def _prepend_sudo(cmd, shell):
        """Return ``cmd`` prefixed with ``sudo -n`` unless already root.

        When sudo can not be found the problem is logged and the command
        is returned unchanged.
        """
        if os.getuid():
            try:
                sudo_cmd = f"{path.find_command('sudo', check_exec=False)} -n"
            except path.CmdNotFoundError as details:
                LOG.error(details)
                LOG.error(
                    "Parameter sudo=True provided, but sudo was "
                    "not found. Please consider adding sudo to "
                    "your OS image"
                )
                return cmd
            if shell:
                if " -s" not in sudo_cmd:
                    sudo_cmd = f"{sudo_cmd} -s"
            cmd = f"{sudo_cmd} {cmd}"
        return cmd

    def _init_subprocess(self):
        """Start the process and its stream drainers, if not yet started.

        This is idempotent: once the process has been created further
        calls return immediately.
        """

        def signal_handler(*args):
            self.result.interrupted = "signal/ctrl+c"
            self.wait()
            signal.default_int_handler(*args)

        if self._popen is not None:
            return

        if self.verbose:
            LOG.info("Running '%s'", self.cmd)
        if self.shell is False:
            cmd = shlex.split(self.cmd)
        else:
            cmd = self.cmd
        try:
            self._popen = subprocess.Popen(  # pylint: disable=R1732
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=self.shell,
                env=self.env,
            )
        except OSError as details:
            # Add the command to the message, but never let a missing
            # strerror turn the real failure into a TypeError.
            if details.strerror is not None:
                details.strerror += f" ({self.cmd})"
            raise

        self.start_time = time.monotonic()  # pylint: disable=W0201

        # prepare fd drainers
        self._stdout_drainer = FDDrainer(
            self._popen.stdout.fileno(),
            self.result,
            name=f"{self.cmd}-stdout",
            logger=self.logger,
            logger_prefix="[stdout] %s",
            stream_logger=None,
            ignore_bg_processes=self._ignore_bg_processes,
            verbose=self.verbose,
        )
        self._stderr_drainer = FDDrainer(
            self._popen.stderr.fileno(),
            self.result,
            name=f"{self.cmd}-stderr",
            logger=self.logger,
            logger_prefix="[stderr] %s",
            stream_logger=None,
            ignore_bg_processes=self._ignore_bg_processes,
            verbose=self.verbose,
        )

        # start stdout/stderr threads
        self._stdout_drainer.start()
        self._stderr_drainer.start()

        try:
            signal.signal(signal.SIGINT, signal_handler)
        except ValueError:
            # signal.signal() refuses to install handlers outside of the
            # main thread
            if self.verbose:
                LOG.info("Command %s running on a thread", self.cmd)

    def _fill_results(self, rc):
        """Record exit status, duration, pid and streams into the result."""
        self._init_subprocess()
        self.result.exit_status = rc
        if not self.result.duration:
            self.result.duration = time.monotonic() - self.start_time
        if self.verbose:
            LOG.info(
                "Command '%s' finished with %s after %.9fs",
                self.cmd,
                rc,
                self.result.duration,
            )
        self.result.pid = self._popen.pid
        self._fill_streams()

    def _fill_streams(self):
        """Wait for the drainers and put stdout/stderr into the result."""
        # Cleaning up threads
        if self._stdout_drainer is not None:
            self._stdout_drainer.flush()
        if self._stderr_drainer is not None:
            self._stderr_drainer.flush()
        # Populate stdout/err with everything that was drained
        self.result.stdout = self.get_stdout()
        self.result.stderr = self.get_stderr()

    def start(self):
        """Start running the subprocess.

        This method is particularly useful for background processes, since
        you can start the subprocess and not block your test flow.

        :return: Subprocess PID.
        :rtype: int
        """
        self._init_subprocess()
        return self._popen.pid

    def get_stdout(self):
        """Get the full stdout of the subprocess so far.

        :return: Standard output of the process.
        :rtype: bytes
        """
        self._init_subprocess()
        return self._stdout_drainer.data.getvalue()

    def get_stderr(self):
        """Get the full stderr of the subprocess so far.

        :return: Standard error of the process.
        :rtype: bytes
        """
        self._init_subprocess()
        return self._stderr_drainer.data.getvalue()

    def terminate(self):
        """Send a :attr:`signal.SIGTERM` to the process.

        Please consider using :meth:`stop` instead if you want to do all
        that's possible to finalize the process and wait for it to finish.
        """
        self._init_subprocess()
        self.send_signal(signal.SIGTERM)

    def kill(self):
        """Send a :attr:`signal.SIGKILL` to the process.

        Please consider using :meth:`stop` instead if you want to do all
        that's possible to finalize the process and wait for it to finish.
        """
        self._init_subprocess()
        self.send_signal(signal.SIGKILL)

    def send_signal(self, sig):
        """Send the specified signal to the process.

        When :meth:`is_sudo_enabled` is true, the signal is delivered to
        the process and its direct children by running ``kill`` under sudo
        (failures are ignored).  Otherwise it is sent straight to the
        process.

        :param sig: Signal to send.
        :type sig: int
        """
        self._init_subprocess()
        if self.is_sudo_enabled():
            pids = get_children_pids(self.get_pid())
            pids.append(self.get_pid())
            for pid in pids:
                kill_cmd = f"kill -{int(sig)} {int(pid)}"
                with contextlib.suppress(Exception):
                    run(kill_cmd, sudo=True)
        else:
            self._popen.send_signal(sig)

    def poll(self):
        """Call the subprocess poll() method, fill results if rc is not None.

        :return: Return code if process has finished, None otherwise.
        :rtype: int or None
        """
        self._init_subprocess()
        rc = self._popen.poll()
        if rc is not None:
            self._fill_results(rc)
        return rc

    def wait(self, timeout=None, sig=signal.SIGTERM):
        """Wait for subprocess to complete, fill results when done.

        :param timeout: Time (seconds) we'll wait until the process is
                        finished. If it's not, we'll try to terminate it
                        and it's children using ``sig`` and get a
                        status. When the process refuses to die
                        within 1s we use SIGKILL and report the status
                        (be it exit_code or zombie).
        :type timeout: float or None
        :param sig: Signal to send to the process in case it did not end
                    after the specified timeout.
        :type sig: int
        :return: Exit status of the process.
        :rtype: int
        :raises AssertionError: If the process becomes a zombie.
        """

        def nuke_myself():
            elapsed = time.monotonic() - self.start_time
            self.result.interrupted = f"timeout after {elapsed:.9f}s"
            try:
                kill_process_tree(self.get_pid(), sig, timeout=1)
            except RuntimeError:
                try:
                    kill_process_tree(self.get_pid(), signal.SIGKILL, timeout=1)
                    LOG.warning(
                        "Process '%s' refused to die in 1s after "
                        "sending %s to, destroyed it successfully "
                        "using SIGKILL.",
                        self.cmd,
                        sig,
                    )
                except RuntimeError:
                    LOG.error(
                        "Process '%s' refused to die in 1s after "
                        "sending %s, followed by SIGKILL, probably "
                        "dealing with a zombie process.",
                        self.cmd,
                        sig,
                    )

        self._init_subprocess()
        rc = None

        if timeout is None:
            rc = self._popen.wait()
        elif timeout > 0.0:
            # The timer kills the process tree if it outlives the timeout,
            # which in turn unblocks the wait() below.
            timer = threading.Timer(timeout, nuke_myself)
            try:
                timer.start()
                rc = self._popen.wait()
            finally:
                timer.cancel()

        if rc is None:
            # Give the process up to 1s to finish before killing it
            stop_time = time.monotonic() + 1
            while time.monotonic() < stop_time:
                rc = self._popen.poll()
                if rc is not None:
                    break
                # Yield the CPU between polls instead of spinning
                time.sleep(0.01)
            else:
                nuke_myself()
                rc = self._popen.poll()

        if rc is None:
            # If all this work fails, we're dealing with a zombie process.
            raise AssertionError(f"Zombie Process {self._popen.pid}")
        self._fill_results(rc)
        return rc

    def stop(self, timeout=None):
        """Stop background subprocess.

        Call this method to terminate the background subprocess and
        wait for its results.  If no exit status was recorded yet, SIGTERM
        is sent first (see :meth:`terminate`).

        :param timeout: Time (seconds) we'll wait until the process is
                        finished. If it's not, we'll try to terminate it
                        and its children using SIGTERM (the default signal
                        of :meth:`wait`) and get a status. When the process
                        refuses to die within 1s we use SIGKILL and report
                        the status (be it exit_code or zombie).
        :type timeout: float or None
        :return: Exit status of the process.
        :rtype: int
        """
        self._init_subprocess()
        if self.result.exit_status is None:
            self.terminate()
        return self.wait(timeout)

    def get_pid(self):
        """Report PID of this process.

        :return: Process ID.
        :rtype: int
        """
        self._init_subprocess()
        return self._popen.pid

    def get_user_id(self):
        """Report user id of this process.

        :return: User ID of the process owner.
        :rtype: int or None
        """
        self._init_subprocess()
        return get_owner_id(self.get_pid())

    def is_sudo_enabled(self):
        """Return whether the subprocess is running with sudo enabled.

        :return: True when :meth:`get_user_id` is falsy (UID 0, i.e. root),
                 False otherwise.
        :rtype: bool
        """
        self._init_subprocess()
        return not self.get_user_id()

    def run(self, timeout=None, sig=signal.SIGTERM):
        """Start a process and wait for it to end, returning the result attr.

        If the process was already started using .start(), this will simply
        wait for it to end.

        :param timeout: Time (seconds) we'll wait until the process is
                        finished. If it's not, we'll try to terminate it
                        and its children using ``sig`` and get a
                        status. When the process refuses to die
                        within 1s we use SIGKILL and report the status
                        (be it exit_code or zombie).
        :type timeout: float or None
        :param sig: Signal to send to the process in case it did not end
                    after the specified timeout.
        :type sig: int
        :return: The command result object.
        :rtype: CmdResult
        """
        self._init_subprocess()
        self.wait(timeout, sig)
        return self.result
# pylint: disable=R0913
def run(
    cmd,
    timeout=None,
    verbose=True,
    ignore_status=False,
    shell=False,
    env=None,
    sudo=False,
    ignore_bg_processes=False,
    encoding=None,
    logger=None,
):
    """Run a command to completion and return its :class:`CmdResult`.

    :param cmd: Command line to run.
    :type cmd: str
    :param timeout: Time limit in seconds before attempting to kill the
                    running process. This function will take a few seconds
                    longer than 'timeout' to complete if it has to kill the
                    process.
    :type timeout: float or None
    :param verbose: Whether to log the command run and stdout/stderr.
    :type verbose: bool
    :param ignore_status: When False, a non-zero exit status or an
                          interrupted command raises an exception; when
                          True, the result is returned regardless.
    :type ignore_status: bool
    :param shell: Whether to run the command on a subshell.
    :type shell: bool
    :param env: Use extra environment variables.
    :type env: dict
    :param sudo: Whether the command requires admin privileges to run,
                 so that sudo will be prepended to the command.
                 The assumption here is that the user running the command
                 has a sudo configuration such that a password won't be
                 prompted. If that's not the case, the command will
                 straight out fail.
    :type sudo: bool
    :param ignore_bg_processes: Whether to ignore background processes.
    :type ignore_bg_processes: bool
    :param encoding: the encoding to use for the text representation
                     of the command result stdout and stderr, by default
                     :data:`autils.devel.astring.ENCODING`
    :type encoding: str
    :param logger: User's custom logger, which will be logging the
                   subprocess outputs. When this parameter is not set,
                   the `autils.devel.process` logger will be used.
    :type logger: logging.Logger
    :return: A CmdResult object.
    :rtype: CmdResult
    :raises CmdInputError: If the command is empty.
    :raises CmdError: If ``ignore_status=False`` and command fails.
    """
    if not cmd:
        raise CmdInputError("Invalid empty command")

    effective_encoding = astring.ENCODING if encoding is None else encoding
    sub_process = SubProcess(
        cmd=cmd,
        verbose=verbose,
        shell=shell,
        env=env,
        sudo=sudo,
        ignore_bg_processes=ignore_bg_processes,
        encoding=effective_encoding,
        logger=logger,
    )
    outcome = sub_process.run(timeout=timeout)

    if ignore_status:
        return outcome
    if outcome.exit_status or outcome.interrupted:
        raise CmdError(cmd, sub_process.result)
    return outcome
# pylint: disable=R0913
def system(
    cmd,
    timeout=None,
    verbose=True,
    ignore_status=False,
    shell=False,
    env=None,
    sudo=False,
    ignore_bg_processes=False,
    encoding=None,
    logger=None,
):
    """Run a command to completion and return only its exit code.

    All parameters are handed over, unchanged, to :func:`run`.

    :param cmd: Command line to run.
    :type cmd: str
    :param timeout: Time limit in seconds before attempting to kill the
                    running process. This function will take a few seconds
                    longer than 'timeout' to complete if it has to kill the
                    process.
    :type timeout: float or None
    :param verbose: Whether to log the command run and stdout/stderr.
    :type verbose: bool
    :param ignore_status: When False, a failing command raises an
                          exception; when True, its exit code is returned
                          regardless.
    :type ignore_status: bool
    :param shell: Whether to run the command on a subshell.
    :type shell: bool
    :param env: Use extra environment variables.
    :type env: dict
    :param sudo: Whether the command requires admin privileges to run,
                 so that sudo will be prepended to the command.
                 The assumption here is that the user running the command
                 has a sudo configuration such that a password won't be
                 prompted. If that's not the case, the command will
                 straight out fail.
    :type sudo: bool
    :param ignore_bg_processes: Whether to ignore background processes.
    :type ignore_bg_processes: bool
    :param encoding: the encoding to use for the text representation
                     of the command result stdout and stderr, by default
                     :data:`autils.devel.astring.ENCODING`
    :type encoding: str
    :param logger: User's custom logger, which will be logging the
                   subprocess outputs. When this parameter is not set,
                   the `autils.devel.process` logger will be used.
    :type logger: logging.Logger
    :return: Exit code.
    :rtype: int
    :raises CmdError: If ``ignore_status=False`` and command fails.
    """
    return run(
        cmd=cmd,
        timeout=timeout,
        verbose=verbose,
        ignore_status=ignore_status,
        shell=shell,
        env=env,
        sudo=sudo,
        ignore_bg_processes=ignore_bg_processes,
        encoding=encoding,
        logger=logger,
    ).exit_status
# pylint: disable=R0913
def system_output(
    cmd,
    timeout=None,
    verbose=True,
    ignore_status=False,
    shell=False,
    env=None,
    sudo=False,
    ignore_bg_processes=False,
    strip_trail_nl=True,
    encoding=None,
    logger=None,
):
    """Execute a command and hand back its raw standard output.

    The command is executed through :func:`run`; the ``stdout`` attribute
    of the result is returned, optionally with trailing line terminators
    removed.

    :param cmd: Command line to run.
    :type cmd: str
    :param timeout: Time limit in seconds before attempting to kill the
                    running process. The call may take a few seconds longer
                    than ``timeout`` when the process has to be killed.
    :type timeout: float or None
    :param verbose: Whether to log the command run and stdout/stderr.
    :type verbose: bool
    :param ignore_status: When False, a command that returns != 0 raises an
                          exception; when True, no exception is raised for
                          a failing command.
    :type ignore_status: bool
    :param shell: Whether to run the command on a subshell.
    :type shell: bool
    :param env: Use extra environment variables.
    :type env: dict
    :param sudo: Whether the command requires admin privileges to run, so
                 that sudo will be prepended to the command. This assumes
                 the current user's sudo configuration does not prompt for
                 a password; otherwise the command simply fails.
    :type sudo: bool
    :param ignore_bg_processes: Whether to ignore background processes.
    :type ignore_bg_processes: bool
    :param strip_trail_nl: Whether to strip trailing line terminators. All
                           trailing newline and carriage-return bytes are
                           removed, not just a single one.
    :type strip_trail_nl: bool
    :param encoding: Encoding used for the text representation of the
                     command's stdout and stderr. Defaults to
                     :data:`autils.devel.astring.ENCODING`.
    :type encoding: str
    :param logger: Custom logger that receives the subprocess output. When
                   unset, the `autils.devel.process` logger is used.
    :type logger: logging.Logger
    :return: Command output.
    :rtype: bytes
    :raises CmdError: If ``ignore_status=False`` and command fails.
    """
    result = run(
        cmd=cmd,
        timeout=timeout,
        verbose=verbose,
        ignore_status=ignore_status,
        shell=shell,
        env=env,
        sudo=sudo,
        ignore_bg_processes=ignore_bg_processes,
        encoding=encoding,
        logger=logger,
    )
    output = result.stdout
    # stdout is handled as bytes here, hence the bytes argument to rstrip.
    return output.rstrip(b"\n\r") if strip_trail_nl else output
# pylint: disable=R0913
def getoutput(
    cmd,
    timeout=None,
    verbose=False,
    ignore_status=True,
    shell=True,
    env=None,
    sudo=False,
    ignore_bg_processes=False,
    logger=None,
):
    """Return the output of executing ``cmd``, by default in a shell.

    Port of ``commands.getoutput`` from the ``commands`` module, which no
    longer exists in Python 3. All the work is delegated to
    :func:`getstatusoutput`; only the output half of its
    ``(status, output)`` pair is returned.

    :param cmd: Command line to run.
    :type cmd: str
    :param timeout: Time limit in seconds before attempting to kill the
                    running process. The call may take a few seconds longer
                    than ``timeout`` when the process has to be killed.
    :type timeout: float or None
    :param verbose: Whether to log the command run and stdout/stderr.
    :type verbose: bool
    :param ignore_status: When False, a command that returns != 0 raises an
                          exception; when True (the default here), it does
                          not.
    :type ignore_status: bool
    :param shell: Whether to run the command on a subshell.
    :type shell: bool
    :param env: Use extra environment variables.
    :type env: dict
    :param sudo: Whether the command requires admin privileges to run, so
                 that sudo will be prepended to the command. This assumes
                 the current user's sudo configuration does not prompt for
                 a password; otherwise the command simply fails.
    :type sudo: bool
    :param ignore_bg_processes: Whether to ignore background processes.
    :type ignore_bg_processes: bool
    :param logger: Custom logger that receives the subprocess output. When
                   unset, the `autils.devel.process` logger is used.
    :type logger: logging.Logger
    :return: Command output, exactly as produced by
             :func:`getstatusoutput`.
    :rtype: str
    """
    # The exit status is deliberately discarded; callers that need it
    # should call getstatusoutput() directly.
    _status, output = getstatusoutput(
        cmd=cmd,
        timeout=timeout,
        verbose=verbose,
        ignore_status=ignore_status,
        shell=shell,
        env=env,
        sudo=sudo,
        ignore_bg_processes=ignore_bg_processes,
        logger=logger,
    )
    return output
# pylint: disable=R0913
def getstatusoutput(
    cmd,
    timeout=None,
    verbose=False,
    ignore_status=True,
    shell=True,
    env=None,
    sudo=False,
    ignore_bg_processes=False,
    logger=None,
):
    """Return ``(status, output)`` of executing ``cmd``, by default in a shell.

    Port of ``commands.getstatusoutput`` from the ``commands`` module, which
    no longer exists in Python 3. The command is executed through
    :func:`run`; the output is the result's ``stdout_text`` with at most one
    trailing newline removed.

    :param cmd: Command line to run.
    :type cmd: str
    :param timeout: Time limit in seconds before attempting to kill the
                    running process. The call may take a few seconds longer
                    than ``timeout`` when the process has to be killed.
    :type timeout: float or None
    :param verbose: Whether to log the command run and stdout/stderr.
    :type verbose: bool
    :param ignore_status: When False, a command that returns != 0 raises an
                          exception; when True (the default here), it does
                          not.
    :type ignore_status: bool
    :param shell: Whether to run the command on a subshell.
    :type shell: bool
    :param env: Use extra environment variables.
    :type env: dict
    :param sudo: Whether the command requires admin privileges to run, so
                 that sudo will be prepended to the command. This assumes
                 the current user's sudo configuration does not prompt for
                 a password; otherwise the command simply fails.
    :type sudo: bool
    :param ignore_bg_processes: Whether to ignore background processes.
    :type ignore_bg_processes: bool
    :param logger: Custom logger that receives the subprocess output. When
                   unset, the `autils.devel.process` logger is used.
    :type logger: logging.Logger
    :return: Exit status and command output text.
    :rtype: tuple
    """
    result = run(
        cmd=cmd,
        timeout=timeout,
        verbose=verbose,
        ignore_status=ignore_status,
        shell=shell,
        env=env,
        sudo=sudo,
        ignore_bg_processes=ignore_bg_processes,
        logger=logger,
    )
    # NOTE(review): only ``stdout_text`` is read here; whether stderr is
    # folded into it depends on how run() captures output -- confirm.
    output = result.stdout_text
    status = result.exit_status
    # Drop a single trailing newline only; any earlier ones are kept.
    if output.endswith("\n"):
        output = output[:-1]
    return (status, output)
def get_owner_id(pid):
    """Look up the user ID that owns a process.

    The owner is taken from the ``st_uid`` field of the ``/proc/<pid>/``
    directory entry.

    .. note:: This is currently Linux specific.

    :param pid: The process ID to query. It is converted with ``int()``
                before being used in the path.
    :type pid: int
    :return: The user ID of the process owner, or None when the ``/proc``
             entry cannot be stat'ed (any :class:`OSError`).
    :rtype: int or None
    :raises ValueError: If ``pid`` is a string that ``int()`` cannot parse;
                        only :class:`OSError` is handled here.

    Example::

        >>> get_owner_id(1)
        0
        >>> get_owner_id(999999)
        None
    """
    try:
        proc_entry = os.stat(f"/proc/{int(pid)}/")
    except OSError:
        # No such process, or /proc is not available on this platform.
        return None
    return proc_entry.st_uid
def get_command_output_matching(command, pattern):
    """Run a command and collect the output lines that contain a pattern.

    The command is executed through :func:`run` with that function's
    default options. Its ``stdout_text`` is split into lines, and every
    line that has ``pattern`` as a substring is kept, in output order.

    :param command: The command to execute.
    :type command: str
    :param pattern: Text to look for in each output line. This is a plain
                    substring test (``pattern in line``), not a regular
                    expression.
    :type pattern: str
    :return: The lines of the command output that contain the pattern.
    :rtype: list of str

    Example (illustrative; the result depends on the directory contents)::

        >>> get_command_output_matching("ls -1", "txt")
        ['file1.txt', 'file2.txt']
    """
    output_lines = run(command).stdout_text.splitlines()
    return [entry for entry in output_lines if pattern in entry]