diff options
Diffstat (limited to 'ansible_collections/community/general/plugins/lookup/bitwarden.py')
-rw-r--r-- | ansible_collections/community/general/plugins/lookup/bitwarden.py | 121 |
1 files changed, 93 insertions, 28 deletions
diff --git a/ansible_collections/community/general/plugins/lookup/bitwarden.py b/ansible_collections/community/general/plugins/lookup/bitwarden.py index 27de1afe6..2cb2d19a1 100644 --- a/ansible_collections/community/general/plugins/lookup/bitwarden.py +++ b/ansible_collections/community/general/plugins/lookup/bitwarden.py @@ -13,7 +13,7 @@ DOCUMENTATION = """ - bw (command line utility) - be logged into bitwarden - bitwarden vault unlocked - - C(BW_SESSION) environment variable set + - E(BW_SESSION) environment variable set short_description: Retrieve secrets from Bitwarden version_added: 5.4.0 description: @@ -25,7 +25,11 @@ DOCUMENTATION = """ type: list elements: str search: - description: Field to retrieve, for example C(name) or C(id). + description: + - Field to retrieve, for example V(name) or V(id). + - If set to V(id), only zero or one element can be returned. + Use the Jinja C(first) filter to get the only list element. + - When O(collection_id) is set, this field can be undefined to retrieve the whole collection records. type: str default: name version_added: 5.7.0 @@ -36,40 +40,57 @@ DOCUMENTATION = """ description: Collection ID to filter results by collection. Leave unset to skip filtering. type: str version_added: 6.3.0 + bw_session: + description: Pass session key instead of reading from env. + type: str + version_added: 8.4.0 """ EXAMPLES = """ -- name: "Get 'password' from Bitwarden record named 'a_test'" +- name: "Get 'password' from all Bitwarden records named 'a_test'" ansible.builtin.debug: msg: >- {{ lookup('community.general.bitwarden', 'a_test', field='password') }} -- name: "Get 'password' from Bitwarden record with id 'bafba515-af11-47e6-abe3-af1200cd18b2'" +- name: "Get 'password' from Bitwarden record with ID 'bafba515-af11-47e6-abe3-af1200cd18b2'" ansible.builtin.debug: msg: >- - {{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') }} + {{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') | first }} -- name: "Get 'password' from Bitwarden record named 'a_test' from collection" +- name: "Get 'password' from all Bitwarden records named 'a_test' from collection" ansible.builtin.debug: msg: >- {{ lookup('community.general.bitwarden', 'a_test', field='password', collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }} -- name: "Get full Bitwarden record named 'a_test'" +- name: "Get list of all full Bitwarden records named 'a_test'" ansible.builtin.debug: msg: >- {{ lookup('community.general.bitwarden', 'a_test') }} -- name: "Get custom field 'api_key' from Bitwarden record named 'a_test'" +- name: "Get custom field 'api_key' from all Bitwarden records named 'a_test'" ansible.builtin.debug: msg: >- {{ lookup('community.general.bitwarden', 'a_test', field='api_key') }} + +- name: "Get 'password' from all Bitwarden records named 'a_test', using given session key" + ansible.builtin.debug: + msg: >- + {{ lookup('community.general.bitwarden', 'a_test', field='password', bw_session='bXZ9B5TXi6...') }} + +- name: "Get all Bitwarden records from collection" + ansible.builtin.debug: + msg: >- + {{ lookup('community.general.bitwarden', None, collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }} """ RETURN = """ _raw: - description: List of requested field or JSON object of list of matches. + description: + - A one-element list that contains a list of requested fields or JSON objects of matches. + - If you use C(query), you get a list of lists. If you use C(lookup) without C(wantlist=true), + this always gets reduced to a list of field values or JSON objects. type: list - elements: raw + elements: list """ from subprocess import Popen, PIPE @@ -88,76 +109,120 @@ class Bitwarden(object): def __init__(self, path='bw'): self._cli_path = path + self._session = None @property def cli_path(self): return self._cli_path @property + def session(self): + return self._session + + @session.setter + def session(self, value): + self._session = value + + @property def unlocked(self): out, err = self._run(['status'], stdin="") decoded = AnsibleJSONDecoder().raw_decode(out)[0] return decoded['status'] == 'unlocked' def _run(self, args, stdin=None, expected_rc=0): + if self.session: + args += ['--session', self.session] + p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE) out, err = p.communicate(to_bytes(stdin)) rc = p.wait() if rc != expected_rc: + if len(args) > 2 and args[0] == 'get' and args[1] == 'item' and b'Not found.' in err: + return 'null', '' raise BitwardenException(err) return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict') - def _get_matches(self, search_value, search_field, collection_id): + def _get_matches(self, search_value, search_field, collection_id=None): """Return matching records whose search_field is equal to key. """ # Prepare set of params for Bitwarden CLI - params = ['list', 'items', '--search', search_value] + if search_value: + if search_field == 'id': + params = ['get', 'item', search_value] + else: + params = ['list', 'items', '--search', search_value] + if collection_id: + params.extend(['--collectionid', collection_id]) + else: + if not collection_id: + raise AnsibleError("search_value is required if collection_id is not set.") - if collection_id: - params.extend(['--collectionid', collection_id]) + params = ['list', 'items', '--collectionid', collection_id] out, err = self._run(params) # This includes things that matched in different fields. initial_matches = AnsibleJSONDecoder().raw_decode(out)[0] + if search_field == 'id' or not search_value: + if initial_matches is None: + initial_matches = [] + else: + initial_matches = [initial_matches] + # Filter to only include results from the right field. return [item for item in initial_matches if item[search_field] == search_value] - def get_field(self, field, search_value, search_field="name", collection_id=None): + def get_field(self, field, search_value=None, search_field="name", collection_id=None): """Return a list of the specified field for records whose search_field match search_value and filtered by collection if collection has been provided. If field is None, return the whole record for each match. """ matches = self._get_matches(search_value, search_field, collection_id) - - if field in ['autofillOnPageLoad', 'password', 'passwordRevisionDate', 'totp', 'uris', 'username']: - return [match['login'][field] for match in matches] - elif not field: + if not field: return matches - else: - custom_field_matches = [] - for match in matches: + field_matches = [] + for match in matches: + # if there are no custom fields, then `match` has no key 'fields' + if 'fields' in match: + custom_field_found = False for custom_field in match['fields']: - if custom_field['name'] == field: - custom_field_matches.append(custom_field['value']) - if matches and not custom_field_matches: - raise AnsibleError("Custom field {field} does not exist in {search_value}".format(field=field, search_value=search_value)) - return custom_field_matches + if field == custom_field['name']: + field_matches.append(custom_field['value']) + custom_field_found = True + break + if custom_field_found: + continue + if 'login' in match and field in match['login']: + field_matches.append(match['login'][field]) + continue + if field in match: + field_matches.append(match[field]) + continue + + if matches and not field_matches: + raise AnsibleError("field {field} does not exist in {search_value}".format(field=field, search_value=search_value)) + + return field_matches class LookupModule(LookupBase): - def run(self, terms, variables=None, **kwargs): + def run(self, terms=None, variables=None, **kwargs): self.set_options(var_options=variables, direct=kwargs) field = self.get_option('field') search_field = self.get_option('search') collection_id = self.get_option('collection_id') + _bitwarden.session = self.get_option('bw_session') + if not _bitwarden.unlocked: raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.") + if not terms: + return [_bitwarden.get_field(field, None, search_field, collection_id)] + return [_bitwarden.get_field(field, term, search_field, collection_id) for term in terms] |