summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/general/plugins/lookup/bitwarden.py
blob: 7584cd98a6ceff0adfb448644cc9f8379ccbcf2c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# -*- coding: utf-8 -*-
# Copyright (c) 2022, Jonathan Lung <lungj@heresjono.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

DOCUMENTATION = """
    name: bitwarden
    author:
      - Jonathan Lung (@lungj) <lungj@heresjono.com>
    requirements:
      - bw (command line utility)
      - be logged into bitwarden
      - bitwarden vault unlocked
      - E(BW_SESSION) environment variable set
    short_description: Retrieve secrets from Bitwarden
    version_added: 5.4.0
    description:
      - Retrieve secrets from Bitwarden.
    options:
      _terms:
        description: Key(s) to fetch values for from login info.
        required: true
        type: list
        elements: str
      search:
        description:
          - Field to retrieve, for example V(name) or V(id).
          - If set to V(id), only zero or one element can be returned.
            Use the Jinja C(first) filter to get the only list element.
          - If set to V(None) or V(''), or if O(_terms) is empty, records are not filtered by fields.
        type: str
        default: name
        version_added: 5.7.0
      field:
        description: Field to fetch. Leave unset to fetch whole response.
        type: str
      collection_id:
        description: Collection ID to filter results by collection. Leave unset to skip filtering.
        type: str
        version_added: 6.3.0
      organization_id:
        description: Organization ID to filter results by organization. Leave unset to skip filtering.
        type: str
        version_added: 8.5.0
      bw_session:
        description: Pass session key instead of reading from env.
        type: str
        version_added: 8.4.0
"""

EXAMPLES = """
- name: "Get 'password' from all Bitwarden records named 'a_test'"
  ansible.builtin.debug:
    msg: >-
      {{ lookup('community.general.bitwarden', 'a_test', field='password') }}

- name: "Get 'password' from Bitwarden record with ID 'bafba515-af11-47e6-abe3-af1200cd18b2'"
  ansible.builtin.debug:
    msg: >-
      {{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') | first }}

- name: "Get 'password' from all Bitwarden records named 'a_test' from collection"
  ansible.builtin.debug:
    msg: >-
      {{ lookup('community.general.bitwarden', 'a_test', field='password', collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}

- name: "Get list of all full Bitwarden records named 'a_test'"
  ansible.builtin.debug:
    msg: >-
      {{ lookup('community.general.bitwarden', 'a_test') }}

- name: "Get custom field 'api_key' from all Bitwarden records named 'a_test'"
  ansible.builtin.debug:
    msg: >-
      {{ lookup('community.general.bitwarden', 'a_test', field='api_key') }}

- name: "Get 'password' from all Bitwarden records named 'a_test', using given session key"
  ansible.builtin.debug:
    msg: >-
      {{ lookup('community.general.bitwarden', 'a_test', field='password', bw_session='bXZ9B5TXi6...') }}

- name: "Get all Bitwarden records from collection"
  ansible.builtin.debug:
    msg: >-
      {{ lookup('community.general.bitwarden', None, collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
"""

RETURN = """
  _raw:
    description:
      - A one-element list that contains a list of requested fields or JSON objects of matches.
      - If you use C(query), you get a list of lists. If you use C(lookup) without C(wantlist=true),
        this always gets reduced to a list of field values or JSON objects.
    type: list
    elements: list
"""

from subprocess import Popen, PIPE

from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.parsing.ajson import AnsibleJSONDecoder
from ansible.plugins.lookup import LookupBase


class BitwardenException(AnsibleError):
    pass


class Bitwarden(object):

    def __init__(self, path='bw'):
        self._cli_path = path
        self._session = None

    @property
    def cli_path(self):
        return self._cli_path

    @property
    def session(self):
        return self._session

    @session.setter
    def session(self, value):
        self._session = value

    @property
    def unlocked(self):
        out, err = self._run(['status'], stdin="")
        decoded = AnsibleJSONDecoder().raw_decode(out)[0]
        return decoded['status'] == 'unlocked'

    def _run(self, args, stdin=None, expected_rc=0):
        if self.session:
            args += ['--session', self.session]

        p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE)
        out, err = p.communicate(to_bytes(stdin))
        rc = p.wait()
        if rc != expected_rc:
            if len(args) > 2 and args[0] == 'get' and args[1] == 'item' and b'Not found.' in err:
                return 'null', ''
            raise BitwardenException(err)
        return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict')

    def _get_matches(self, search_value, search_field, collection_id=None, organization_id=None):
        """Return matching records whose search_field is equal to key.
        """

        # Prepare set of params for Bitwarden CLI
        if search_field == 'id':
            params = ['get', 'item', search_value]
        else:
            params = ['list', 'items']
            if search_value:
                params.extend(['--search', search_value])

        if collection_id:
            params.extend(['--collectionid', collection_id])
        if organization_id:
            params.extend(['--organizationid', organization_id])

        out, err = self._run(params)

        # This includes things that matched in different fields.
        initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]

        if search_field == 'id':
            if initial_matches is None:
                initial_matches = []
            else:
                initial_matches = [initial_matches]

        # Filter to only include results from the right field.
        return [item for item in initial_matches if not search_value or item[search_field] == search_value]

    def get_field(self, field, search_value, search_field="name", collection_id=None, organization_id=None):
        """Return a list of the specified field for records whose search_field match search_value
        and filtered by collection if collection has been provided.

        If field is None, return the whole record for each match.
        """
        matches = self._get_matches(search_value, search_field, collection_id, organization_id)
        if not field:
            return matches
        field_matches = []
        for match in matches:
            # if there are no custom fields, then `match` has no key 'fields'
            if 'fields' in match:
                custom_field_found = False
                for custom_field in match['fields']:
                    if field == custom_field['name']:
                        field_matches.append(custom_field['value'])
                        custom_field_found = True
                        break
                if custom_field_found:
                    continue
            if 'login' in match and field in match['login']:
                field_matches.append(match['login'][field])
                continue
            if field in match:
                field_matches.append(match[field])
                continue

        if matches and not field_matches:
            raise AnsibleError("field {field} does not exist in {search_value}".format(field=field, search_value=search_value))

        return field_matches


class LookupModule(LookupBase):

    def run(self, terms=None, variables=None, **kwargs):
        self.set_options(var_options=variables, direct=kwargs)
        field = self.get_option('field')
        search_field = self.get_option('search')
        collection_id = self.get_option('collection_id')
        organization_id = self.get_option('organization_id')
        _bitwarden.session = self.get_option('bw_session')

        if not _bitwarden.unlocked:
            raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.")

        if not terms:
            terms = [None]

        return [_bitwarden.get_field(field, term, search_field, collection_id, organization_id) for term in terms]


_bitwarden = Bitwarden()