Source code for ansible_compat.runtime

"""Ansible runtime environment maanger."""
import importlib
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import tempfile
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import packaging
import subprocess_tee

from ansible_compat.config import (
    AnsibleConfig,
    ansible_collections_path,
    parse_ansible_version,
)
from ansible_compat.constants import MSG_INVALID_FQRL
from ansible_compat.errors import (
    AnsibleCommandError,
    AnsibleCompatError,
    InvalidPrerequisiteError,
    MissingAnsibleError,
)
from ansible_compat.loaders import yaml_from_file
from ansible_compat.prerun import get_cache_dir

if TYPE_CHECKING:
    # https://github.com/PyCQA/pylint/issues/3240
    # pylint: disable=unsubscriptable-object
    CompletedProcess = subprocess.CompletedProcess[Any]
else:
    CompletedProcess = subprocess.CompletedProcess

_logger = logging.getLogger(__name__)


[docs]class Runtime: """Ansible Runtime manager.""" _version: Optional[packaging.version.Version] = None cache_dir: Optional[str] = None # pylint: disable=too-many-arguments
[docs] def __init__( self, project_dir: Optional[str] = None, isolated: bool = False, min_required_version: Optional[str] = None, require_module: bool = False, max_retries: int = 0, environ: Optional[Dict[str, str]] = None, ) -> None: """Initialize Ansible runtime environment. :param project_dir: The directory containing the Ansible project. If not mentioned it will be guessed from the current working directory. :param isolated: Assure that installation of collections or roles does not affect Ansible installation, an unique cache directory being used instead. :param min_required_version: Minimal version of Ansible required. If not found, a :class:`RuntimeError` exception is raised. :param: require_module: If set, instantiation will fail if Ansible Python module is missing or is not matching the same version as the Ansible command line. That is useful for consumers that expect to also perform Python imports from Ansible. :param max_retries: Number of times it should retry network operations. Default is 0, no retries. :param environ: Environment dictionary to use, if undefined ``os.environ`` will be copied and used. """ self.project_dir = project_dir or os.getcwd() self.isolated = isolated self.max_retries = max_retries self.environ = environ or os.environ.copy() if isolated: self.cache_dir = get_cache_dir(self.project_dir) self.config = AnsibleConfig() if not self.version_in_range(lower=min_required_version): raise RuntimeError( f"Found incompatible version of ansible runtime {self.version}, instead of {min_required_version} or newer." ) if require_module: self._ensure_module_available()
def _ensure_module_available(self) -> None: """Assure that Ansible Python module is installed and matching CLI version.""" ansible_release_module = None try: ansible_release_module = importlib.import_module("ansible.release") except (ModuleNotFoundError, ImportError): pass if ansible_release_module is None: raise RuntimeError("Unable to find Ansible python module.") ansible_module_version = packaging.version.parse( ansible_release_module.__version__ ) if ansible_module_version != self.version: raise RuntimeError( f"Ansible CLI ({self.version}) and python module" f" ({ansible_module_version}) versions do not match. This " "indicates a broken execution environment." )
[docs] def clean(self) -> None: """Remove content of cache_dir.""" if self.cache_dir: shutil.rmtree(self.cache_dir, ignore_errors=True)
[docs] def exec( self, args: Union[str, List[str]], retry: bool = False, tee: bool = False, env: Optional[Dict[str, str]] = None, cwd: Optional[str] = None, ) -> CompletedProcess: """Execute a command inside an Ansible environment. :param retry: Retry network operations on failures. :param tee: Also pass captured stdout/stderr to system while running. """ if tee: run_func: Callable[..., CompletedProcess] = subprocess_tee.run else: run_func = subprocess.run for _ in range(self.max_retries + 1 if retry else 1): result = run_func( args, universal_newlines=True, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env or self.environ, cwd=cwd, ) if result.returncode == 0: break _logger.warning( "Retrying execution failure %s of: %s", result.returncode, " ".join(args), ) return result
@property def version(self) -> packaging.version.Version: """Return current Version object for Ansible. If version is not mentioned, it returns current version as detected. When version argument is mentioned, it return converts the version string to Version object in order to make it usable in comparisons. """ if self._version: return self._version proc = self.exec(["ansible", "--version"]) if proc.returncode == 0: self._version = parse_ansible_version(proc.stdout) return self._version msg = "Unable to find a working copy of ansible executable." raise MissingAnsibleError(msg, proc=proc)
[docs] def version_in_range( self, lower: Optional[str] = None, upper: Optional[str] = None ) -> bool: """Check if Ansible version is inside a required range. The lower limit is inclusive and the upper one exclusive. """ if lower and self.version < packaging.version.Version(lower): return False if upper and self.version >= packaging.version.Version(upper): return False return True
[docs] def install_collection( self, collection: str, destination: Optional[Union[str, pathlib.Path]] = None, force: bool = False, ) -> None: """Install an Ansible collection. Can accept version constraints like 'foo.bar:>=1.2.3' """ cmd = [ "ansible-galaxy", "collection", "install", "-v", ] # ansible-galaxy before 2.11 fails to upgrade collection unless --force # is present, newer versions do not need it if force or self.version_in_range(upper="2.11"): cmd.append("--force") if destination: cmd.extend(["-p", str(destination)]) cmd.append(f"{collection}") _logger.info("Running %s", " ".join(cmd)) run = self.exec( cmd, retry=True, ) if run.returncode != 0: msg = f"Command returned {run.returncode} code:\n{run.stdout}\n{run.stderr}" _logger.error(msg) raise InvalidPrerequisiteError(msg)
[docs] def install_collection_from_disk( self, path: str, destination: Optional[Union[str, pathlib.Path]] = None ) -> None: """Build and install collection from a given disk path.""" if not self.version_in_range(upper="2.11"): self.install_collection(path, destination=destination, force=True) return # older versions of ansible able unable to install without building with tempfile.TemporaryDirectory() as tmp_dir: cmd = [ "ansible-galaxy", "collection", "build", "--output-path", tmp_dir, path, ] _logger.info("Running %s", " ".join(cmd)) run = self.exec(cmd, retry=False) if run.returncode != 0: _logger.error(run.stdout) raise AnsibleCommandError(run) for archive_file in os.listdir(tmp_dir): self.install_collection( os.path.join(tmp_dir, archive_file), destination=destination, force=True, )
[docs] def install_requirements(self, requirement: str, retry: bool = False) -> None: """Install dependencies from a requirements.yml.""" if not os.path.exists(requirement): return reqs_yaml = yaml_from_file(requirement) if not isinstance(reqs_yaml, (dict, list)): raise InvalidPrerequisiteError( f"{requirement} file is not a valid Ansible requirements file." ) if isinstance(reqs_yaml, list) or 'roles' in reqs_yaml: cmd = [ "ansible-galaxy", "role", "install", "-vr", f"{requirement}", ] if self.cache_dir: cmd.extend(["--roles-path", f"{self.cache_dir}/roles"]) _logger.info("Running %s", " ".join(cmd)) run = self.exec(cmd, retry=retry) if run.returncode != 0: _logger.error(run.stdout) raise AnsibleCommandError(run) # Run galaxy collection install works on v2 requirements.yml if "collections" in reqs_yaml: cmd = [ "ansible-galaxy", "collection", "install", "-vr", f"{requirement}", ] if self.cache_dir: cmd.extend(["-p", f"{self.cache_dir}/collections"]) _logger.info("Running %s", " ".join(cmd)) run = self.exec(cmd, retry=retry) if run.returncode != 0: _logger.error(run.stdout) raise AnsibleCommandError(run)
[docs] def prepare_environment( self, required_collections: Optional[Dict[str, str]] = None, retry: bool = False, install_local: bool = False, ) -> None: """Make dependencies available if needed.""" if required_collections is None: required_collections = {} self.install_requirements("requirements.yml", retry=retry) destination = f"{self.cache_dir}/collections" if self.cache_dir else None for name, min_version in required_collections.items(): self.install_collection( f"{name}:>={min_version}", destination=destination, ) self._prepare_ansible_paths() if not install_local: return if os.path.exists("galaxy.yml"): # molecule scenario within a collection self.install_collection_from_disk(".", destination=destination) elif pathlib.Path().resolve().parent.name == 'roles' and os.path.exists( "../../galaxy.yml" ): # molecule scenario located within roles/<role-name>/molecule inside # a collection self.install_collection_from_disk("../..", destination=destination) else: # no collection, try to recognize and install a standalone role self._install_galaxy_role(self.project_dir, ignore_errors=True)
[docs] def require_collection( # noqa: C901 self, name: str, version: Optional[str] = None, install: bool = True, ) -> None: """Check if a minimal collection version is present or exits. In the future this method may attempt to install a missing or outdated collection before failing. """ try: ns, coll = name.split('.', 1) except ValueError as exc: raise InvalidPrerequisiteError( "Invalid collection name supplied: %s" % name ) from exc paths: List[str] = self.config.collections_paths if not paths or not isinstance(paths, list): raise InvalidPrerequisiteError( f"Unable to determine ansible collection paths. ({paths})" ) if self.cache_dir: # if we have a cache dir, we want to be use that would be preferred # destination when installing a missing collection # https://github.com/PyCQA/pylint/issues/4667 paths.insert(0, f"{self.cache_dir}/collections") # pylint: disable=E1101 for path in paths: collpath = os.path.expanduser( os.path.join(path, 'ansible_collections', ns, coll) ) if os.path.exists(collpath): mpath = os.path.join(collpath, 'MANIFEST.json') if not os.path.exists(mpath): msg = f"Found collection at '{collpath}' but missing MANIFEST.json, cannot get info." _logger.fatal(msg) raise InvalidPrerequisiteError(msg) with open(mpath, 'r') as f: manifest = json.loads(f.read()) found_version = packaging.version.parse( manifest['collection_info']['version'] ) if version and found_version < packaging.version.parse(version): if install: self.install_collection(f"{name}:>={version}") self.require_collection(name, version, install=False) else: msg = f"Found {name} collection {found_version} but {version} or newer is required." _logger.fatal(msg) raise InvalidPrerequisiteError(msg) break else: if install: self.install_collection(f"{name}:>={version}") self.require_collection(name=name, version=version, install=False) else: msg = f"Collection '{name}' not found in '{paths}'" _logger.fatal(msg) raise InvalidPrerequisiteError(msg)
def _prepare_ansible_paths(self) -> None: """Configure Ansible environment variables.""" try: library_paths: List[str] = self.config.default_module_path.copy() roles_path: List[str] = self.config.default_roles_path.copy() collections_path: List[str] = self.config.collections_paths.copy() except AttributeError as exc: raise RuntimeError("Unexpected ansible configuration") from exc alterations_list = [ (library_paths, "plugins/modules", True), (roles_path, "roles", True), ] alterations_list.extend( [ (roles_path, f"{self.cache_dir}/roles", False), (library_paths, f"{self.cache_dir}/modules", False), (collections_path, f"{self.cache_dir}/collections", False), ] if self.isolated else [] ) for path_list, path, must_be_present in alterations_list: if not os.path.exists(path): if must_be_present: continue os.makedirs(path, exist_ok=True) if path not in path_list: path_list.insert(0, path) if library_paths != self.config.DEFAULT_MODULE_PATH: self._update_env('ANSIBLE_LIBRARY', library_paths) if collections_path != self.config.collections_paths: self._update_env(ansible_collections_path(), collections_path) if roles_path != self.config.default_roles_path: self._update_env('ANSIBLE_ROLES_PATH', roles_path) def _install_galaxy_role( self, project_dir: str, role_name_check: int = 0, ignore_errors: bool = False ) -> None: """Detect standalone galaxy role and installs it. :param: role_name_check: logic to used to check role name 0: exit with error if name is not compliant (default) 1: warn if name is not compliant 2: bypass any name checking :param: ignore_errors: if True, bypass installing invalid roles. Our implementation aims to match ansible-galaxy's behaviour for installing roles from a tarball or scm. For example ansible-galaxy will install a role that has both galaxy.yml and meta/main.yml present but empty. Also missing galaxy.yml is accepted but missing meta/main.yml is not. """ yaml = None galaxy_info = {} meta_filename = os.path.join(project_dir, 'meta', 'main.yml') if not os.path.exists(meta_filename): if ignore_errors: return else: yaml = yaml_from_file(meta_filename) if yaml and 'galaxy_info' in yaml: galaxy_info = yaml['galaxy_info'] fqrn = _get_role_fqrn(galaxy_info, project_dir) if role_name_check in [0, 1]: if not re.match(r"[a-z0-9][a-z0-9_]+\.[a-z][a-z0-9_]+$", fqrn): msg = MSG_INVALID_FQRL.format(fqrn) if role_name_check == 1: _logger.warning(msg) else: _logger.error(msg) raise InvalidPrerequisiteError(msg) else: # when 'role-name' is in skip_list, we stick to plain role names if 'role_name' in galaxy_info: role_namespace = _get_galaxy_role_ns(galaxy_info) role_name = _get_galaxy_role_name(galaxy_info) fqrn = f"{role_namespace}{role_name}" else: fqrn = pathlib.Path(project_dir).absolute().name path = pathlib.Path(os.path.expanduser(self.config.default_roles_path[0])) path.mkdir(parents=True, exist_ok=True) link_path = path / fqrn # despite documentation stating that is_file() reports true for symlinks, # it appears that is_dir() reports true instead, so we rely on exists(). target = pathlib.Path(project_dir).absolute() exists = link_path.exists() or link_path.is_symlink() if not exists or os.readlink(link_path) != str(target): if exists: link_path.unlink() link_path.symlink_to(str(target), target_is_directory=True) _logger.info( "Using %s symlink to current repository in order to enable Ansible to find the role using its expected full name.", link_path, ) def _update_env(self, varname: str, value: List[str], default: str = "") -> None: """Update colon based environment variable if needed. New values are prepended to make sure they take precedence. """ if not value: return orig_value = self.environ.get(varname, default) if orig_value: value = [*value, *orig_value.split(':')] value_str = ":".join(value) if value_str != self.environ.get(varname, ""): self.environ[varname] = value_str _logger.info("Set %s=%s", varname, value_str)
def _get_role_fqrn(galaxy_infos: Dict[str, Any], project_dir: str) -> str: """Compute role fqrn.""" role_namespace = _get_galaxy_role_ns(galaxy_infos) role_name = _get_galaxy_role_name(galaxy_infos) if len(role_name) == 0: role_name = pathlib.Path(project_dir).absolute().name role_name = re.sub(r'(ansible-|ansible-role-)', '', role_name).split( ".", maxsplit=2 )[-1] return f"{role_namespace}{role_name}" def _get_galaxy_role_ns(galaxy_infos: Dict[str, Any]) -> str: """Compute role namespace from meta/main.yml, including trailing dot.""" role_namespace = galaxy_infos.get('namespace', "") if len(role_namespace) == 0: role_namespace = galaxy_infos.get('author', "") if not isinstance(role_namespace, str): raise AnsibleCompatError( "Role namespace must be string, not %s" % role_namespace ) # if there's a space in the name space, it's likely author name # and not the galaxy login, so act as if there was no namespace if not role_namespace or re.match(r"^\w+ \w+", role_namespace): role_namespace = "" else: role_namespace = f"{role_namespace}." return role_namespace def _get_galaxy_role_name(galaxy_infos: Dict[str, Any]) -> str: """Compute role name from meta/main.yml.""" return galaxy_infos.get('role_name', "")