path: root/src/arrow/dev/archery/archery/crossbow/
diff options
Diffstat (limited to 'src/arrow/dev/archery/archery/crossbow/')
1 files changed, 1172 insertions, 0 deletions
diff --git a/src/arrow/dev/archery/archery/crossbow/ b/src/arrow/dev/archery/archery/crossbow/
new file mode 100644
index 000000000..0f2309e47
--- /dev/null
+++ b/src/arrow/dev/archery/archery/crossbow/
@@ -0,0 +1,1172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import re
+import fnmatch
+import glob
+import time
+import logging
+import mimetypes
+import subprocess
+import textwrap
+from io import StringIO
+from pathlib import Path
+from datetime import date
+import jinja2
+from ruamel.yaml import YAML
+ import github3
+ _have_github3 = True
+except ImportError:
+ github3 = object
+ _have_github3 = False
+ import pygit2
+except ImportError:
+ PygitRemoteCallbacks = object
+ PygitRemoteCallbacks = pygit2.RemoteCallbacks
+from ..utils.source import ArrowSources
+for pkg in ["requests", "urllib3", "github3"]:
+ logging.getLogger(pkg).setLevel(logging.WARNING)
+logger = logging.getLogger("crossbow")
+class CrossbowError(Exception):
+ pass
+def _flatten(mapping):
+ """Converts a hierarchical mapping to a flat dictionary"""
+ result = {}
+ for k, v in mapping.items():
+ if isinstance(v, dict):
+ for ik, iv in _flatten(v).items():
+ ik = ik if isinstance(ik, tuple) else (ik,)
+ result[(k,) + ik] = iv
+ elif isinstance(v, list):
+ for ik, iv in enumerate(_flatten(v)):
+ ik = ik if isinstance(ik, tuple) else (ik,)
+ result[(k,) + ik] = iv
+ else:
+ result[(k,)] = v
+ return result
+def _unflatten(mapping):
+ """Converts a flat tuple => object mapping to hierarchical one"""
+ result = {}
+ for path, value in mapping.items():
+ parents, leaf = path[:-1], path[-1]
+ # create the hierarchy until we reach the leaf value
+ temp = result
+ for parent in parents:
+ temp.setdefault(parent, {})
+ temp = temp[parent]
+ # set the leaf value
+ temp[leaf] = value
+ return result
+def _unflatten_tree(files):
+ """Converts a flat path => object mapping to a hierarchical directories
+ Input:
+ {
+ 'path/to/file.a': a_content,
+ 'path/to/file.b': b_content,
+ 'path/file.c': c_content
+ }
+ Output:
+ {
+ 'path': {
+ 'to': {
+ 'file.a': a_content,
+ 'file.b': b_content
+ },
+ 'file.c': c_content
+ }
+ }
+ """
+ files = {tuple(k.split('/')): v for k, v in files.items()}
+ return _unflatten(files)
+def _render_jinja_template(searchpath, template, params):
+ def format_all(items, pattern):
+ return [pattern.format(item) for item in items]
+ loader = jinja2.FileSystemLoader(searchpath)
+ env = jinja2.Environment(loader=loader, trim_blocks=True,
+ lstrip_blocks=True,
+ undefined=jinja2.StrictUndefined)
+ env.filters['format_all'] = format_all
+ template = env.get_template(template)
+ return template.render(**params)
+# configurations for setting up branch skipping
+# - appveyor has a feature to skip builds without an appveyor.yml
+# - travis reads from the master branch and applies the rules
+# - circle requires the configuration to be present on all branch, even ones
+# that are configured to be skipped
+# - azure skips branches without azure-pipelines.yml by default
+# - github skips branches without .github/workflows/ by default
+_default_travis_yml = """
+ only:
+ - master
+ - /.*-travis-.*/
+os: linux
+dist: trusty
+language: generic
+_default_circle_yml = """
+version: 2
+ build:
+ machine: true
+ version: 2
+ build:
+ jobs:
+ - build:
+ filters:
+ branches:
+ only:
+ - /.*-circle-.*/
+_default_tree = {
+ '.travis.yml': _default_travis_yml,
+ '.circleci/config.yml': _default_circle_yml
+class GitRemoteCallbacks(PygitRemoteCallbacks):
+ def __init__(self, token):
+ self.token = token
+ self.attempts = 0
+ super().__init__()
+ def push_update_reference(self, refname, message):
+ pass
+ def update_tips(self, refname, old, new):
+ pass
+ def credentials(self, url, username_from_url, allowed_types):
+ # its a libgit2 bug, that it infinitely retries the authentication
+ self.attempts += 1
+ if self.attempts >= 5:
+ # pygit2 doesn't propagate the exception properly
+ msg = 'Wrong oauth personal access token'
+ print(msg)
+ raise CrossbowError(msg)
+ if (allowed_types &
+ return pygit2.UserPass(self.token, 'x-oauth-basic')
+ else:
+ return None
+def _git_ssh_to_https(url):
+ return url.replace('', '')
+class Repo:
+ """
+ Base class for interaction with local git repositories
+ A high level wrapper used for both reading revision information from
+ arrow's repository and pushing continuous integration tasks to the queue
+ repository.
+ Parameters
+ ----------
+ require_https : boolean, default False
+ Raise exception for SSH origin URLs
+ """
+ def __init__(self, path, github_token=None, remote_url=None,
+ require_https=False):
+ self.path = Path(path)
+ self.github_token = github_token
+ self.require_https = require_https
+ self._remote_url = remote_url
+ self._pygit_repo = None
+ self._github_repo = None # set by as_github_repo()
+ self._updated_refs = []
+ def __str__(self):
+ tpl = textwrap.dedent('''
+ Repo: {remote}@{branch}
+ Commit: {head}
+ ''')
+ return tpl.format(
+ remote=self.remote_url,
+ branch=self.branch.branch_name,
+ head=self.head
+ )
+ @property
+ def repo(self):
+ if self._pygit_repo is None:
+ self._pygit_repo = pygit2.Repository(str(self.path))
+ return self._pygit_repo
+ @property
+ def origin(self):
+ remote = self.repo.remotes['origin']
+ if self.require_https and remote.url.startswith(''):
+ raise CrossbowError("Change SSH origin URL to HTTPS to use "
+ "Crossbow: {}".format(remote.url))
+ return remote
+ def fetch(self):
+ refspec = '+refs/heads/*:refs/remotes/origin/*'
+ self.origin.fetch([refspec])
+ def push(self, refs=None, github_token=None):
+ github_token = github_token or self.github_token
+ if github_token is None:
+ raise RuntimeError(
+ 'Could not determine GitHub token. Please set the '
+ 'CROSSBOW_GITHUB_TOKEN environment variable to a '
+ 'valid GitHub access token or pass one to --github-token.'
+ )
+ callbacks = GitRemoteCallbacks(github_token)
+ refs = refs or []
+ try:
+ self.origin.push(refs + self._updated_refs, callbacks=callbacks)
+ except pygit2.GitError:
+ raise RuntimeError('Failed to push updated references, '
+ 'potentially because of credential issues: {}'
+ .format(self._updated_refs))
+ else:
+ self.updated_refs = []
+ @property
+ def head(self):
+ """Currently checked out commit's sha"""
+ return self.repo.head
+ @property
+ def branch(self):
+ """Currently checked out branch"""
+ try:
+ return self.repo.branches[self.repo.head.shorthand]
+ except KeyError:
+ return None # detached
+ @property
+ def remote(self):
+ """Currently checked out branch's remote counterpart"""
+ try:
+ return self.repo.remotes[self.branch.upstream.remote_name]
+ except (AttributeError, KeyError):
+ return None # cannot detect
+ @property
+ def remote_url(self):
+ """Currently checked out branch's remote counterpart URL
+ If an SSH github url is set, it will be replaced by the https
+ equivalent usable with GitHub OAuth token.
+ """
+ try:
+ return self._remote_url or _git_ssh_to_https(self.remote.url)
+ except AttributeError:
+ return None
+ @property
+ def user_name(self):
+ try:
+ return next(self.repo.config.get_multivar(''))
+ except StopIteration:
+ return os.environ.get('GIT_COMMITTER_NAME', 'unknown')
+ @property
+ def user_email(self):
+ try:
+ return next(self.repo.config.get_multivar(''))
+ except StopIteration:
+ return os.environ.get('GIT_COMMITTER_EMAIL', 'unknown')
+ @property
+ def signature(self):
+ return pygit2.Signature(self.user_name, self.user_email,
+ int(time.time()))
+ def create_tree(self, files):
+ builder = self.repo.TreeBuilder()
+ for filename, content in files.items():
+ if isinstance(content, dict):
+ # create a subtree
+ tree_id = self.create_tree(content)
+ builder.insert(filename, tree_id, pygit2.GIT_FILEMODE_TREE)
+ else:
+ # create a file
+ blob_id = self.repo.create_blob(content)
+ builder.insert(filename, blob_id, pygit2.GIT_FILEMODE_BLOB)
+ tree_id = builder.write()
+ return tree_id
+ def create_commit(self, files, parents=None, message='',
+ reference_name=None):
+ if parents is None:
+ # by default use the main branch as the base of the new branch
+ # required to reuse github actions cache across crossbow tasks
+ commit, _ = self.repo.resolve_refish("master")
+ parents = []
+ tree_id = self.create_tree(files)
+ author = committer = self.signature
+ commit_id = self.repo.create_commit(reference_name, author, committer,
+ message, tree_id, parents)
+ return self.repo[commit_id]
+ def create_branch(self, branch_name, files, parents=None, message='',
+ signature=None):
+ # create commit with the passed tree
+ commit = self.create_commit(files, parents=parents, message=message)
+ # create branch pointing to the previously created commit
+ branch = self.repo.create_branch(branch_name, commit)
+ # append to the pushable references
+ self._updated_refs.append('refs/heads/{}'.format(branch_name))
+ return branch
+ def create_tag(self, tag_name, commit_id, message=''):
+ tag_id = self.repo.create_tag(tag_name, commit_id,
+ pygit2.GIT_OBJ_COMMIT, self.signature,
+ message)
+ # append to the pushable references
+ self._updated_refs.append('refs/tags/{}'.format(tag_name))
+ return self.repo[tag_id]
+ def file_contents(self, commit_id, file):
+ commit = self.repo[commit_id]
+ entry = commit.tree[file]
+ blob = self.repo[]
+ return
+ def _parse_github_user_repo(self):
+ m = re.match(r'.*\/([^\/]+)\/([^\/\.]+)(\.git)?$', self.remote_url)
+ if m is None:
+ raise CrossbowError(
+ "Unable to parse the github owner and repository from the "
+ "repository's remote url '{}'".format(self.remote_url)
+ )
+ user, repo =,
+ return user, repo
+ def as_github_repo(self, github_token=None):
+ """Converts it to a repository object which wraps the GitHub API"""
+ if self._github_repo is None:
+ if not _have_github3:
+ raise ImportError('Must install')
+ github_token = github_token or self.github_token
+ username, reponame = self._parse_github_user_repo()
+ session = github3.session.GitHubSession(
+ default_connect_timeout=10,
+ default_read_timeout=30
+ )
+ github = github3.GitHub(session=session)
+ github.login(token=github_token)
+ self._github_repo = github.repository(username, reponame)
+ return self._github_repo
+ def github_commit(self, sha):
+ repo = self.as_github_repo()
+ return repo.commit(sha)
+ def github_release(self, tag):
+ repo = self.as_github_repo()
+ try:
+ return repo.release_from_tag(tag)
+ except github3.exceptions.NotFoundError:
+ return None
+ def github_upload_asset_requests(self, release, path, name, mime,
+ max_retries=None, retry_backoff=None):
+ if max_retries is None:
+ max_retries = int(os.environ.get('CROSSBOW_MAX_RETRIES', 8))
+ if retry_backoff is None:
+ retry_backoff = int(os.environ.get('CROSSBOW_RETRY_BACKOFF', 5))
+ for i in range(max_retries):
+ try:
+ with open(path, 'rb') as fp:
+ result = release.upload_asset(name=name, asset=fp,
+ content_type=mime)
+ except github3.exceptions.ResponseError as e:
+ logger.error('Attempt {} has failed with message: {}.'
+ .format(i + 1, str(e)))
+ logger.error('Error message {}'.format(e.msg))
+ logger.error('List of errors provided by Github:')
+ for err in e.errors:
+ logger.error(' - {}'.format(err))
+ if e.code == 422:
+ # 422 Validation Failed, probably raised because
+ # ReleaseAsset already exists, so try to remove it before
+ # reattempting the asset upload
+ for asset in release.assets():
+ if == name:
+'Release asset {} already exists, '
+ 'removing it...'.format(name))
+ asset.delete()
+'Asset {} removed.'.format(name))
+ break
+ except github3.exceptions.ConnectionError as e:
+ logger.error('Attempt {} has failed with message: {}.'
+ .format(i + 1, str(e)))
+ else:
+'Attempt {} has finished.'.format(i + 1))
+ return result
+ time.sleep(retry_backoff)
+ raise RuntimeError('Github asset uploading has failed!')
+ def github_upload_asset_curl(self, release, path, name, mime):
+ upload_url, _ = release.upload_url.split('{?')
+ upload_url += '?name={}'.format(name)
+ command = [
+ 'curl',
+ '--fail',
+ '-H', "Authorization: token {}".format(self.github_token),
+ '-H', "Content-Type: {}".format(mime),
+ '--data-binary', '@{}'.format(path),
+ upload_url
+ ]
+ return, shell=False, check=True)
+ def github_overwrite_release_assets(self, tag_name, target_commitish,
+ patterns, method='requests'):
+ # Since github has changed something the asset uploading via requests
+ # got instable, so prefer the cURL alternative.
+ # Potential cause:
+ # sigmavirus24/
+ repo = self.as_github_repo()
+ if not tag_name:
+ raise CrossbowError('Empty tag name')
+ if not target_commitish:
+ raise CrossbowError('Empty target commit for the release tag')
+ # remove the whole release if it already exists
+ try:
+ release = repo.release_from_tag(tag_name)
+ except github3.exceptions.NotFoundError:
+ pass
+ else:
+ release.delete()
+ release = repo.create_release(tag_name, target_commitish)
+ for pattern in patterns:
+ for path in glob.glob(pattern, recursive=True):
+ name = os.path.basename(path)
+ size = os.path.getsize(path)
+ mime = mimetypes.guess_type(name)[0] or 'application/zip'
+ 'Uploading asset `{}` with mimetype {} and size {}...'
+ .format(name, mime, size)
+ )
+ if method == 'requests':
+ self.github_upload_asset_requests(release, path, name=name,
+ mime=mime)
+ elif method == 'curl':
+ self.github_upload_asset_curl(release, path, name=name,
+ mime=mime)
+ else:
+ raise CrossbowError(
+ 'Unsupported upload method {}'.format(method)
+ )
+class Queue(Repo):
+ def _latest_prefix_id(self, prefix):
+ pattern = re.compile(r'[\w\/-]*{}-(\d+)'.format(prefix))
+ matches = list(filter(None, map(pattern.match, self.repo.branches)))
+ if matches:
+ latest = max(int( for m in matches)
+ else:
+ latest = -1
+ return latest
+ def _next_job_id(self, prefix):
+ """Auto increments the branch's identifier based on the prefix"""
+ latest_id = self._latest_prefix_id(prefix)
+ return '{}-{}'.format(prefix, latest_id + 1)
+ def latest_for_prefix(self, prefix):
+ latest_id = self._latest_prefix_id(prefix)
+ if latest_id < 0:
+ raise RuntimeError(
+ 'No job has been submitted with prefix {} yet'.format(prefix)
+ )
+ job_name = '{}-{}'.format(prefix, latest_id)
+ return self.get(job_name)
+ def date_of(self, job):
+ # it'd be better to bound to the queue repository on deserialization
+ # and reorganize these methods to Job
+ branch_name = 'origin/{}'.format(job.branch)
+ branch = self.repo.branches[branch_name]
+ commit = self.repo[]
+ return date.fromtimestamp(commit.commit_time)
+ def jobs(self, pattern):
+ """Return jobs sorted by its identifier in reverse order"""
+ job_names = []
+ for name in self.repo.branches.remote:
+ origin, name = name.split('/', 1)
+ result = re.match(pattern, name)
+ if result:
+ job_names.append(name)
+ for name in sorted(job_names, reverse=True):
+ yield self.get(name)
+ def get(self, job_name):
+ branch_name = 'origin/{}'.format(job_name)
+ branch = self.repo.branches[branch_name]
+ try:
+ content = self.file_contents(, 'job.yml')
+ except KeyError:
+ raise CrossbowError(
+ 'No job is found with name: {}'.format(job_name)
+ )
+ buffer = StringIO(content.decode('utf-8'))
+ job = yaml.load(buffer)
+ job.queue = self
+ return job
+ def put(self, job, prefix='build'):
+ if not isinstance(job, Job):
+ raise CrossbowError('`job` must be an instance of Job')
+ if job.branch is not None:
+ raise CrossbowError('`job.branch` is automatically generated, '
+ 'thus it must be blank')
+ if is None:
+ raise CrossbowError(
+ 'Cannot determine git remote for the Arrow repository to '
+ 'clone or push to, try to push the `{}` branch first to have '
+ 'a remote tracking counterpart.'.format(
+ )
+ if is None:
+ raise CrossbowError(
+ 'Cannot determine the current branch of the Arrow repository '
+ 'to clone or push to, perhaps it is in detached HEAD state. '
+ 'Please checkout a branch.'
+ )
+ # auto increment and set next job id, e.g. build-85
+ job._queue = self
+ job.branch = self._next_job_id(prefix)
+ # create tasks' branches
+ for task_name, task in job.tasks.items():
+ # adding CI's name to the end of the branch in order to use skip
+ # patterns on travis and circleci
+ task.branch = '{}-{}-{}'.format(job.branch,, task_name)
+ params = {
+ **job.params,
+ "arrow":,
+ "queue_remote_url": self.remote_url
+ }
+ files = task.render_files(job.template_searchpath, params=params)
+ branch = self.create_branch(task.branch, files=files)
+ self.create_tag(task.tag,
+ task.commit = str(
+ # create job's branch with its description
+ return self.create_branch(job.branch, files=job.render_files())
+def get_version(root, **kwargs):
+ """
+ Parse function for setuptools_scm that ignores tags for non-C++
+ subprojects, e.g. apache-arrow-js-XXX tags.
+ """
+ from setuptools_scm.git import parse as parse_git_version
+ # query the calculated version based on the git tags
+ kwargs['describe_command'] = (
+ 'git describe --dirty --tags --long --match "apache-arrow-[0-9].*"'
+ )
+ version = parse_git_version(root, **kwargs)
+ tag = str(version.tag)
+ # We may get a development tag for the next version, such as "5.0.0.dev0",
+ # or the tag of an already released version, such as "4.0.0".
+ # In the latter case, we need to increment the version so that the computed
+ # version comes after any patch release (the next feature version after
+ # 4.0.0 is 5.0.0).
+ pattern = r"^(\d+)\.(\d+)\.(\d+)"
+ match = re.match(pattern, tag)
+ major, minor, patch = map(int, match.groups())
+ if 'dev' not in tag:
+ major += 1
+ return "{}.{}.{}.dev{}".format(major, minor, patch, version.distance)
+class Serializable:
+ @classmethod
+ def to_yaml(cls, representer, data):
+ tag = '!{}'.format(cls.__name__)
+ dct = {k: v for k, v in data.__dict__.items() if not k.startswith('_')}
+ return representer.represent_mapping(tag, dct)
+class Target(Serializable):
+ """
+ Describes target repository and revision the builds run against
+ This serializable data container holding information about arrow's
+ git remote, branch, sha and version number as well as some metadata
+ (currently only an email address where the notification should be sent).
+ """
+ def __init__(self, head, branch, remote, version, email=None):
+ self.head = head
+ = email
+ self.branch = branch
+ self.remote = remote
+ self.version = version
+ self.no_rc_version = re.sub(r'-rc\d+\Z', '', version)
+ # Semantic Versioning 1.0.0:
+ #
+ # > A pre-release version number MAY be denoted by appending an
+ # > arbitrary string immediately following the patch version and a
+ # > dash. The string MUST be comprised of only alphanumerics plus
+ # > dash [0-9A-Za-z-].
+ #
+ # Example:
+ #
+ # '0.16.1.dev10' ->
+ # '0.16.1-dev10'
+ self.no_rc_semver_version = \
+ re.sub(r'\.(dev\d+)\Z', r'-\1', self.no_rc_version)
+ @classmethod
+ def from_repo(cls, repo, head=None, branch=None, remote=None, version=None,
+ email=None):
+ """Initialize from a repository
+ Optionally override detected remote, branch, head, and/or version.
+ """
+ assert isinstance(repo, Repo)
+ if head is None:
+ head = str(
+ if branch is None:
+ branch = repo.branch.branch_name
+ if remote is None:
+ remote = repo.remote_url
+ if version is None:
+ version = get_version(repo.path)
+ if email is None:
+ email = repo.user_email
+ return cls(head=head, email=email, branch=branch, remote=remote,
+ version=version)
+class Task(Serializable):
+ """
+ Describes a build task and metadata required to render CI templates
+ A task is represented as a single git commit and branch containing jinja2
+ rendered files (currently appveyor.yml or .travis.yml configurations).
+ A task can't be directly submitted to a queue, must belong to a job.
+ Each task's unique identifier is its branch name, which is generated after
+ submitting the job to a queue.
+ """
+ def __init__(self, ci, template, artifacts=None, params=None):
+ assert ci in {
+ 'circle',
+ 'travis',
+ 'appveyor',
+ 'azure',
+ 'github',
+ 'drone',
+ }
+ = ci
+ self.template = template
+ self.artifacts = artifacts or []
+ self.params = params or {}
+ self.branch = None # filled after adding to a queue
+ self.commit = None # filled after adding to a queue
+ self._queue = None # set by the queue object after put or get
+ self._status = None # status cache
+ self._assets = None # assets cache
+ def render_files(self, searchpath, params=None):
+ params = {**self.params, **(params or {}), "task": self}
+ try:
+ rendered = _render_jinja_template(searchpath, self.template,
+ params=params)
+ except jinja2.TemplateError as e:
+ raise RuntimeError(
+ 'Failed to render template `{}` with {}: {}'.format(
+ self.template, e.__class__.__name__, str(e)
+ )
+ )
+ tree = {**_default_tree, self.filename: rendered}
+ return _unflatten_tree(tree)
+ @property
+ def tag(self):
+ return self.branch
+ @property
+ def filename(self):
+ config_files = {
+ 'circle': '.circleci/config.yml',
+ 'travis': '.travis.yml',
+ 'appveyor': 'appveyor.yml',
+ 'azure': 'azure-pipelines.yml',
+ 'github': '.github/workflows/crossbow.yml',
+ 'drone': '.drone.yml',
+ }
+ return config_files[]
+ def status(self, force_query=False):
+ _status = getattr(self, '_status', None)
+ if force_query or _status is None:
+ github_commit = self._queue.github_commit(self.commit)
+ self._status = TaskStatus(github_commit)
+ return self._status
+ def assets(self, force_query=False, validate_patterns=True):
+ _assets = getattr(self, '_assets', None)
+ if force_query or _assets is None:
+ github_release = self._queue.github_release(self.tag)
+ self._assets = TaskAssets(github_release,
+ artifact_patterns=self.artifacts,
+ validate_patterns=validate_patterns)
+ return self._assets
+class TaskStatus:
+ """
+ Combine the results from status and checks API to a single state.
+ Azure pipelines uses checks API which doesn't provide a combined
+ interface like status API does, so we need to manually combine
+ both the commit statuses and the commit checks coming from
+ different API endpoint
+ Status.state: error, failure, pending or success, default pending
+ CheckRun.status: queued, in_progress or completed, default: queued
+ CheckRun.conclusion: success, failure, neutral, cancelled, timed_out
+ or action_required, only set if
+ CheckRun.status == 'completed'
+ 1. Convert CheckRun's status and conclusion to one of Status.state
+ 2. Merge the states based on the following rules:
+ - failure if any of the contexts report as error or failure
+ - pending if there are no statuses or a context is pending
+ - success if the latest status for all contexts is success
+ error otherwise.
+ Parameters
+ ----------
+ commit : github3.Commit
+ Commit to query the combined status for.
+ Returns
+ -------
+ TaskStatus(
+ combined_state='error|failure|pending|success',
+ github_status='original github status object',
+ github_check_runs='github checks associated with the commit',
+ total_count='number of statuses and checks'
+ )
+ """
+ def __init__(self, commit):
+ status = commit.status()
+ check_runs = list(commit.check_runs())
+ states = [s.state for s in status.statuses]
+ for check in check_runs:
+ if check.status == 'completed':
+ if check.conclusion in {'success', 'failure'}:
+ states.append(check.conclusion)
+ elif check.conclusion in {'cancelled', 'timed_out',
+ 'action_required'}:
+ states.append('error')
+ # omit `neutral` conclusion
+ else:
+ states.append('pending')
+ # it could be more effective, but the following is more descriptive
+ combined_state = 'error'
+ if len(states):
+ if any(state in {'error', 'failure'} for state in states):
+ combined_state = 'failure'
+ elif any(state == 'pending' for state in states):
+ combined_state = 'pending'
+ elif all(state == 'success' for state in states):
+ combined_state = 'success'
+ # show link to the actual build, some of the CI providers implement
+ # the statuses API others implement the checks API, so display both
+ build_links = [s.target_url for s in status.statuses]
+ build_links += [c.html_url for c in check_runs]
+ self.combined_state = combined_state
+ self.github_status = status
+ self.github_check_runs = check_runs
+ self.total_count = len(states)
+ self.build_links = build_links
+class TaskAssets(dict):
+ def __init__(self, github_release, artifact_patterns,
+ validate_patterns=True):
+ # HACK(kszucs): don't expect uploaded assets of no atifacts were
+ # defiened for the tasks in order to spare a bit of github rate limit
+ if not artifact_patterns:
+ return
+ if github_release is None:
+ github_assets = {} # no assets have been uploaded for the task
+ else:
+ github_assets = { a for a in github_release.assets()}
+ if not validate_patterns:
+ # shortcut to avoid pattern validation and just set all artifacts
+ return self.update(github_assets)
+ for pattern in artifact_patterns:
+ # artifact can be a regex pattern
+ compiled = re.compile(f"^{pattern}$")
+ matches = list(
+ filter(None, map(compiled.match, github_assets.keys()))
+ )
+ num_matches = len(matches)
+ # validate artifact pattern matches single asset
+ if num_matches == 0:
+ self[pattern] = None
+ elif num_matches == 1:
+ self[pattern] = github_assets[matches[0].group(0)]
+ else:
+ raise CrossbowError(
+ 'Only a single asset should match pattern `{}`, there are '
+ 'multiple ones: {}'.format(pattern, ', '.join(matches))
+ )
+ def missing_patterns(self):
+ return [pattern for pattern, asset in self.items() if asset is None]
+ def uploaded_assets(self):
+ return [asset for asset in self.values() if asset is not None]
+class Job(Serializable):
+ """Describes multiple tasks against a single target repository"""
+ def __init__(self, target, tasks, params=None, template_searchpath=None):
+ if not tasks:
+ raise ValueError('no tasks were provided for the job')
+ if not all(isinstance(task, Task) for task in tasks.values()):
+ raise ValueError('each `tasks` mus be an instance of Task')
+ if not isinstance(target, Target):
+ raise ValueError('`target` must be an instance of Target')
+ if not isinstance(target, Target):
+ raise ValueError('`target` must be an instance of Target')
+ if not isinstance(params, dict):
+ raise ValueError('`params` must be an instance of dict')
+ = target
+ self.tasks = tasks
+ self.params = params or {} # additional parameters for the tasks
+ self.branch = None # filled after adding to a queue
+ self._queue = None # set by the queue object after put or get
+ if template_searchpath is None:
+ self._template_searchpath = ArrowSources.find().path
+ else:
+ self._template_searchpath = template_searchpath
+ def render_files(self):
+ with StringIO() as buf:
+ yaml.dump(self, buf)
+ content = buf.getvalue()
+ tree = {**_default_tree, "job.yml": content}
+ return _unflatten_tree(tree)
+ def render_tasks(self, params=None):
+ result = {}
+ params = {
+ **self.params,
+ "arrow":,
+ **(params or {})
+ }
+ for task_name, task in self.tasks.items():
+ files = task.render_files(self._template_searchpath, params)
+ result[task_name] = files
+ return result
+ @property
+ def template_searchpath(self):
+ return self._template_searchpath
+ @property
+ def queue(self):
+ assert isinstance(self._queue, Queue)
+ return self._queue
+ @queue.setter
+ def queue(self, queue):
+ assert isinstance(queue, Queue)
+ self._queue = queue
+ for task in self.tasks.values():
+ task._queue = queue
+ @property
+ def email(self):
+ return os.environ.get('CROSSBOW_EMAIL',
+ @property
+ def date(self):
+ return self.queue.date_of(self)
+ def show(self, stream=None):
+ return yaml.dump(self, stream=stream)
+ @classmethod
+ def from_config(cls, config, target, tasks=None, groups=None, params=None):
+ """
+ Intantiate a job from based on a config.
+ Parameters
+ ----------
+ config : dict
+ Deserialized content of tasks.yml
+ target : Target
+ Describes target repository and revision the builds run against.
+ tasks : Optional[List[str]], default None
+ List of glob patterns for matching task names.
+ groups : Optional[List[str]], default None
+ List of exact group names matching predefined task sets in the
+ config.
+ params : Optional[Dict[str, str]], default None
+ Additional rendering parameters for the task templates.
+ Returns
+ -------
+ Job
+ Raises
+ ------
+ Exception:
+ If invalid groups or tasks has been passed.
+ """
+ task_definitions =, groups=groups)
+ # instantiate the tasks
+ tasks = {}
+ versions = {'version': target.version,
+ 'no_rc_version': target.no_rc_version,
+ 'no_rc_semver_version': target.no_rc_semver_version}
+ for task_name, task in task_definitions.items():
+ artifacts = task.pop('artifacts', None) or [] # because of yaml
+ artifacts = [fn.format(**versions) for fn in artifacts]
+ tasks[task_name] = Task(artifacts=artifacts, **task)
+ return cls(target=target, tasks=tasks, params=params,
+ template_searchpath=config.template_searchpath)
+ def is_finished(self):
+ for task in self.tasks.values():
+ status = task.status(force_query=True)
+ if status.combined_state == 'pending':
+ return False
+ return True
+ def wait_until_finished(self, poll_max_minutes=120,
+ poll_interval_minutes=10):
+ started_at = time.time()
+ while True:
+ if self.is_finished():
+ break
+ waited_for_minutes = (time.time() - started_at) / 60
+ if waited_for_minutes > poll_max_minutes:
+ msg = ('Exceeded the maximum amount of time waiting for job '
+ 'to finish, waited for {} minutes.')
+ raise RuntimeError(msg.format(waited_for_minutes))
+'Waiting {} minutes and then checking again'
+ .format(poll_interval_minutes))
+ time.sleep(poll_interval_minutes * 60)
+class Config(dict):
+ def __init__(self, tasks, template_searchpath):
+ super().__init__(tasks)
+ self.template_searchpath = template_searchpath
+ @classmethod
+ def load_yaml(cls, path):
+ path = Path(path)
+ searchpath = path.parent
+ rendered = _render_jinja_template(searchpath,,
+ params={})
+ config = yaml.load(rendered)
+ return cls(config, template_searchpath=searchpath)
+ def show(self, stream=None):
+ return yaml.dump(dict(self), stream=stream)
+ def select(self, tasks=None, groups=None):
+ config_groups = dict(self['groups'])
+ config_tasks = dict(self['tasks'])
+ valid_groups = set(config_groups.keys())
+ valid_tasks = set(config_tasks.keys())
+ group_whitelist = list(groups or [])
+ task_whitelist = list(tasks or [])
+ # validate that the passed groups are defined in the config
+ requested_groups = set(group_whitelist)
+ invalid_groups = requested_groups - valid_groups
+ if invalid_groups:
+ msg = 'Invalid group(s) {!r}. Must be one of {!r}'.format(
+ invalid_groups, valid_groups
+ )
+ raise CrossbowError(msg)
+ # merge the tasks defined in the selected groups
+ task_patterns = [list(config_groups[name]) for name in group_whitelist]
+ task_patterns = set(sum(task_patterns, task_whitelist))
+ # treat the task names as glob patterns to select tasks more easily
+ requested_tasks = set()
+ for pattern in task_patterns:
+ matches = fnmatch.filter(valid_tasks, pattern)
+ if len(matches):
+ requested_tasks.update(matches)
+ else:
+ raise CrossbowError(
+ "Unable to match any tasks for `{}`".format(pattern)
+ )
+ # validate that the passed and matched tasks are defined in the config
+ invalid_tasks = requested_tasks - valid_tasks
+ if invalid_tasks:
+ msg = 'Invalid task(s) {!r}. Must be one of {!r}'.format(
+ invalid_tasks, valid_tasks
+ )
+ raise CrossbowError(msg)
+ return {
+ task_name: config_tasks[task_name] for task_name in requested_tasks
+ }
+ def validate(self):
+ # validate that the task groups are properly referening the tasks
+ for group_name, group in self['groups'].items():
+ for pattern in group:
+ tasks =[pattern])
+ if not tasks:
+ raise CrossbowError(
+ "The pattern `{}` defined for task group `{}` is not "
+ "matching any of the tasks defined in the "
+ "configuration file.".format(pattern, group_name)
+ )
+ # validate that the tasks are constructible
+ for task_name, task in self['tasks'].items():
+ try:
+ Task(**task)
+ except Exception as e:
+ raise CrossbowError(
+ 'Unable to construct a task object from the '
+ 'definition of task `{}`. The original error message '
+ 'is: `{}`'.format(task_name, str(e))
+ )
+ # validate that the defined tasks are renderable, in order to to that
+ # define the required object with dummy data
+ target = Target(
+ head='e279a7e06e61c14868ca7d71dea795420aea6539',
+ branch='master',
+ remote='',
+ version='1.0.0dev123',
+ email=''
+ )
+ for task_name, task in self['tasks'].items():
+ task = Task(**task)
+ files = task.render_files(
+ self.template_searchpath,
+ params=dict(
+ arrow=target,
+ queue_remote_url=''
+ )
+ )
+ if not files:
+ raise CrossbowError('No files have been rendered for task `{}`'
+ .format(task_name))
+# configure yaml serializer
+yaml = YAML()