summaryrefslogtreecommitdiffstats
path: root/python/mozrelease/mozrelease/attribute_builds.py
blob: 094c70e1bf14ebaabd093822dc34b48009befde3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#! /usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

import argparse
import json
import logging
import mmap
import os
import shutil
import struct
import sys
import tempfile
import urllib.parse
from pathlib import Path

# Root logger used by every function in this script; records are rendered as
# "<LEVEL> - <message>" and INFO is the minimum configured level.
log = logging.getLogger()
logging.basicConfig(format="%(levelname)s - %(message)s", level=logging.INFO)


def write_attribution_data(filepath, data):
    """Insert data into a prepared certificate in a signed PE file.

    Returns True if the data was written.  Returns False, leaving the file
    unmodified, if the file isn't a valid PE file (this includes empty files
    and files too short to contain the headers), if the file isn't signed,
    if the necessary certificate was not found, or if the url-quoted data
    does not fit into the space reserved for it.

    This function assumes that somewhere in the given file's certificate table
    there exists a 1024-byte space which begins with the tag "__MOZCUSTOM__:".
    The given data will be url-quoted and inserted into the file following
    this tag.

    We don't bother updating the optional header checksum.
    Windows doesn't check it for executables, only drivers and certain DLL's.
    """
    tag = b"__MOZCUSTOM__:"
    # Size of the placeholder described above, tag included.
    reserved_size = 1024

    with open(filepath, "r+b") as file:
        try:
            mapped = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_WRITE)
        except ValueError:
            # An empty file cannot be mapped, and cannot be a PE file either.
            return False

        # The context manager unmaps the file on every exit path, including
        # the successful one.
        with mapped:
            try:
                # Get the location of the PE header and the optional header
                pe_header_offset = struct.unpack_from("<I", mapped, 0x3C)[0]
                optional_header_offset = pe_header_offset + 24

                # Look up the magic number in the optional header,
                # so we know if we have a 32 or 64-bit executable.
                # We need to know that so that we can find the data
                # directories.
                pe_magic_number = struct.unpack_from(
                    "<H", mapped, optional_header_offset
                )[0]
                if pe_magic_number == 0x10B:
                    # 32-bit
                    cert_dir_entry_offset = optional_header_offset + 128
                elif pe_magic_number == 0x20B:
                    # 64-bit. Certain header fields are wider.
                    cert_dir_entry_offset = optional_header_offset + 144
                else:
                    # Not any known PE format
                    return False

                # The certificate table offset and length give us the valid
                # range to search through for where we should put our data.
                cert_table_offset, cert_table_size = struct.unpack_from(
                    "<II", mapped, cert_dir_entry_offset
                )
            except struct.error:
                # A header field lies beyond the end of the file, so this is
                # a truncated or otherwise invalid PE file.
                return False

            if cert_table_offset == 0 or cert_table_size == 0:
                # The file isn't signed
                return False

            cert_table_end = cert_table_offset + cert_table_size
            tag_index = mapped.find(tag, cert_table_offset, cert_table_end)
            if tag_index == -1:
                return False

            # convert to quoted-url byte-string for insertion
            data = urllib.parse.quote(data).encode("utf-8")
            start = tag_index + len(tag)
            end = start + len(data)

            # Quoting can make the payload longer than the unquoted code, so
            # make sure it stays inside the reserved space, the certificate
            # table and the file instead of clobbering whatever follows.
            limit = min(tag_index + reserved_size, cert_table_end, len(mapped))
            if end > limit:
                return False

            mapped[start:end] = data
            return True


