summaryrefslogtreecommitdiffstats
path: root/testing/mozharness/mozharness/mozilla/repo_manipulation.py
blob: 3a5712fadb7b272cbbcd1464344989504129d3e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import six

# pylint --py3k: W1648
if six.PY2:
    from ConfigParser import ConfigParser
else:
    from configparser import ConfigParser

import json
import os

from mozharness.base.errors import HgErrorList
from mozharness.base.log import FATAL, INFO
from mozharness.base.vcs.mercurial import MercurialVCS


class MercurialRepoManipulationMixin(object):
    def get_version(self, repo_root, version_file="browser/config/version.txt"):
        version_path = os.path.join(repo_root, version_file)
        contents = self.read_from_file(version_path, error_level=FATAL)
        lines = [l for l in contents.splitlines() if l and not l.startswith("#")]
        return lines[-1].split(".")

    def replace(self, file_name, from_, to_):
        """Replace text in a file."""
        text = self.read_from_file(file_name, error_level=FATAL)
        new_text = text.replace(from_, to_)
        if text == new_text:
            self.fatal("Cannot replace '%s' to '%s' in '%s'" % (from_, to_, file_name))
        self.write_to_file(file_name, new_text, error_level=FATAL)

    def query_hg_revision(self, path):
        """Avoid making 'pull' a required action every run, by being able
        to fall back to figuring out the revision from the cloned repo
        """
        m = MercurialVCS(log_obj=self.log_obj, config=self.config)
        revision = m.get_revision_from_path(path)
        return revision

    def hg_commit(self, cwd, message, user=None, ignore_no_changes=False):
        """Commit changes to hg."""
        cmd = self.query_exe("hg", return_type="list") + ["commit", "-m", message]
        if user:
            cmd.extend(["-u", user])
        success_codes = [0]
        if ignore_no_changes:
            success_codes.append(1)
        self.run_command(
            cmd,
            cwd=cwd,
            error_list=HgErrorList,
            halt_on_failure=True,
            success_codes=success_codes,
        )
        return self.query_hg_revision(cwd)

    def clean_repos(self):
        """We may end up with contaminated local repos at some point, but
        we don't want to have to clobber and reclone from scratch every
        time.

        This is an attempt to clean up the local repos without needing a
        clobber.
        """
        dirs = self.query_abs_dirs()
        hg = self.query_exe("hg", return_type="list")
        hg_repos = self.query_repos()
        hg_strip_error_list = [
            {
                "substr": r"""abort: empty revision set""",
                "level": INFO,
                "explanation": "Nothing to clean up; we're good!",
            }
        ] + HgErrorList
        for repo_config in hg_repos:
            repo_name = repo_config["dest"]
            repo_path = os.path.join(dirs["abs_work_dir"], repo_name)
            if os.path.exists(repo_path):
                # hg up -C to discard uncommitted changes
                self.run_command(
                    hg + ["up", "-C", "-r", repo_config["branch"]],
                    cwd=repo_path,
                    error_list=HgErrorList,
                    halt_on_failure=True,
                )
                # discard unpushed commits
                status = self.retry(
                    self.run_command,
                    args=(
                        hg
                        + [
                            "--config",
                            "extensions.mq=",
                            "strip",
                            "--no-backup",
                            "outgoing()",
                        ],
                    ),
                    kwargs={
                        "cwd": repo_path,
                        "error_list": hg_strip_error_list,
                        "return_type": "num_errors",
                        "success_codes": (0, 255),
                    },
                )
                if status not in [0, 255]:
                    self.fatal("Issues stripping outgoing revisions!")
                # 2nd hg up -C to make sure we're not on a stranded head
                # which can happen when reverting debugsetparents
                self.run_command(
                    hg + ["up", "-C", "-r", repo_config["branch"]],
                    cwd=repo_path,
                    error_list=HgErrorList,
                    halt_on_failure=True,
                )

    def commit_changes(self):
        """Do the commit."""
        hg = self.query_exe("hg", return_type="list")
        for cwd in self.query_commit_dirs():
            self.run_command(hg + ["diff"], cwd=cwd)
            self.hg_commit(
                cwd,
                user=self.config["hg_user"],
                message=self.query_commit_message(),
                ignore_no_changes=self.config.get("ignore_no_changes", False),
            )
        self.info(
            "Now verify |hg out| and |hg out --patch| if you're paranoid, and --push"
        )

    def hg_tag(
        self,
        cwd,
        tags,
        user=None,
        message=None,
        revision=None,
        force=None,
        halt_on_failure=True,
    ):
        if isinstance(tags, six.string_types):
            tags = [tags]
        cmd = self.query_exe("hg", return_type="list") + ["tag"]
        if not message:
            message = "No bug - Tagging %s" % os.path.basename(cwd)
            if revision:
                message = "%s %s" % (message, revision)
            message = "%s with %s" % (message, ", ".join(tags))
            message += " a=release DONTBUILD CLOSED TREE"
        self.info(message)
        cmd.extend(["-m", message])
        if user:
            cmd.extend(["-u", user])
        if revision:
            cmd.extend(["-r", revision])
        if force:
            cmd.append("-f")
        cmd.extend(tags)
        return self.run_command(
            cmd, cwd=cwd, halt_on_failure=halt_on_failure, error_list=HgErrorList
        )

    def query_existing_tags(self, cwd, halt_on_failure=True):
        cmd = self.query_exe("hg", return_type="list") + ["tags"]
        existing_tags = {}
        output = self.get_output_from_command(
            cmd, cwd=cwd, halt_on_failure=halt_on_failure
        )
        for line in output.splitlines():
            parts = line.split(" ")
            if len(parts) > 1:
                # existing_tags = {TAG: REVISION, ...}
                existing_tags[parts[0]] = parts[-1].split(":")[-1]
        self.info(
            "existing_tags:\n{}".format(
                json.dumps(existing_tags, sort_keys=True, indent=4)
            )
        )
        return existing_tags

    def push(self):
        """"""
        error_message = """Push failed!  If there was a push race, try rerunning
the script (--clean-repos --pull --migrate).  The second run will be faster."""
        hg = self.query_exe("hg", return_type="list")
        for cwd in self.query_push_dirs():
            if not cwd:
                self.warning("Skipping %s" % cwd)
                continue
            push_cmd = hg + ["push"] + self.query_push_args(cwd)
            if self.config.get("push_dest"):
                push_cmd.append(self.config["push_dest"])
            status = self.run_command(
                push_cmd,
                cwd=cwd,
                error_list=HgErrorList,
                success_codes=[0, 1],
            )
            if status == 1:
                self.warning("No changes for %s!" % cwd)
            elif status:
                self.fatal(error_message)

    def edit_repo_hg_rc(self, cwd, section, key, value):
        hg_rc = self.read_repo_hg_rc(cwd)
        hg_rc.set(section, key, value)

        with open(self._get_hg_rc_path(cwd), "wb") as f:
            hg_rc.write(f)

    def read_repo_hg_rc(self, cwd):
        hg_rc = ConfigParser()
        hg_rc.read(self._get_hg_rc_path(cwd))
        return hg_rc

    def _get_hg_rc_path(self, cwd):
        return os.path.join(cwd, ".hg", "hgrc")