summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/general/plugins/lookup/bitwarden.py
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/community/general/plugins/lookup/bitwarden.py')
-rw-r--r--ansible_collections/community/general/plugins/lookup/bitwarden.py121
1 files changed, 93 insertions, 28 deletions
diff --git a/ansible_collections/community/general/plugins/lookup/bitwarden.py b/ansible_collections/community/general/plugins/lookup/bitwarden.py
index 27de1afe6..2cb2d19a1 100644
--- a/ansible_collections/community/general/plugins/lookup/bitwarden.py
+++ b/ansible_collections/community/general/plugins/lookup/bitwarden.py
@@ -13,7 +13,7 @@ DOCUMENTATION = """
- bw (command line utility)
- be logged into bitwarden
- bitwarden vault unlocked
- - C(BW_SESSION) environment variable set
+ - E(BW_SESSION) environment variable set
short_description: Retrieve secrets from Bitwarden
version_added: 5.4.0
description:
@@ -25,7 +25,11 @@ DOCUMENTATION = """
type: list
elements: str
search:
- description: Field to retrieve, for example C(name) or C(id).
+ description:
+ - Field to retrieve, for example V(name) or V(id).
+ - If set to V(id), only zero or one element can be returned.
+ Use the Jinja C(first) filter to get the only list element.
+ - When O(collection_id) is set, this field can be undefined to retrieve the whole collection records.
type: str
default: name
version_added: 5.7.0
@@ -36,40 +40,57 @@ DOCUMENTATION = """
description: Collection ID to filter results by collection. Leave unset to skip filtering.
type: str
version_added: 6.3.0
+ bw_session:
+ description: Pass session key instead of reading from env.
+ type: str
+ version_added: 8.4.0
"""
EXAMPLES = """
-- name: "Get 'password' from Bitwarden record named 'a_test'"
+- name: "Get 'password' from all Bitwarden records named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='password') }}
-- name: "Get 'password' from Bitwarden record with id 'bafba515-af11-47e6-abe3-af1200cd18b2'"
+- name: "Get 'password' from Bitwarden record with ID 'bafba515-af11-47e6-abe3-af1200cd18b2'"
ansible.builtin.debug:
msg: >-
- {{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') }}
+ {{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') | first }}
-- name: "Get 'password' from Bitwarden record named 'a_test' from collection"
+- name: "Get 'password' from all Bitwarden records named 'a_test' from collection"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='password', collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
-- name: "Get full Bitwarden record named 'a_test'"
+- name: "Get list of all full Bitwarden records named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test') }}
-- name: "Get custom field 'api_key' from Bitwarden record named 'a_test'"
+- name: "Get custom field 'api_key' from all Bitwarden records named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='api_key') }}
+
+- name: "Get 'password' from all Bitwarden records named 'a_test', using given session key"
+ ansible.builtin.debug:
+ msg: >-
+ {{ lookup('community.general.bitwarden', 'a_test', field='password', bw_session='bXZ9B5TXi6...') }}
+
+- name: "Get all Bitwarden records from collection"
+ ansible.builtin.debug:
+ msg: >-
+ {{ lookup('community.general.bitwarden', None, collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
"""
RETURN = """
_raw:
- description: List of requested field or JSON object of list of matches.
+ description:
+ - A one-element list that contains a list of requested fields or JSON objects of matches.
+ - If you use C(query), you get a list of lists. If you use C(lookup) without C(wantlist=true),
+ this always gets reduced to a list of field values or JSON objects.
type: list
- elements: raw
+ elements: list
"""
from subprocess import Popen, PIPE
@@ -88,76 +109,120 @@ class Bitwarden(object):
def __init__(self, path='bw'):
self._cli_path = path
+ self._session = None
@property
def cli_path(self):
return self._cli_path
@property
+ def session(self):
+ return self._session
+
+ @session.setter
+ def session(self, value):
+ self._session = value
+
+ @property
def unlocked(self):
out, err = self._run(['status'], stdin="")
decoded = AnsibleJSONDecoder().raw_decode(out)[0]
return decoded['status'] == 'unlocked'
def _run(self, args, stdin=None, expected_rc=0):
+ if self.session:
+ args += ['--session', self.session]
+
p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE)
out, err = p.communicate(to_bytes(stdin))
rc = p.wait()
if rc != expected_rc:
+ if len(args) > 2 and args[0] == 'get' and args[1] == 'item' and b'Not found.' in err:
+ return 'null', ''
raise BitwardenException(err)
return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict')
- def _get_matches(self, search_value, search_field, collection_id):
+ def _get_matches(self, search_value, search_field, collection_id=None):
"""Return matching records whose search_field is equal to key.
"""
# Prepare set of params for Bitwarden CLI
- params = ['list', 'items', '--search', search_value]
+ if search_value:
+ if search_field == 'id':
+ params = ['get', 'item', search_value]
+ else:
+ params = ['list', 'items', '--search', search_value]
+ if collection_id:
+ params.extend(['--collectionid', collection_id])
+ else:
+ if not collection_id:
+ raise AnsibleError("search_value is required if collection_id is not set.")
- if collection_id:
- params.extend(['--collectionid', collection_id])
+ params = ['list', 'items', '--collectionid', collection_id]
out, err = self._run(params)
# This includes things that matched in different fields.
initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
+ if search_field == 'id' or not search_value:
+ if initial_matches is None:
+ initial_matches = []
+ else:
+ initial_matches = [initial_matches]
+
# Filter to only include results from the right field.
return [item for item in initial_matches if item[search_field] == search_value]
- def get_field(self, field, search_value, search_field="name", collection_id=None):
+ def get_field(self, field, search_value=None, search_field="name", collection_id=None):
"""Return a list of the specified field for records whose search_field match search_value
and filtered by collection if collection has been provided.
If field is None, return the whole record for each match.
"""
matches = self._get_matches(search_value, search_field, collection_id)
-
- if field in ['autofillOnPageLoad', 'password', 'passwordRevisionDate', 'totp', 'uris', 'username']:
- return [match['login'][field] for match in matches]
- elif not field:
+ if not field:
return matches
- else:
- custom_field_matches = []
- for match in matches:
+ field_matches = []
+ for match in matches:
+ # if there are no custom fields, then `match` has no key 'fields'
+ if 'fields' in match:
+ custom_field_found = False
for custom_field in match['fields']:
- if custom_field['name'] == field:
- custom_field_matches.append(custom_field['value'])
- if matches and not custom_field_matches:
- raise AnsibleError("Custom field {field} does not exist in {search_value}".format(field=field, search_value=search_value))
- return custom_field_matches
+ if field == custom_field['name']:
+ field_matches.append(custom_field['value'])
+ custom_field_found = True
+ break
+ if custom_field_found:
+ continue
+ if 'login' in match and field in match['login']:
+ field_matches.append(match['login'][field])
+ continue
+ if field in match:
+ field_matches.append(match[field])
+ continue
+
+ if matches and not field_matches:
+ raise AnsibleError("field {field} does not exist in {search_value}".format(field=field, search_value=search_value))
+
+ return field_matches
class LookupModule(LookupBase):
- def run(self, terms, variables=None, **kwargs):
+ def run(self, terms=None, variables=None, **kwargs):
self.set_options(var_options=variables, direct=kwargs)
field = self.get_option('field')
search_field = self.get_option('search')
collection_id = self.get_option('collection_id')
+ _bitwarden.session = self.get_option('bw_session')
+
if not _bitwarden.unlocked:
raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.")
+ if not terms:
+ return [_bitwarden.get_field(field, None, search_field, collection_id)]
+
return [_bitwarden.get_field(field, term, search_field, collection_id) for term in terms]