def validate_attribution_code(attribution):
    """Check that an attribution code is acceptable, logging every problem.

    `attribution` is a query-string style code ("key=value&key=value") which
    is not url-encoded.  It is valid when all of the following hold:

    * it is not empty and is at most 1010 characters long;
    * every key is one of the known attribution keys;
    * no key appears more than once;
    * it contains `dlsource`, or all of `source`, `medium`, `campaign` and
      `content`.  Pairs with an empty value (like 'foo=') are ignored, so they
      do not count as present.

    Returns True if the code is valid, False otherwise.  Each failed check is
    reported through `log.error`.
    """
    log.info("Checking attribution %s", attribution)

    # Also covers None, which cannot be measured or parsed below.
    if not attribution:
        log.error("Attribution code has 0 length")
        return False

    return_code = True

    # Set to match https://searchfox.org/mozilla-central/rev/a92ed79b0bc746159fc31af1586adbfa9e45e264/browser/components/attribution/AttributionCode.jsm#24  # noqa
    MAX_LENGTH = 1010
    if len(attribution) > MAX_LENGTH:
        log.error("Attribution code longer than %s chars", MAX_LENGTH)
        return_code = False

    valid_keys = (
        "source",
        "medium",
        "campaign",
        "content",
        "experiment",
        "variation",
        "ua",
        "dlsource",
    )

    # this leaves out empty values like 'foo='
    params = urllib.parse.parse_qsl(attribution)
    used_keys = set()
    for key, _value in params:
        # check for invalid keys
        if key not in valid_keys:
            log.error("Invalid key %s", key)
            return_code = False

        # avoid ambiguity from repeated keys
        if key in used_keys:
            log.error("Repeated key %s", key)
            return_code = False
        else:
            used_keys.add(key)

        # TODO the service checks for valid source, should we do that here too ?

    # We have two types of attribution with different requirements:
    # 1) Partner attribution, which requires a few UTM parameters sets
    # 2) Attribution of vanilla builds, which only requires `dlsource`
    #
    # Perhaps in an ideal world we would check what type of build we're
    # attributing to make sure that eg: partner builds don't get `dlsource`
    # instead of what they actually want -- but the likelihood of that
    # happening is vanishingly small, so it's probably not worth doing.
    if "dlsource" not in used_keys:
        required = ("source", "medium", "campaign", "content")
        if any(key not in used_keys for key in required):
            # Only report this when it is the actual problem; the other
            # checks above log their own, more specific, messages.
            log.error(
                "Either 'dlsource' must be provided, or all of: 'source', 'medium', 'campaign', and 'content'. Use '(not set)' if one of the latter is not needed."
            )
            return_code = False

    return return_code


def main():
    """Attribute one or more Windows installers and return an exit status.

    The list of jobs comes from the ATTRIBUTION_CONFIG environment variable
    (a JSON list of {"input", "output", "attribution"} objects) when it is
    set, otherwise from the `--input`, `--output` and `--attribution`
    command line arguments.

    Every distinct attribution code is validated once.  Each input is copied
    to a temporary directory, attributed there, and only moved to its output
    location if attribution succeeded.

    Returns 1 if no configuration was found, if an attribution code is
    invalid (processing stops at that job), or if any installer could not be
    attributed (the remaining jobs are still processed).  Returns 0 when
    every job succeeded.
    """
    parser = argparse.ArgumentParser(
        description="Add attribution to Windows installer(s).",
        epilog="""
        By default, configuration from envvar ATTRIBUTION_CONFIG is used, with
        expected format
          [{"input": "in/abc.exe", "output": "out/def.exe", "attribution": "abcdef"},
           {"input": "in/ghi.exe", "output": "out/jkl.exe", "attribution": "ghijkl"}]
        for 1 or more attributions. Or the script arguments may be used for a single attribution.

        The attribution code should be a string which is not url-encoded.

        If command line arguments are used instead, one or more `--input` parameters may be provided.
        Each will be written to the `--output` directory provided to a file of the same name as the
        input filename. All inputs will be attributed with the same `--attribution` code.
        """,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--input",
        default=[],
        action="append",
        help="Source installer to attribute; may be specified multiple times",
    )
    parser.add_argument("--output", help="Location to write the attributed installers")
    parser.add_argument("--attribution", help="Attribution code")
    args = parser.parse_args()

    if os.environ.get("ATTRIBUTION_CONFIG"):
        work = json.loads(os.environ["ATTRIBUTION_CONFIG"])
    elif args.input and args.output and args.attribution:
        work = [
            {
                "input": i,
                "output": os.path.join(args.output, os.path.basename(i)),
                "attribution": args.attribution,
            }
            for i in args.input
        ]
    else:
        log.error("No configuration found. Set ATTRIBUTION_CONFIG or pass arguments.")
        return 1

    # Codes which already passed validation, so that a code shared by many
    # jobs is only checked (and logged) once.
    validated_codes = set()
    failed = False
    for job in work:
        code = job["attribution"]
        if code not in validated_codes:
            if not validate_attribution_code(code):
                log.error("Failed attribution code check")
                return 1
            validated_codes.add(code)

        # Work on a copy so that the output only ever appears fully attributed.
        with tempfile.TemporaryDirectory() as td:
            log.info("Attributing installer %s ..." % job["input"])
            tf = shutil.copy(job["input"], td)
            if write_attribution_data(tf, code):
                Path(job["output"]).parent.mkdir(parents=True, exist_ok=True)
                shutil.move(tf, job["output"])
                log.info("Wrote %s" % job["output"])
            else:
                # Don't let a missing output go unnoticed: report it and make
                # the script exit with a failure status.
                log.error(
                    "Failed to attribute %s; %s was not written",
                    job["input"],
                    job["output"],
                )
                failed = True

    return 1 if failed else 0


# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())