diff options
Diffstat (limited to 'lib/ansible/galaxy/collection/gpg.py')
-rw-r--r-- | lib/ansible/galaxy/collection/gpg.py | 282 |
1 files changed, 282 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/collection/gpg.py b/lib/ansible/galaxy/collection/gpg.py new file mode 100644 index 0000000..8641f0d --- /dev/null +++ b/lib/ansible/galaxy/collection/gpg.py @@ -0,0 +1,282 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2022, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +"""Signature verification helpers.""" + +from ansible.errors import AnsibleError +from ansible.galaxy.user_agent import user_agent +from ansible.module_utils.urls import open_url + +import contextlib +import os +import subprocess +import sys +import typing as t + +from dataclasses import dataclass, fields as dc_fields +from functools import partial +from urllib.error import HTTPError, URLError + +if t.TYPE_CHECKING: + from ansible.utils.display import Display + +IS_PY310_PLUS = sys.version_info[:2] >= (3, 10) + +frozen_dataclass = partial(dataclass, frozen=True, **({'slots': True} if IS_PY310_PLUS else {})) + + +def get_signature_from_source(source, display=None): # type: (str, t.Optional[Display]) -> str + if display is not None: + display.vvvv(f"Using signature at {source}") + try: + with open_url( + source, + http_agent=user_agent(), + validate_certs=True, + follow_redirects='safe' + ) as resp: + signature = resp.read() + except (HTTPError, URLError) as e: + raise AnsibleError( + f"Failed to get signature for collection verification from '{source}': {e}" + ) from e + + return signature + + +def run_gpg_verify( + manifest_file, # type: str + signature, # type: str + keyring, # type: str + display, # type: Display +): # type: (...) -> tuple[str, int] + status_fd_read, status_fd_write = os.pipe() + + # running the gpg command will create the keyring if it does not exist + remove_keybox = not os.path.exists(keyring) + + cmd = [ + 'gpg', + f'--status-fd={status_fd_write}', + '--verify', + '--batch', + '--no-tty', + '--no-default-keyring', + f'--keyring={keyring}', + '-', + manifest_file, + ] + cmd_str = ' '.join(cmd) + display.vvvv(f"Running command '{cmd}'") + + try: + p = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + pass_fds=(status_fd_write,), + encoding='utf8', + ) + except (FileNotFoundError, subprocess.SubprocessError) as err: + raise AnsibleError( + f"Failed during GnuPG verification with command '{cmd_str}': {err}" + ) from err + else: + stdout, stderr = p.communicate(input=signature) + finally: + os.close(status_fd_write) + + if remove_keybox: + with contextlib.suppress(OSError): + os.remove(keyring) + + with os.fdopen(status_fd_read) as f: + stdout = f.read() + display.vvvv( + f"stdout: \n{stdout}\nstderr: \n{stderr}\n(exit code {p.returncode})" + ) + return stdout, p.returncode + + +def parse_gpg_errors(status_out): # type: (str) -> t.Iterator[GpgBaseError] + for line in status_out.splitlines(): + if not line: + continue + try: + _dummy, status, remainder = line.split(maxsplit=2) + except ValueError: + _dummy, status = line.split(maxsplit=1) + remainder = None + + try: + cls = GPG_ERROR_MAP[status] + except KeyError: + continue + + fields = [status] + if remainder: + fields.extend( + remainder.split( + None, + len(dc_fields(cls)) - 2 + ) + ) + + yield cls(*fields) + + +@frozen_dataclass +class GpgBaseError(Exception): + status: str + + @classmethod + def get_gpg_error_description(cls) -> str: + """Return the current class description.""" + return ' '.join(cls.__doc__.split()) + + def __post_init__(self): + for field in dc_fields(self): + super(GpgBaseError, self).__setattr__(field.name, field.type(getattr(self, field.name))) + + +@frozen_dataclass +class GpgExpSig(GpgBaseError): + """The signature with the keyid is good, but the signature is expired.""" + keyid: str + username: str + + +@frozen_dataclass +class GpgExpKeySig(GpgBaseError): + """The signature with the keyid is good, but the signature was made by an expired key.""" + keyid: str + username: str + + +@frozen_dataclass +class GpgRevKeySig(GpgBaseError): + """The signature with the keyid is good, but the signature was made by a revoked key.""" + keyid: str + username: str + + +@frozen_dataclass +class GpgBadSig(GpgBaseError): + """The signature with the keyid has not been verified okay.""" + keyid: str + username: str + + +@frozen_dataclass +class GpgErrSig(GpgBaseError): + """"It was not possible to check the signature. This may be caused by + a missing public key or an unsupported algorithm. A RC of 4 + indicates unknown algorithm, a 9 indicates a missing public + key. + """ + keyid: str + pkalgo: int + hashalgo: int + sig_class: str + time: int + rc: int + fpr: str + + +@frozen_dataclass +class GpgNoPubkey(GpgBaseError): + """The public key is not available.""" + keyid: str + + +@frozen_dataclass +class GpgMissingPassPhrase(GpgBaseError): + """No passphrase was supplied.""" + + +@frozen_dataclass +class GpgBadPassphrase(GpgBaseError): + """The supplied passphrase was wrong or not given.""" + keyid: str + + +@frozen_dataclass +class GpgNoData(GpgBaseError): + """No data has been found. Codes for WHAT are: + - 1 :: No armored data. + - 2 :: Expected a packet but did not find one. + - 3 :: Invalid packet found, this may indicate a non OpenPGP + message. + - 4 :: Signature expected but not found. + """ + what: str + + +@frozen_dataclass +class GpgUnexpected(GpgBaseError): + """No data has been found. Codes for WHAT are: + - 1 :: No armored data. + - 2 :: Expected a packet but did not find one. + - 3 :: Invalid packet found, this may indicate a non OpenPGP + message. + - 4 :: Signature expected but not found. + """ + what: str + + +@frozen_dataclass +class GpgError(GpgBaseError): + """This is a generic error status message, it might be followed by error location specific data.""" + location: str + code: int + more: str = "" + + +@frozen_dataclass +class GpgFailure(GpgBaseError): + """This is the counterpart to SUCCESS and used to indicate a program failure.""" + location: str + code: int + + +@frozen_dataclass +class GpgBadArmor(GpgBaseError): + """The ASCII armor is corrupted.""" + + +@frozen_dataclass +class GpgKeyExpired(GpgBaseError): + """The key has expired.""" + timestamp: int + + +@frozen_dataclass +class GpgKeyRevoked(GpgBaseError): + """The used key has been revoked by its owner.""" + + +@frozen_dataclass +class GpgNoSecKey(GpgBaseError): + """The secret key is not available.""" + keyid: str + + +GPG_ERROR_MAP = { + 'EXPSIG': GpgExpSig, + 'EXPKEYSIG': GpgExpKeySig, + 'REVKEYSIG': GpgRevKeySig, + 'BADSIG': GpgBadSig, + 'ERRSIG': GpgErrSig, + 'NO_PUBKEY': GpgNoPubkey, + 'MISSING_PASSPHRASE': GpgMissingPassPhrase, + 'BAD_PASSPHRASE': GpgBadPassphrase, + 'NODATA': GpgNoData, + 'UNEXPECTED': GpgUnexpected, + 'ERROR': GpgError, + 'FAILURE': GpgFailure, + 'BADARMOR': GpgBadArmor, + 'KEYEXPIRED': GpgKeyExpired, + 'KEYREVOKED': GpgKeyRevoked, + 'NO_SECKEY': GpgNoSecKey, +} |