1
0
Fork 0
bind9/contrib/gitchangelog/gitchangelog.py
Daniel Baumann f66ff7eae6
Adding upstream version 1:9.20.9.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-21 13:32:37 +02:00

2281 lines
64 KiB
Python
Executable file

#!/usr/bin/env python3
############################################################################
# Copyright (c) 2018, Valentin Lab
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Securactive nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL SECURACTIVE BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# SPDX-License-Identifier: BSD-3-Clause
#
############################################################################
from __future__ import print_function
from __future__ import absolute_import
import locale
import re
import os
import os.path
import sys
import glob
import textwrap
import datetime
import collections
import traceback
import contextlib
import itertools
import errno
from subprocess import Popen, PIPE
try:
import pystache
except ImportError: ## pragma: no cover
pystache = None
try:
import mako
except ImportError: ## pragma: no cover
mako = None
__version__ = "%%version%%"  ## replaced by autogen.sh

## Global debug switch.  It is read by ``GitCommit.__getattr__`` and
## ``GitRepos.__init__``: when truthy, the original ``ShellError`` is
## re-raised instead of being replaced by a friendlier exception.
## (The name was previously truncated to ``EBUG``, leaving ``DEBUG``
## undefined at module level.)
DEBUG = None
##
## Platform and python compatibility
##
## Interpreter version as a ``major.minor`` float.
## NOTE(review): a float cannot tell 3.1 from 3.10; only the ``>= 3``
## comparison below is used in the visible code.
PY_VERSION = float("%d.%d" % sys.version_info[0:2])
PY3 = PY_VERSION >= 3

## ``basestring`` only exists on python 2; alias it to ``str`` otherwise.
try:
    basestring
except NameError:
    basestring = str  ## pylint: disable=redefined-builtin

WIN32 = sys.platform == "win32"

## Platform dependent keyword arguments given to ``Popen``: file
## descriptors are closed in children everywhere but on win32.
PLT_CFG = {
    "close_fds": not WIN32,
}
##
##
##
if WIN32 and not PY3:
    ## Sorry about the following, all this code is to ensure full
    ## compatibility with python 2.7 under windows about sending unicode
    ## command-line
    import ctypes
    import subprocess
    import _subprocess
    from ctypes import (
        byref,
        windll,
        c_char_p,
        c_wchar_p,
        c_void_p,
        Structure,
        sizeof,
        c_wchar,
        WinError,
    )
    from ctypes.wintypes import BYTE, WORD, LPWSTR, BOOL, DWORD, LPVOID, HANDLE

    ##
    ## Types
    ##

    ## Flag OR-ed into the creation flags before calling CreateProcessW
    ## (see ``CreateProcess`` below).
    CREATE_UNICODE_ENVIRONMENT = 0x00000400
    LPCTSTR = c_char_p
    LPTSTR = c_wchar_p
    LPSECURITY_ATTRIBUTES = c_void_p
    LPBYTE = ctypes.POINTER(BYTE)

    class STARTUPINFOW(Structure):
        """ctypes layout of the startup info passed by reference to
        ``CreateProcessW``."""

        _fields_ = [
            ("cb", DWORD),
            ("lpReserved", LPWSTR),
            ("lpDesktop", LPWSTR),
            ("lpTitle", LPWSTR),
            ("dwX", DWORD),
            ("dwY", DWORD),
            ("dwXSize", DWORD),
            ("dwYSize", DWORD),
            ("dwXCountChars", DWORD),
            ("dwYCountChars", DWORD),
            ("dwFillAtrribute", DWORD),
            ("dwFlags", DWORD),
            ("wShowWindow", WORD),
            ("cbReserved2", WORD),
            ("lpReserved2", LPBYTE),
            ("hStdInput", HANDLE),
            ("hStdOutput", HANDLE),
            ("hStdError", HANDLE),
        ]

    LPSTARTUPINFOW = ctypes.POINTER(STARTUPINFOW)

    class PROCESS_INFORMATION(Structure):
        """ctypes layout of the process/thread handles and ids filled in
        by ``CreateProcessW``."""

        _fields_ = [
            ("hProcess", HANDLE),
            ("hThread", HANDLE),
            ("dwProcessId", DWORD),
            ("dwThreadId", DWORD),
        ]

    LPPROCESS_INFORMATION = ctypes.POINTER(PROCESS_INFORMATION)

    class DUMMY_HANDLE(ctypes.c_void_p):
        """Handle wrapper exposing ``Close()`` and ``int()`` conversion.

        ``CreateProcess`` below returns the process and thread handles
        wrapped in this class.
        """

        def __init__(self, *a, **kw):
            super(DUMMY_HANDLE, self).__init__(*a, **kw)
            ## remember whether ``Close()`` was already called
            self.closed = False

        def Close(self):
            """Close the underlying handle, at most once."""
            if not self.closed:
                windll.kernel32.CloseHandle(self)
                self.closed = True

        def __int__(self):
            return self.value

    ## Declare the prototype of the wide-char process creation function.
    CreateProcessW = windll.kernel32.CreateProcessW
    CreateProcessW.argtypes = [
        LPCTSTR,
        LPTSTR,
        LPSECURITY_ATTRIBUTES,
        LPSECURITY_ATTRIBUTES,
        BOOL,
        DWORD,
        LPVOID,
        LPCTSTR,
        LPSTARTUPINFOW,
        LPPROCESS_INFORMATION,
    ]
    CreateProcessW.restype = BOOL

    ##
    ## Patched functions/classes
    ##

    def CreateProcess(
        executable,
        args,
        _p_attr,
        _t_attr,
        inherit_handles,
        creation_flags,
        env,
        cwd,
        startup_info,
    ):
        """Create a process supporting unicode executable and args for win32

        Python implementation of CreateProcess using CreateProcessW for Win32.

        ``_p_attr`` and ``_t_attr`` are accepted but ignored (``None`` is
        always forwarded in their place).  Returns a 4-tuple ``(process
        handle, thread handle, process id, thread id)``; raises
        ``WinError()`` when ``CreateProcessW`` reports a failure.
        """

        si = STARTUPINFOW(
            dwFlags=startup_info.dwFlags,
            wShowWindow=startup_info.wShowWindow,
            cb=sizeof(STARTUPINFOW),
            ## XXXvlab: not sure of the casting here to ints.
            hStdInput=int(startup_info.hStdInput),
            hStdOutput=int(startup_info.hStdOutput),
            hStdError=int(startup_info.hStdError),
        )

        wenv = None
        if env is not None:
            ## LPCWSTR seems to be c_wchar_p, so let's say CWSTR is c_wchar
            ## The environment is flattened as ``key=value\0`` entries
            ## followed by one final ``\0``.
            env = (
                unicode("").join([unicode("%s=%s\0") % (k, v) for k, v in env.items()])
            ) + unicode("\0")
            wenv = (c_wchar * len(env))()
            wenv.value = env

        pi = PROCESS_INFORMATION()
        creation_flags |= CREATE_UNICODE_ENVIRONMENT

        if CreateProcessW(
            executable,
            args,
            None,
            None,
            inherit_handles,
            creation_flags,
            wenv,
            cwd,
            byref(si),
            byref(pi),
        ):
            return (
                DUMMY_HANDLE(pi.hProcess),
                DUMMY_HANDLE(pi.hThread),
                pi.dwProcessId,
                pi.dwThreadId,
            )
        raise WinError()

    class Popen(subprocess.Popen):
        """This supersedes ``subprocess.Popen`` and works around a bug in
        the cPython 2.7 implementation."""

        def _execute_child(
            self,
            args,
            executable,
            preexec_fn,
            close_fds,
            cwd,
            env,
            universal_newlines,
            startupinfo,
            creationflags,
            shell,
            to_close,
            p2cread,
            p2cwrite,
            c2pread,
            c2pwrite,
            errread,
            errwrite,
        ):
            """Code from part of _execute_child from Python 2.7 (9fbb65e)

            There are only 2 little changes concerning the construction of
            the final string in shell mode: we preempt the creation of
            the command string when shell is True, because the original
            function will try to encode unicode args, which we want to avoid
            to be able to send it as-is to ``CreateProcess``.

            """
            if not isinstance(args, subprocess.types.StringTypes):
                args = subprocess.list2cmdline(args)

            if startupinfo is None:
                startupinfo = subprocess.STARTUPINFO()
            if shell:
                startupinfo.dwFlags |= _subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = _subprocess.SW_HIDE
                comspec = os.environ.get("COMSPEC", unicode("cmd.exe"))
                args = unicode('{} /c "{}"').format(comspec, args)
                if (
                    _subprocess.GetVersion() >= 0x80000000
                    or os.path.basename(comspec).lower() == "command.com"
                ):
                    w9xpopen = self._find_w9xpopen()
                    args = unicode('"%s" %s') % (w9xpopen, args)
                    creationflags |= _subprocess.CREATE_NEW_CONSOLE

            ## ``shell`` is forced to ``False`` here: the shell command
            ## line was already built above.
            super(Popen, self)._execute_child(
                args,
                executable,
                preexec_fn,
                close_fds,
                cwd,
                env,
                universal_newlines,
                startupinfo,
                creationflags,
                False,
                to_close,
                p2cread,
                p2cwrite,
                c2pread,
                c2pwrite,
                errread,
                errwrite,
            )

    ## Replace the low-level process creation used by ``subprocess`` with
    ## the unicode-aware implementation above.
    _subprocess.CreateProcess = CreateProcess
##
## Help and usage strings
##
## Command line synopsis.  ``%(exname)s`` is a named %-format placeholder;
## presumably replaced with the executable name where the command line
## parser is built (not visible here).
usage_msg = """
%(exname)s {-h|--help}
%(exname)s {-v|--version}
%(exname)s [--debug|-d] [REVLIST]"""
## One-line description of the command.
description_msg = """\
Run this command in a git repository to output a formatted changelog
"""
## Trailing help text describing how the config file is located.
epilog_msg = """\
%(exname)s uses a config file to filter meaningful commit or do some
formatting in commit messages thanks to a config file.
Config file location will be resolved in this order:
- in shell environment variable GITCHANGELOG_CONFIG_FILENAME
- in git configuration: ``git config gitchangelog.rc-path``
- as '.%(exname)s.rc' in the root of the current git repository
"""
##
## Shell command helper functions
##
def stderr(msg):
    """Write ``msg`` followed by a newline on the standard error stream."""
    sys.stderr.write("%s\n" % (msg,))
def err(msg):
    """Report ``msg`` on stderr, prefixed with ``Error: ``."""
    prefixed = "Error: " + msg
    stderr(prefixed)
def warn(msg):
    """Report ``msg`` on stderr, prefixed with ``Warning: ``."""
    prefixed = "Warning: " + msg
    stderr(prefixed)
def die(msg=None, errlvl=1):
    """Exit the program with status ``errlvl``.

    When ``msg`` is truthy it is first written to stderr.
    """
    if msg:
        stderr(msg)
    raise SystemExit(errlvl)
class ShellError(Exception):
    """Error raised when a shell command exits with an unexpected status.

    Besides the message, the instance carries the exit status
    (``errlvl``), the ``command`` and its captured ``out`` / ``err``
    outputs.
    """

    def __init__(self, msg, errlvl=None, command=None, out=None, err=None):
        super(ShellError, self).__init__(msg)
        self.errlvl, self.command = errlvl, command
        self.out, self.err = out, err
@contextlib.contextmanager
def set_cwd(directory):
    """Context manager running its body with ``directory`` as the current
    working directory; the previous directory is restored on exit, even
    when the body raises."""
    previous = os.getcwd()
    os.chdir(directory)
    try:
        yield
    finally:
        os.chdir(previous)
def format_last_exception(prefix=" | "):
    """Return the traceback of the exception being handled, with every
    line prefixed by ``prefix``.

    This allows raising a custom exception without losing the context of
    what caused the problem in the first place: call it from within an
    ``except`` clause and embed the returned text in the new message.
    """
    trace_lines = traceback.format_exc().strip().split("\n")
    return "\n".join([str(prefix + entry) for entry in trace_lines])
##
## config file functions
##
## Names made available to the code of config files; this mapping is
## copied and used as globals when a config file is executed.
_config_env = dict(
    WIN32=WIN32,
    PY3=PY3,
)
def available_in_config(f):
    """Decorator registering ``f`` in the config environment under its
    own ``__name__``; ``f`` is returned unchanged."""
    _config_env.update({f.__name__: f})
    return f
def load_config_file(filename, default_filename=None, fail_if_not_present=True):
    """Loads data from a config file.

    ``default_filename`` (when given) then ``filename`` are executed as
    python code, in that order, in a namespace seeded with a copy of the
    config environment, so that values from ``filename`` override the
    defaults.  The resulting namespace is returned as a dict.

    The program exits (see ``die``) when a path exists but is not a
    file, when a file contains a syntax error, or, if
    ``fail_if_not_present`` is true, when a file is missing.

    An omitted ``default_filename`` (``None``) is simply skipped; it used
    to be reported as a missing required file, which made the function
    unusable with its own default arguments.
    """
    config = _config_env.copy()

    if default_filename is None:
        candidates = [filename]
    else:
        candidates = [default_filename, filename]

    for fname in candidates:
        if fname and os.path.exists(fname):
            if not os.path.isfile(fname):
                die("config file path '%s' exists but is not a file !" % (fname,))
            content = file_get_contents(fname)
            try:
                code = compile(content, fname, "exec")
                ## Config files are python code by design.
                exec(code, config)  ## pylint: disable=exec-used
            except SyntaxError as e:
                die(
                    "Syntax error in config file: %s\n%s"
                    "File %s, line %i"
                    % (
                        str(e),
                        (indent(e.text.rstrip(), " | ") + "\n") if e.text else "",
                        e.filename,
                        e.lineno,
                    )
                )
        else:
            if fail_if_not_present:
                die("%s config file is not found and is required." % (fname,))

    return config
##
## Text functions
##
@available_in_config
class TextProc(object):
    """Wrap a ``text -> text`` callable so that it can be chained with
    other ``TextProc`` instances through the ``|`` operator.

    ``(a | b)(text)`` is ``b(a(text))``.  Chaining with anything that is
    not a ``TextProc`` raises a ``SyntaxError`` pointing at the calling
    line.
    """

    def __init__(self, fun):
        self.fun = fun
        ## keep the wrapped callable's name when it has one
        if hasattr(fun, "__name__"):
            self.__name__ = fun.__name__

    def __call__(self, text):
        return self.fun(text)

    def __or__(self, value):
        if not isinstance(value, TextProc):
            import inspect

            ## frame record of the caller: the expression using ``|``
            caller = inspect.stack()[1]
            filename, lineno, lines = caller[1], caller[2], caller[4]
            details = "Invalid chain with a non TextProc element %r:\n%s" % (
                value,
                indent("".join(lines).strip(), " | "),
            )
            raise SyntaxError(
                "Invalid syntax in config file",
                (filename, lineno, 0, details),
            )
        return TextProc(lambda text: value.fun(self.fun(text)))
def set_if_empty(text, msg="No commit message."):
    """Return ``text``, or ``msg`` when ``text`` has a length of zero."""
    return text if len(text) else msg
@TextProc
def ucfirst(msg):
    """Return ``msg`` with its first character upper-cased."""
    if not len(msg):
        return msg
    head, tail = msg[0], msg[1:]
    return head.upper() + tail
@TextProc
def final_dot(msg):
    """Append a ``.`` to ``msg`` when its last character is alphanumeric;
    any other text (including the empty one) is returned unchanged."""
    needs_dot = len(msg) and msg[-1].isalnum()
    return msg + "." if needs_dot else msg
def indent(text, chars=" ", first=None):
    """Return ``text`` with each of its lines prefixed by ``chars``.

    When ``first`` is given, it is used as the prefix of the first line
    and ``chars`` as the prefix of the remaining ones.  Trailing
    whitespace is removed from every resulting line, so an empty line
    only keeps the non-blank part of its prefix:

    >>> print(indent('one\\n\\ntwo', chars="| "))
    | one
    |
    | two

    """
    if not first:
        prefixed = [(chars + line).rstrip() for line in text.split("\n")]
        return "\n".join(prefixed)

    head, _sep, tail = text.partition("\n")
    return "\n".join([(first + head).rstrip(), indent(tail, chars=chars)])
def paragraph_wrap(text, regexp="\n\n", separator="\n"):
    r"""Wrap ``text`` paragraph by paragraph.

    ``text`` is split into paragraphs on ``regexp``; each paragraph is
    stripped and wrapped separately with ``textwrap.wrap`` (hyphens are
    not used as break points), then the wrapped paragraphs are joined
    with ``separator``.  The final result is stripped.

    >>> print(paragraph_wrap('one two\n\nthree\n'))
    one two
    three

    """
    splitter = re.compile(regexp, re.MULTILINE)
    wrapped = []
    for paragraph in splitter.split(text):
        lines = textwrap.wrap(paragraph.strip(), break_on_hyphens=False)
        wrapped.append("\n".join(lines))
    return separator.join(wrapped).strip()
def curryfy(f):
    """Return a factory of ``TextProc`` built on ``f``.

    Calling the factory with ``*a, **kw`` gives a ``TextProc`` which, on
    a text ``txt``, evaluates ``f(txt, *a, **kw)``.
    """
    factory = lambda *a, **kw: TextProc(lambda txt: f(txt, *a, **kw))
    return factory
## these are curryfied version of their lower case definition: calling
## them with the extra arguments gives a ``TextProc`` to apply on a text.
Indent = curryfy(indent)
Wrap = curryfy(paragraph_wrap)
## ``ReSub(p, r, **k)`` gives a ``TextProc`` running ``re.sub(p, r, txt, **k)``
ReSub = lambda p, r, **k: TextProc(lambda txt: re.sub(p, r, txt, **k))
## ``noop`` returns the text unchanged; ``strip`` strips it.
noop = TextProc(lambda txt: txt)
strip = TextProc(lambda txt: txt.strip())
SetIfEmpty = curryfy(set_if_empty)

## Publish these helpers in the config environment under their own name.
## ``locals()`` is expected to be the module namespace here (top-level
## code).
for _label in (
    "Indent",
    "Wrap",
    "ReSub",
    "noop",
    "final_dot",
    "ucfirst",
    "strip",
    "SetIfEmpty",
):
    _config_env[_label] = locals()[_label]
##
## File
##
def file_get_contents(filename):
    """Return the content of ``filename`` as text.

    On python 2 only, the content is decoded with the preferred encoding
    when it is not already unicode, and a leading ``coding`` declaration
    comment is neutralized.
    """
    with open(filename) as f:
        out = f.read()
    if not PY3:
        if not isinstance(out, unicode):
            out = out.decode(_preferred_encoding)
        ## remove encoding declaration (for some reason, python 2.7
        ## don't like it).
        ## ``re.DOTALL`` used to be given as 4th positional argument, which
        ## ``re.sub`` reads as ``count`` and not as ``flags``.  The pattern
        ## is anchored on the start of the string so it can match only once:
        ## an explicit ``count=1`` keeps the effective behavior.
        out = re.sub(
            r"^(\s*#.*\s*)coding[:=]\s*([-\w.]+\s*;?\s*)", r"\1", out, count=1
        )
    return out
def file_put_contents(filename, string):
    """Write ``string`` to ``filename``, replacing any previous content.

    On python 3 the file is opened in text mode with newline translation
    disabled; on python 2 it is opened in binary mode.
    """
    target = open(filename, "w", newline="") if PY3 else open(filename, "wb")
    with target as handle:
        handle.write(string)
##
## Inferring revision
##
def _file_regex_match(filename, pattern, **kw):
    """Return the first match of ``pattern`` in the content of ``filename``.

    Extra keyword arguments are forwarded to ``re.search``.

    Raises ``IOError`` when ``filename`` is not a file, and ``ValueError``
    (after dumping the file content on stderr) when nothing matches.
    """
    if not os.path.isfile(filename):
        raise IOError("Can't open file '%s'." % filename)

    file_content = file_get_contents(filename)
    found = re.search(pattern, file_content, **kw)
    if found is not None:
        return found

    stderr("file content: %r" % file_content)
    ## report the textual pattern even when given a compiled regex
    compiled_type = type(re.compile(""))
    if isinstance(pattern, compiled_type):
        pattern = pattern.pattern
    raise ValueError(
        "Regex %s did not match any substring in '%s'." % (pattern, filename)
    )
@available_in_config
def FileFirstRegexMatch(filename, pattern):
    """Return a callable giving the text matched by ``pattern`` in
    ``filename``.

    The file is only read when the returned callable is called.  If the
    pattern has named groups, the value of the ``rev`` group is returned
    (the program exits when that group did not take part in the match);
    a warning is issued and the full match is used when no group is
    named ``rev``.  Without named groups, the full match is returned.
    """

    def _call():
        found = _file_regex_match(filename, pattern)
        named = found.groupdict()
        if not named:
            return found.group(0)
        if "rev" not in named:
            warn(
                "Named pattern used, but no one are named 'rev'. "
                "Using full match."
            )
            return found.group(0)
        if named["rev"] is None:
            die("Named pattern used, but it was not valued.")
        return named["rev"]

    return _call
@available_in_config
def Caret(l):
    """Return a callable giving ``l`` (evaluated through
    ``eval_if_callable`` at call time) prefixed with ``^``."""

    def _call():
        resolved = eval_if_callable(l)
        return "^%s" % resolved

    return _call
##
## System functions
##
## Note that locale.getpreferredencoding() does NOT follow
## PYTHONIOENCODING by default, but ``sys.stdout.encoding`` does. In
## PY2, ``sys.stdout.encoding`` without PYTHONIOENCODING set does not
## get any values set in subshells. However, if _preferred_encoding
## is not set to utf-8, it leads to encoding errors.
## ``PYTHONIOENCODING`` follows the ``encodingname:errorhandler`` syntax
## where both parts are optional: only the encoding name is a valid codec
## name, so the error handler part (if any) is dropped, and an empty
## encoding name falls back to the locale's preferred encoding.
_preferred_encoding = (
    os.environ.get("PYTHONIOENCODING", "").split(":", 1)[0]
    or locale.getpreferredencoding()
)
DEFAULT_GIT_LOG_ENCODING = "utf-8"
class Phile(object):
    """File like API to read fields separated by any delimiters

    It'll take care of file decoding to unicode.

    This is an adaptor on a file object whose ``read(size)`` gives
    encoded data (bytes on python 3).  ``read(delimiter)`` is a generator
    of the decoded fields found between delimiters:

        Phile(BytesIO(b"a-b-c-d")).read(delimiter="-")  ## a, b, c, d

    The ``buffersize`` only changes how much data is loaded at once, not
    the fields produced.
    """

    def __init__(self, filename, buffersize=4096, encoding=_preferred_encoding):
        ## despite its name, ``filename`` is the wrapped file object
        self._file = filename
        self._buffersize = buffersize
        self._encoding = encoding

    def read(self, delimiter="\n"):
        """Yield each decoded field separated by ``delimiter``.

        The data remaining after the last delimiter is always yielded as
        the last field, even when empty.
        """
        if PY3:
            ## the delimiter is searched in the raw data, so it must be
            ## encoded like the data is.
            delimiter = delimiter.encode(self._encoding)
        buf = b""
        while True:
            chunk = self._file.read(self._buffersize)
            if not chunk:
                yield buf.decode(self._encoding)
                return
            ## Split the pending data together with the new chunk, so that
            ## a multi-byte delimiter straddling two chunks is still found.
            records = (buf + chunk).split(delimiter)
            ## the last element is an unfinished field: keep it for later
            buf = records.pop()
            for record in records:
                yield record.decode(self._encoding)

    def write(self, buf):
        """Write text ``buf`` to the file (encoded first on python 3)."""
        if PY3:
            buf = buf.encode(self._encoding)
        return self._file.write(buf)

    def close(self):
        """Close the wrapped file object."""
        return self._file.close()
class Proc(Popen):
    """Shell subprocess whose ``stdin``, ``stdout`` and ``stderr`` pipes
    are wrapped in ``Phile`` objects using ``encoding``."""

    def __init__(self, command, env=None, encoding=_preferred_encoding):
        popen_options = dict(
            shell=True,
            stdin=PIPE,
            stdout=PIPE,
            stderr=PIPE,
            close_fds=PLT_CFG["close_fds"],
            env=env,
            universal_newlines=False,
        )
        super(Proc, self).__init__(command, **popen_options)
        ## replace the raw pipes by their decoding/encoding adaptors
        for stream_name in ("stdin", "stdout", "stderr"):
            raw_stream = getattr(self, stream_name)
            setattr(self, stream_name, Phile(raw_stream, encoding=encoding))
def cmd(command, env=None, shell=True):
    """Run ``command`` and wait for its completion.

    Returns a ``(stdout, stderr, returncode)`` tuple.  Outputs are decoded
    with the encoding of ``sys.stdout`` (resp. ``sys.stderr``) when it is
    set, and with the preferred encoding otherwise.
    """
    process = Popen(
        command,
        shell=shell,
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        close_fds=PLT_CFG["close_fds"],
        env=env,
        universal_newlines=False,
    )
    raw_out, raw_err = process.communicate()
    out_encoding = getattr(sys.stdout, "encoding", None) or _preferred_encoding
    err_encoding = getattr(sys.stderr, "encoding", None) or _preferred_encoding
    return (
        raw_out.decode(out_encoding),
        raw_err.decode(err_encoding),
        process.returncode,
    )
@available_in_config
def wrap(command, ignore_errlvls=(0,), env=None, shell=True):
    """Wraps a shell command and casts an exception on unexpected errlvl

    Returns the standard output of ``command``.

    When the exit status of the command is not in ``ignore_errlvls``, a
    ``ShellError`` is raised: its message embeds the captured stdout and
    stderr (when not empty), and its ``errlvl``, ``command``, ``out`` and
    ``err`` attributes are set.  One trailing newline is removed from the
    outputs reported that way.

    ``ignore_errlvls`` can be any container of exit statuses; its default
    is an immutable tuple rather than a shared mutable list.
    """

    out, err, errlvl = cmd(command, env=env, shell=shell)

    if errlvl not in ignore_errlvls:
        formatted = []
        if out:
            if out.endswith("\n"):
                out = out[:-1]
            formatted.append("stdout:\n%s" % indent(out, "| "))
        if err:
            if err.endswith("\n"):
                err = err[:-1]
            formatted.append("stderr:\n%s" % indent(err, "| "))
        msg = "\n".join(formatted)

        raise ShellError(
            "Wrapped command %r exited with errorlevel %d.\n%s"
            % (command, errlvl, indent(msg, chars=" ")),
            errlvl=errlvl,
            command=command,
            out=out,
            err=err,
        )
    return out
@available_in_config
def swrap(command, **kwargs):
    """Same as ``wrap(...)`` but strips the output."""
    output = wrap(command, **kwargs)
    return output.strip()
##
## git information access
##
class SubGitObjectMixin(object):
    """Base of the objects attached to a repository object.

    The repository is kept in ``_repos`` and its ``git`` attribute is
    reachable directly as ``self.git``.
    """

    def __init__(self, repos):
        self._repos = repos

    @property
    def git(self):
        """Simple delegation to ``repos`` original method."""
        repos = self._repos
        return repos.git
## Commit attribute name -> ``git log`` pretty-format placeholder.
GIT_FORMAT_KEYS = dict(
    [
        ("sha1", "%H"),
        ("sha1_short", "%h"),
        ("subject", "%s"),
        ("author_name", "%an"),
        ("author_email", "%ae"),
        ("author_date", "%ad"),
        ("author_date_timestamp", "%at"),
        ("committer_name", "%cn"),
        ("committer_date_timestamp", "%ct"),
        ("raw_body", "%B"),
        ("body", "%b"),
    ]
)

## All the placeholders, in the order of the keys, separated by ``%x00``.
GIT_FULL_FORMAT_STRING = "%x00".join(
    [GIT_FORMAT_KEYS[name] for name in GIT_FORMAT_KEYS]
)

## One ``Key: value`` line, whose value can continue on following lines
## starting with whitespace.
REGEX_RFC822_KEY_VALUE = (
    r"(^|\n)(?P<key>[A-Z]\w+(-\w+)*): (?P<value>[^\n]*(\n\s+[^\n]*)*)"
)
## One or more such lines ending the text.
REGEX_RFC822_POSTFIX = "".join([r"(", REGEX_RFC822_KEY_VALUE, r")+$"])
class GitCommit(SubGitObjectMixin):
    r"""Represent a Git Commit and expose its information as attributes

    Attributes named after the keys of ``GIT_FORMAT_KEYS`` (``sha1``,
    ``subject``, ``author_name``, ``body``...) are computed on request:
    the first access to a missing one runs a single ``git log`` call for
    all the missing attributes, and the values are then stored on the
    instance, so further accesses don't call git again.

    Trailer
    =======

    Trailers are RFC822-like ``Key: value`` lines found at the end of the
    body.  They are removed from ``body`` and exposed as ``trailer_<key>``
    attributes, where ``<key>`` is the lower-cased key with ``-`` replaced
    by ``_`` (``Change-id: 1234`` gives ``trailer_change_id == '1234'``).
    Continuation lines of a multi-line value are dedented.  A key given
    several times gives the list of its values, in order of appearance.

    Special values
    ==============

    ``authors`` is the sorted list of the ``Co-Authored-By`` trailer
    values together with the ``name <email>`` of the author, and
    ``author_names`` is the same list with the ``<email>`` part removed.

    Commits compare through ancestry: ``a <= b`` is true when ``a`` is an
    ancestor of ``b``.  The other operand can be a commit identifier.
    """

    def __init__(self, repos, identifier):
        super(GitCommit, self).__init__(repos)
        self.identifier = identifier
        ## set once the trailers of the body have been extracted
        self._trailer_parsed = False

    def __getattr__(self, label):
        """Completes commits attributes upon request."""
        attrs = GIT_FORMAT_KEYS.keys()
        if label not in attrs:
            try:
                return self.__dict__[label]
            except KeyError:
                ## an unknown label can still be a trailer if these were
                ## not parsed yet.
                if self._trailer_parsed:
                    raise AttributeError(label)

        identifier = self.identifier

        ## Compute only missing information
        missing_attrs = [l for l in attrs if l not in self.__dict__]
        ## some commit can be already fully specified (see ``mk_commit``)
        if missing_attrs:
            aformat = "%x00".join(GIT_FORMAT_KEYS[l] for l in missing_attrs)
            try:
                ret = self.git.log(
                    [identifier, "--max-count=1", "--pretty=format:%s" % aformat, "--"]
                )
            except ShellError:
                if DEBUG:
                    raise
                raise ValueError(
                    "Given commit identifier %r doesn't exists" % self.identifier
                )
            attr_values = ret.split("\x00")
            for attr, value in zip(missing_attrs, attr_values):
                setattr(self, attr, value.strip())

        ## Let's interpret RFC822-like header keys that could be in the body
        match = re.search(REGEX_RFC822_POSTFIX, self.body)
        if match is not None:
            pos = match.start()
            postfix = self.body[pos:]
            self.body = self.body[:pos]
            for match in re.finditer(REGEX_RFC822_KEY_VALUE, postfix):
                dct = match.groupdict()
                key = dct["key"].replace("-", "_").lower()
                if "\n" in dct["value"]:
                    first_line, remaining = dct["value"].split("\n", 1)
                    value = "%s\n%s" % (first_line, textwrap.dedent(remaining))
                else:
                    value = dct["value"]
                try:
                    prev_value = self.__dict__["trailer_%s" % key]
                except KeyError:
                    setattr(self, "trailer_%s" % key, value)
                else:
                    ## repeated key: collect all the values in a list
                    setattr(
                        self,
                        "trailer_%s" % key,
                        (
                            prev_value
                            + [
                                value,
                            ]
                            if isinstance(prev_value, list)
                            else [
                                prev_value,
                                value,
                            ]
                        ),
                    )
        self._trailer_parsed = True
        return getattr(self, label)

    @property
    def author_names(self):
        """Sorted ``authors`` without their ``<email>`` part."""
        return [
            re.sub(r"^([^<]+)<[^>]+>\s*$", r"\1", author).strip()
            for author in self.authors
        ]

    @property
    def authors(self):
        """Sorted list of the co-authors and of the commit author."""
        co_authors = getattr(self, "trailer_co_authored_by", [])
        co_authors = co_authors if isinstance(co_authors, list) else [co_authors]
        return sorted(co_authors + ["%s <%s>" % (self.author_name, self.author_email)])

    @property
    def date(self):
        """Author date as ``YYYY-MM-DD``, in UTC."""
        ## timezone-aware conversion, as in ``tagger_date``;
        ## ``utcfromtimestamp`` is deprecated.
        d = datetime.datetime.fromtimestamp(
            float(self.author_date_timestamp), datetime.UTC
        )
        return d.strftime("%Y-%m-%d")

    @property
    def has_annotated_tag(self):
        """Whether ``identifier^{tag}`` can be resolved by ``git rev-parse``

        An exit status of 128 means it can't; any other failure is
        propagated.
        """
        try:
            self.git.rev_parse(["%s^{tag}" % self.identifier, "--"])
            return True
        except ShellError as e:
            if e.errlvl != 128:
                raise
            return False

    @property
    def tagger_date_timestamp(self):
        """First field of the raw tagger date of ``refs/tags/<identifier>``

        Raises ``ValueError`` when the commit has no annotated tag.
        """
        if not self.has_annotated_tag:
            raise ValueError(
                "Can't access 'tagger_date_timestamp' on commit without annotated tag."
            )
        tagger_date_utc = self.git.for_each_ref(
            "refs/tags/%s" % self.identifier, format="%(taggerdate:raw)"
        )
        return tagger_date_utc.split(" ", 1)[0]

    @property
    def tagger_date(self):
        """Tagger date as ``YYYY-MM-DD``, in UTC."""
        d = datetime.datetime.fromtimestamp(
            float(self.tagger_date_timestamp), datetime.UTC
        )
        return d.strftime("%Y-%m-%d")

    def __le__(self, value):
        """True when this commit is an ancestor of ``value``

        This relies on ``git merge-base --is-ancestor``: an exit status of
        1 means "not an ancestor", any other failure is propagated.
        """
        if not isinstance(value, GitCommit):
            value = self._repos.commit(value)
        try:
            self.git.merge_base(value.sha1, is_ancestor=self.sha1)
            return True
        except ShellError as e:
            if e.errlvl != 1:
                raise
            return False

    def __lt__(self, value):
        if not isinstance(value, GitCommit):
            value = self._repos.commit(value)
        return self <= value and self != value

    def __eq__(self, value):
        """Commits are equal when they have the same ``sha1``."""
        if not isinstance(value, GitCommit):
            value = self._repos.commit(value)
        return self.sha1 == value.sha1

    def __hash__(self):
        return hash(self.sha1)

    def __repr__(self):
        return "<%s %r>" % (self.__class__.__name__, self.identifier)
def normpath(path, cwd=None):
    """Return ``path`` unchanged when absolute, otherwise the normalized
    join of ``cwd`` and ``path``.

    The current working directory is used when ``cwd`` is not given (or
    empty).
    """
    if not os.path.isabs(path):
        base = cwd or os.getcwd()
        return os.path.normpath(os.path.join(base, path))
    return path
class GitConfig(SubGitObjectMixin):
    """Interface to config values of git

    Each value is queried with ``git config <key>`` at access time, by
    attribute (``cfg.foo``), by item (``cfg["foo.wiz"]``) or through
    ``cfg.get("foo", default)``.

    Notice that you can't use attribute search in subsection as
    ``cfg.foo.wiz``: in git config files, you can have a value attached to
    an element, and this element can also be a section.  Use
    ``cfg["foo.wiz"]`` or ``getattr(cfg, "foo.wiz")`` instead.

    A key which is not found raises ``AttributeError`` on attribute
    access, ``KeyError`` on item access, and gives the default value
    (``None`` if not specified) with ``get``.
    """

    def __init__(self, repos):
        super(GitConfig, self).__init__(repos)

    def __getattr__(self, label):
        try:
            return self.git.config(label)
        except ShellError as e:
            ## exit status 1 without any output: the key is not set
            key_missing = e.errlvl == 1 and e.out == ""
            if not key_missing:
                raise
            raise AttributeError("key %r is not found in git config." % label)

    def get(self, label, default=None):
        """Return the value of ``label``, or ``default`` when not found."""
        try:
            return getattr(self, label)
        except AttributeError:
            return default

    def __getitem__(self, label):
        try:
            return getattr(self, label)
        except AttributeError:
            raise KeyError(label)
class GitCmd(SubGitObjectMixin):
    """Expose git sub-commands as methods.

    ``obj.some_command`` gives a function running ``git some-command``
    (``_`` are replaced by ``-``) from the original path of the
    repository, and returning its stripped output.

    The function can be called in two ways:

    - with one single non-string argument: it is used as the list of the
      command line arguments, and only the ``env`` keyword is used.

    - otherwise, keywords are turned into options (``-k`` for one letter
      keys, ``--long-key`` for the others, with ``_`` replaced by ``-``)
      placed before the positional arguments.  A boolean value gives an
      option without value.
    """

    def __getattr__(self, label):
        subcommand = label.replace("_", "-")

        def dir_swrap(command, **kwargs):
            with set_cwd(self._repos._orig_path):
                return swrap(command, **kwargs)

        def method(*args, **kwargs):
            base = ["git", subcommand]

            if len(args) == 1 and not isinstance(args[0], basestring):
                return dir_swrap(
                    base + args[0],
                    shell=False,
                    env=kwargs.get("env", None),
                )

            cli_args = []
            for key, value in kwargs.items():
                option_format = "-%s" if len(key) == 1 else "--%s"
                cli_args.append(option_format % key.replace("_", "-"))
                ## NOTE(review): the option is emitted for ``False`` too.
                if not isinstance(value, bool):
                    cli_args.append(value)
            cli_args.extend(args)

            return dir_swrap(base + cli_args, shell=False)

        return method
class GitRepos(object):
    """Access to a git repository through the ``git`` command line.

    Creating an instance checks that ``git`` is usable and that ``path``
    is in a git repository, then sets ``bare``, ``toplevel`` (``None``
    for a bare repository) and ``gitdir``.
    """

    def __init__(self, path):
        ## Saving this original path to ensure all future git commands
        ## will be done from this location.
        self._orig_path = os.path.abspath(path)

        ## verify ``git`` command is accessible:
        try:
            self._git_version = self.git.version()
        except ShellError:
            if DEBUG:
                raise
            raise EnvironmentError(
                "Required ``git`` command not found or broken in $PATH. "
                "(calling ``git version`` failed.)"
            )

        ## verify that we are in a git repository
        try:
            self.git.remote()
        except ShellError:
            if DEBUG:
                raise
            raise EnvironmentError(
                "Not in a git repository. (calling ``git remote`` failed.)"
            )

        self.bare = self.git.rev_parse(is_bare_repository=True) == "true"
        self.toplevel = None if self.bare else self.git.rev_parse(show_toplevel=True)
        self.gitdir = normpath(self.git.rev_parse(git_dir=True), cwd=self._orig_path)

    @classmethod
    def create(cls, directory, *args, **kwargs):
        """Make ``directory`` then ``init`` a repository in it."""
        os.mkdir(directory)
        return cls.init(directory, *args, **kwargs)

    @classmethod
    def init(cls, directory, user=None, email=None):
        """Run ``git init`` in ``directory`` and return its ``GitRepos``

        ``user.name`` and ``user.email`` are configured when given.
        """
        with set_cwd(directory):
            wrap("git init .")
        self = cls(directory)
        if user:
            self.git.config("user.name", user)
        if email:
            self.git.config("user.email", email)
        return self

    def commit(self, identifier):
        """Return the ``GitCommit`` of this repository for ``identifier``."""
        return GitCommit(self, identifier)

    @property
    def git(self):
        return GitCmd(self)

    @property
    def config(self):
        return GitConfig(self)

    def tags(self, contains=None):
        """String list of repository's tag names

        Current tag order is committer date timestamp of tagged commit.
        No firm reason for that, and it could change in future version.

        """
        if contains:
            tags = self.git.tag(contains=contains).split("\n")
        else:
            tags = self.git.tag().split("\n")
        ## Should we use new version name sorting ?  refering to :
        ## ``git tags --sort -v:refname`` in git version >2.0.
        ## Sorting and reversing with command line is not available on
        ## git version <2.0
        return sorted(
            [self.commit(tag) for tag in tags if tag != ""],
            key=lambda x: int(x.committer_date_timestamp),
        )

    def log(
        self,
        includes=("HEAD",),
        excludes=(),
        include_merge=True,
        encoding=_preferred_encoding,
    ):
        """Reverse chronological list of git repository's commits

        Note: rev lists can be GitCommit instance list or identifier list.

        This is a generator of ``GitCommit`` already filled with the
        values given by ``git log``.  The given rev lists are left
        untouched: identifiers are converted to commits in new lists, so
        neither the caller's lists nor the default values are modified
        from one call to the other.
        """

        def as_commits(revs):
            return [
                rev if isinstance(rev, GitCommit) else self.commit(rev) for rev in revs
            ]

        included = as_commits(includes)
        excluded = as_commits(excludes)

        ## --topo-order: don't mix commits from separate branches.
        plog = Proc(
            "git log --stdin -z --topo-order --pretty=format:%s %s --"
            % (GIT_FULL_FORMAT_STRING, "--no-merges" if not include_merge else ""),
            encoding=encoding,
        )
        ## revisions are sent on stdin, excluded ones prefixed by ``^``
        for ref in included:
            plog.stdin.write("%s\n" % ref.sha1)

        for ref in excluded:
            plog.stdin.write("^%s\n" % ref.sha1)
        plog.stdin.close()

        def mk_commit(dct):
            """Creates an already set commit from a dct"""
            c = self.commit(dct["sha1"])
            for k, v in dct.items():
                setattr(c, k, v)
            return c

        values = plog.stdout.read("\x00")

        try:
            while True:  ## next(values) will eventualy raise a StopIteration
                yield mk_commit(dict([(key, next(values)) for key in GIT_FORMAT_KEYS]))
        except StopIteration:
            pass  ## since 3.7, we are not allowed anymore to trickle down
            ## StopIteration.
        finally:
            plog.stdout.close()
            plog.stderr.close()
def first_matching(section_regexps, string):
    """Return the first section having a regexp found in ``string``.

    ``section_regexps`` is an iterable of ``(section, regexps)``.  A
    section whose ``regexps`` is ``None`` matches any string.  ``None`` is
    returned when no section matches.
    """
    for section, regexps in section_regexps:
        if regexps is None:
            return section
        if any(re.search(regexp, string) is not None for regexp in regexps):
            return section
    return None
def ensure_template_file_exists(label, template_name):
    """Return template file path given a label hint and the template name

    Two candidates are checked, in this order: ``template_name`` taken as
    a file name (relative to the current directory, or to the
    ``gitchangelog.template-path`` git config value when set), then
    ``<template_name>.tpl`` in the templates directory (the same config
    value when set, the bundled ``templates/<label>`` directory
    otherwise).

    When none is an existing file, the program exits with a message
    listing the available templates.
    """
    try:
        template_path = GitRepos(os.getcwd()).config.get("gitchangelog.template-path")
    except ShellError as e:
        stderr(
            "Error parsing git config: %s."
            " Won't be able to read 'template-path' if defined." % (str(e))
        )
        template_path = None

    if not template_path:
        path_file = os.getcwd()
        bundled_root = os.path.dirname(os.path.realpath(__file__))
        path_label = os.path.join(bundled_root, "templates", label)
    else:
        path_file = path_label = template_path

    candidates = [
        os.path.join(path_file, template_name),
        os.path.join(path_label, "%s.tpl" % template_name),
    ]
    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate

    templates = glob.glob(os.path.join(path_label, "*.tpl"))
    if len(templates) > 0:
        names = [os.path.basename(f).split(".")[0] for f in templates]
        msg = "These are the available %s templates:" % label
        msg += "\n - " + "\n - ".join(names)
        msg += "\nTemplates are located in %r" % path_label
    else:
        msg = "No available %s templates found in %r." % (label, path_label)
    die("Error: Invalid %s template name %r.\n" % (label, template_name) + "%s" % msg)
##
## Output Engines
##
@available_in_config
def rest_py(data, opts={}):
    """Yield the chunks of a ReStructured Text changelog built from ``data``

    The general title (when ``data["title"]`` is set) comes first, then one
    chunk per version having at least one section.

    """

    def underlined(text, char="="):
        ## The title line is stripped, the underline spans the raw length.
        heading = text.strip()
        rule = char * len(text)
        return heading + "\n" + rule + "\n\n"

    def render_commit(commit, opts=opts):
        headline = commit["subject"]
        if opts["include_commit_sha"]:
            headline += " ``%s``" % commit["commit"].sha1_short
        wrapped = "\n".join(textwrap.wrap(headline, break_on_hyphens=False))
        parts = [indent(wrapped, first="- ").strip(), "\n"]
        if commit["body"]:
            parts.extend(["\n", indent(commit["body"]), "\n"])
        parts.append("\n")
        return "".join(parts)

    def render_version(version):
        if version["tag"]:
            title = "%s (%s)" % (version["tag"], version["date"])
        else:
            title = opts["unreleased_version_label"]
        chunks = [underlined(title, char="-")]

        sections = version["sections"]
        lone_section = len(sections) == 1
        for section in sections:
            section_label = section["label"] if section.get("label", None) else "Other"
            ## A lone "Other" section gets no title of its own.
            if section_label != "Other" or not lone_section:
                chunks.append(underlined(section_label, "~"))
            for commit in section["commits"]:
                chunks.append(render_commit(commit, opts))
        return "".join(chunks)

    if data["title"]:
        yield underlined(data["title"], char="=") + "\n"

    for version in data["versions"]:
        if len(version["sections"]) == 0:
            continue
        yield render_version(version) + "\n"
## formatter engines
if pystache:

    @available_in_config
    def mustache(template_name):
        """Return a callable rendering changelog data with a mustache template

        The returned callable must take 2 arguments ``data`` and ``opts``.

        """
        template = file_get_contents(
            ensure_template_file_exists("mustache", template_name)
        )

        def stuffed_versions(versions, opts):
            ## Decorate each version in place with the precomputed values
            ## the templates rely on.
            for version in versions:
                if version["tag"]:
                    label = "%s (%s)" % (version["tag"], version["date"])
                else:
                    label = opts["unreleased_version_label"]
                version["label"] = label
                version["label_chars"] = list(version["label"])

                for section in version["sections"]:
                    section["label_chars"] = list(section["label"])
                    lone_other = (
                        section["label"] == "Other" and len(version["sections"]) == 1
                    )
                    section["display_label"] = not lone_other
                    for commit in section["commits"]:
                        commit["author_names_joined"] = ", ".join(commit["authors"])
                        commit["body_indented"] = indent(commit["body"])
                yield version

        def renderer(data, opts):
            ## mustache is very simple so we need to add some intermediate
            ## values
            has_title = True if data["title"] else False
            data["general_title"] = has_title
            data["title_chars"] = list(data["title"]) if data["title"] else []
            data["versions"] = stuffed_versions(data["versions"], opts)
            return pystache.render(template, data)

        return renderer

else:

    @available_in_config
    def mustache(template_name):  ## pylint: disable=unused-argument
        die("Required 'pystache' python module not found.")
if mako:
    import mako.template  ## pylint: disable=wrong-import-position

    ## Helpers exposed by name to every mako template.
    mako_env = {
        helper.__name__: helper
        for helper in (ucfirst, indent, textwrap, paragraph_wrap)
    }

    @available_in_config
    def makotemplate(template_name):
        """Return a callable rendering changelog data with a mako template

        The returned callable must take 2 arguments ``data`` and ``opts``.

        """
        template = mako.template.Template(
            filename=ensure_template_file_exists("mako", template_name)
        )

        def renderer(data, opts):
            context = dict(mako_env)
            context["data"] = data
            context["opts"] = opts
            return template.render(**context)

        return renderer

else:

    @available_in_config
    def makotemplate(template_name):  ## pylint: disable=unused-argument
        die("Required 'mako' python module not found.")
##
## Publish action
##
@available_in_config
def stdout(content):
    """Publish ``content`` by sending each of its chunks to ``safe_print``"""
    for piece in content:
        safe_print(piece)
@available_in_config
def FileInsertAtFirstRegexMatch(filename, pattern, flags=0, idx=lambda m: m.start()):
    """Return a publisher inserting content in ``filename`` at a regex match

    The returned callable takes the changelog ``content`` (an iterable of
    strings) and rewrites ``filename`` with that content inserted at the
    character offset ``idx(match)``, where ``match`` is the value returned
    by ``_file_regex_match(filename, pattern, flags=flags)``.

    The new version is first written to ``filename + "~"`` and then renamed
    over ``filename``.

    :param filename: path of the file to insert into
    :param pattern: regexp handed to ``_file_regex_match``
    :param flags: regexp flags handed to ``_file_regex_match``
    :param idx: callable turning the match into an insertion offset,
        defaults to the start of the match
    :returns: callable taking the ``content`` to insert

    """

    def write_content(f, content):
        for content_line in content:
            f.write(content_line)

    def _wrapped(content):
        index = idx(_file_regex_match(filename, pattern, flags=flags))
        ## ``offset`` is the number of characters of the lines already
        ## copied, ``postfix`` tells that the insertion was performed.
        offset = 0
        new_offset = 0
        postfix = False
        with open(filename + "~", "w") as dst:
            with open(filename, "r") as src:
                for line in src:
                    if postfix:
                        ## Insertion done: copy the remaining lines as is.
                        dst.write(line)
                        continue
                    new_offset = offset + len(line)
                    if new_offset < index:
                        ## Insertion point lies beyond this line.
                        offset = new_offset
                        dst.write(line)
                        continue
                    ## Insertion point is in this line: split the line
                    ## around it and write the content in between.
                    dst.write(line[0 : index - offset])
                    write_content(dst, content)
                    dst.write(line[index - offset :])
                    postfix = True
            if not postfix:
                ## Insertion point was never reached: append the content.
                write_content(dst, content)
        if WIN32:
            ## The target is removed before renaming on Windows.
            os.remove(filename)
        os.rename(filename + "~", filename)

    return _wrapped
@available_in_config
def FileRegexSubst(filename, pattern, replace, flags=0):
    """Return a publisher substituting a regex match in ``filename``

    The returned callable takes the changelog ``content`` (an iterable of
    strings), applies ``re.sub(pattern, replace, ..)`` on the content of
    ``filename`` and writes the result back to ``filename``. In ``replace``,
    a backslash followed by the letter ``o`` stands for the joined
    ``content``.

    :param filename: path of the file to modify
    :param pattern: regexp to search for in the file content
    :param replace: replacement template
    :param flags: regexp flags given to ``re.sub``
    :returns: callable taking the ``content`` to inject

    """
    ## Rewrite numeric back-references (``\1``, ``\12``, ...) to the
    ## explicit ``\g<N>`` form. The whole run of digits must be captured:
    ## a ``[0-9+]`` character class would take one digit only, and would
    ## also turn ``\+`` into the invalid group reference ``\g<+>``.
    replace = re.sub(r"\\([0-9]+)", r"\\g<\1>", replace)

    def _wrapped(content):
        src = file_get_contents(filename)
        ## Protect replacement pattern against the following expansion of '\o'
        ## (backslashes of the content are doubled so that ``re.sub`` emits
        ## them literally).
        src = re.sub(
            pattern,
            replace.replace(r"\o", "".join(content).replace("\\", "\\\\")),
            src,
            flags=flags,
        )
        if not PY3:
            src = src.encode(_preferred_encoding)
        file_put_contents(filename, src)

    return _wrapped
##
## Data Structure
##
def versions_data_iter(
    repository,
    revlist=None,
    ignore_regexps=[],
    section_regexps=[(None, "")],
    tag_filter_regexp=r"\d+\.\d+(\.\d+)?",
    include_merge=True,
    body_process=lambda x: x,
    subject_process=lambda x: x,
    log_encoding=DEFAULT_GIT_LOG_ENCODING,
    warn=warn,  ## Mostly used for test
):
    """Returns an iterator through versions data structures

    (see ``gitchangelog.rc.reference`` file for more info)

    Each yielded version is a dict with the keys ``date``, ``commit_date``,
    ``tagger_date``, ``tag``, ``commit`` and ``sections``. Versions without
    any section are not yielded.

    :param repository: target ``GitRepos`` object
    :param revlist: list of strings that git log understands as revlist
    :param ignore_regexps: list of regexp identifying ignored commit messages
    :param section_regexps: regexps identifying sections
    :param tag_filter_regexp: regexp to match tags used as version
    :param include_merge: whether to include merge commits in the log or not
    :param body_process: text processing object to apply to body
    :param subject_process: text processing object to apply to subject
    :param log_encoding: the encoding used in git logs
    :param warn: callable to output warnings, mocked by tests
    :returns: iterator of versions data_structures

    """
    revlist = revlist or []

    ## Hash to speedup lookups
    ## NOTE(review): ``versions_done`` is filled below but never read in
    ## this function, and ``warn`` is not referenced in its body either.
    versions_done = {}

    ## Revisions that the revlist excludes: ``rev-parse`` output lines
    ## starting with ``^``, with that marker removed.
    excludes = (
        [
            rev[1:]
            for rev in repository.git.rev_parse(
                [
                    "--rev-only",
                ]
                + revlist
                + [
                    "--",
                ]
            ).split("\n")
            if rev.startswith("^")
        ]
        if revlist
        else []
    )

    revs = repository.git.rev_list(*revlist).split("\n") if revlist else []
    revs = [rev for rev in revs if rev != ""]

    if revlist and not revs:
        die("No commits matching given revlist: %s" % (" ".join(revlist),))

    ## Version tags: only those whose identifier matches
    ## ``tag_filter_regexp``; ``HEAD`` is appended for the untagged commits.
    tags = [
        tag
        for tag in repository.tags(contains=revs[-1] if revs else None)
        if re.match(tag_filter_regexp, tag.identifier)
    ]

    tags.append(repository.commit("HEAD"))

    if revlist:
        ## Keep the tags up to (and including) the first one that
        ## ``max_rev`` compares lower or equal to.
        max_rev = repository.commit(revs[0])
        new_tags = []
        for tag in tags:
            new_tags.append(tag)
            if max_rev <= tag:
                break
        tags = new_tags
    else:
        max_rev = tags[-1]

    section_order = [k for k, _v in section_regexps]

    tags = list(reversed(tags))

    ## Get the changes between tags (releases)
    for idx, tag in enumerate(tags):
        ## New version
        current_version = {
            "date": tag.tagger_date if tag.has_annotated_tag else tag.date,
            "commit_date": tag.date,
            "tagger_date": tag.tagger_date if tag.has_annotated_tag else None,
            "tag": tag.identifier if tag.identifier != "HEAD" else None,
            "commit": tag,
        }

        sections = collections.defaultdict(list)

        ## Commits of this version: those of the tag (capped to
        ## ``max_rev``), minus those of the tags coming after it in the
        ## reversed list and minus the revlist excludes.
        commits = repository.log(
            includes=[min(tag, max_rev)],
            excludes=tags[idx + 1 :] + excludes,
            include_merge=include_merge,
            encoding=log_encoding,
        )

        for commit in commits:
            if any(
                re.search(pattern, commit.subject) is not None
                for pattern in ignore_regexps
            ):
                continue

            body = body_process(commit.body)

            ## Extract gitlab issue number
            issue = None
            if match := re.search(r".*:gl:`#([0-9]+)`", body):
                issue = int(match.group(1))

            matched_section = first_matching(section_regexps, commit.subject)

            ## Finally storing the commit in the matching section
            sections[matched_section].append(
                {
                    "author": commit.author_name,
                    "authors": commit.author_names,
                    "subject": subject_process(commit.subject),
                    "body": body,
                    "commit": commit,
                    "issue": issue,
                }
            )

        ## Sort sections by issue number or title
        ## (commits without issue number use ``sys.maxsize`` as number,
        ## which sends them after the numbered ones)
        for section_key in sections.keys():
            sections[section_key].sort(
                key=lambda c: (
                    c["issue"] if c["issue"] is not None else sys.maxsize,
                    c["subject"],
                )
            )

        ## Flush current version
        ## (sections are emitted in the order of ``section_regexps``)
        current_version["sections"] = [
            {"label": k, "commits": sections[k]} for k in section_order if k in sections
        ]
        if len(current_version["sections"]) != 0:
            yield current_version
        versions_done[tag] = current_version
def changelog(
    output_engine=rest_py,
    unreleased_version_label="unreleased",
    include_commit_sha=False,
    warn=warn,  ## Mostly used for test
    **kwargs,
):
    """Render the changelog of a repository with ``output_engine``

    The changelog data tree is built from ``versions_data_iter(..)`` and
    handed to ``output_engine``, whose return value is returned as is.

    (see ``gitchangelog.rc.sample`` file for more info)

    Any extra keyword argument is forwarded to ``versions_data_iter(..)``,
    see its arguments for the exact list.

    :param unreleased_version_label: version label for untagged commits
    :param include_commit_sha: whether message should contain commit sha
    :param output_engine: callable to render the changelog data
    :param warn: callable to output warnings, mocked by tests
    :returns: content of changelog

    """
    render_opts = dict(
        unreleased_version_label=unreleased_version_label,
        include_commit_sha=include_commit_sha,
    )

    ## Main container of changelog elements: no general title when the
    ## changelog is restricted to a revlist.
    data = {
        "title": None if kwargs.get("revlist") else "Changelog",
        "versions": [],
    }

    version_iter = versions_data_iter(warn=warn, **kwargs)

    ## Pull one version ahead of time to know if there's at least one.
    nothing = object()
    head = next(version_iter, nothing)
    if head is nothing:
        warn("Empty changelog. No commits were elected to be used as entry.")
    else:
        data["versions"] = itertools.chain([head], version_iter)

    return output_engine(data=data, opts=render_opts)
##
## Manage obsolete options
##
## Callables registered through ``obsolete_option_manager``, in
## registration order.
_obsolete_options_managers = []


def obsolete_option_manager(fun):
    """Register ``fun`` as an obsolete option manager

    Meant to be used as a decorator. ``fun`` is returned unchanged so that
    the decorated name stays bound to the function instead of ``None``.

    :param fun: callable taking the config mapping as only argument
    :returns: ``fun`` itself

    """
    _obsolete_options_managers.append(fun)
    return fun
@obsolete_option_manager
def obsolete_replace_regexps(config):
    """This option was superseded by the ``subject_process`` option.

    Each regex replacement you had could be translated in a
    ``ReSub(pattern, replace)`` in the ``subject_process`` pipeline.

    """
    if "replace_regexps" not in config:
        return
    for regex, substitute in config["replace_regexps"].items():
        ## Each substitution is put in front of the current pipeline.
        substitution = ReSub(regex, substitute)
        downstream = config.get("subject_process", ucfirst | final_dot)
        config["subject_process"] = substitution | downstream
@obsolete_option_manager
def obsolete_body_split_regexp(config):
    """This option was superseded by the ``body_process`` option.

    The split regex can now be sent as a ``Wrap(regex)`` text process
    instruction in the ``body_process`` pipeline.

    """
    if "body_split_regex" not in config:
        return
    ## The wrapper is put in front of the current body pipeline.
    wrapper = Wrap(config["body_split_regex"])
    config["body_process"] = wrapper | config.get("body_process", noop)
def manage_obsolete_options(config):
    """Call every registered obsolete option manager on ``config``"""
    for manager in _obsolete_options_managers:
        manager(config)
##
## Command line parsing
##
def parse_cmd_line(usage, description, epilog, exname, version):
    """Parse ``sys.argv`` and return the resulting argparse namespace

    The namespace carries ``debug`` (boolean) and ``revlist`` (list of
    positional arguments). A leading ``show`` positional argument is
    dropped, with a deprecation warning.

    """
    import argparse

    parser_kwargs = {
        "usage": usage,
        "description": description,
        "epilog": "\n" + epilog,
        "prog": exname,
        "formatter_class": argparse.RawTextHelpFormatter,
    }

    try:
        parser = argparse.ArgumentParser(version=version, **parser_kwargs)
    except TypeError:  ## compat with argparse from python 3.4
        parser = argparse.ArgumentParser(**parser_kwargs)
        parser.add_argument(
            "-v",
            "--version",
            action="version",
            version=version,
            help="show program's version number and exit",
        )

    parser.add_argument(
        "-d",
        "--debug",
        dest="debug",
        action="store_true",
        help="Enable debug mode (show full tracebacks).",
    )
    parser.add_argument("revlist", nargs="*", action="store", default=[])

    ## Remove "show" as first argument for compatibility reason.
    cli_args = sys.argv[1:]
    argv = []
    for position, arg in enumerate(cli_args):
        if arg.startswith("-"):
            argv.append(arg)
            continue
        ## First positional argument: everything from here is kept as is,
        ## except for the deprecated "show" word itself.
        if arg == "show":
            warn("'show' positional argument is deprecated.")
            remainder = cli_args[position + 1 :]
        else:
            remainder = cli_args[position:]
        argv.extend(remainder)
        break

    return parser.parse_args(argv)
def eval_if_callable(v):
    """Return the result of calling ``v`` if it is callable, else ``v``"""
    if callable(v):
        return v()
    return v
def get_revision(repository, config, opts):
    """Return the validated revlist to build the changelog from

    Command line revisions (``opts.revlist``) take precedence over the
    ``revs`` config value. The config value, and each of its elements, may
    be a callable which is then evaluated. Every revision is submitted to
    ``git rev-parse``; a sole ``"HEAD"`` revision yields an empty list.

    """
    revs = opts.revlist
    if not revs:
        configured = config.get("revs")
        if not configured:
            revs = []
        else:
            configured = eval_if_callable(configured)
            if not isinstance(configured, list):
                die(
                    "Invalid type for 'revs' in config file. "
                    "A 'list' type is required, and a %r was given."
                    % type(configured).__name__
                )
            revs = [eval_if_callable(item) for item in configured]

    for candidate in revs:
        if not isinstance(candidate, basestring):
            die(
                "Invalid type for revision in revs list from config file. "
                "'str' type is required, and a %r was given."
                % type(candidate).__name__
            )
        try:
            repository.git.rev_parse([candidate, "--rev_only", "--"])
        except ShellError:
            if DEBUG:
                raise
            die("Revision %r is not valid." % candidate)

    return [] if revs == ["HEAD"] else revs
def get_log_encoding(repository, config):
    """Return the encoding to use when reading git logs

    Lookup order: the ``log_encoding`` value of the gitchangelog config,
    then the ``i18n.logOutputEncoding`` git config value of ``repository``,
    then ``DEFAULT_GIT_LOG_ENCODING``.

    :param repository: object whose ``config.get(key)`` reads git config
    :param config: gitchangelog config mapping
    :returns: encoding name

    """
    log_encoding = config.get("log_encoding", None)
    if log_encoding is None:
        try:
            ## The git config key is spelled ``logOutputEncoding``: with
            ## a misspelled key the git setting would never be found.
            log_encoding = repository.config.get("i18n.logOutputEncoding")
        except ShellError as e:
            warn(
                "Error parsing git config: %s."
                " Couldn't check if 'i18n.logOutputEncoding' was set." % (str(e))
            )

    ## Final defaults coming from git defaults
    return log_encoding or DEFAULT_GIT_LOG_ENCODING
##
## Config Manager
##
class Config(dict):
    """Config mapping reporting missing keys through ``die(..)``"""

    def __getitem__(self, label):
        absent = label not in self
        if absent:
            die("Missing value in config file for key '%s'." % label)
        return dict.__getitem__(self, label)
##
## Safe print
##
def safe_print(content):
    """Print ``content`` on stdout without newline, handling output errors

    - ``UnicodeEncodeError``: re-raised when ``DEBUG`` is set, otherwise an
      explanation is sent through ``stderr(..)`` and the process exits
      with status 1.
    - ``IOError`` with errno 0 on Windows and python 2: ignored.
    - ``IOError`` on a closed output (errno 22 on Windows, ``EPIPE``
      elsewhere): the process exits with status 0.
    - any other ``IOError`` is re-raised.

    """
    if not PY3:
        ## python 2: unicode content is encoded before being printed
        if isinstance(content, unicode):
            content = content.encode(_preferred_encoding)

    try:
        print(content, end="")
        sys.stdout.flush()
    except UnicodeEncodeError:
        if DEBUG:
            raise
        ## XXXvlab: should use $COLUMNS in bash and for windows:
        ## http://stackoverflow.com/questions/14978548
        stderr(
            paragraph_wrap(
                textwrap.dedent(
                    """\
UnicodeEncodeError:
There was a problem outputing the resulting changelog to
your console.
This probably means that the changelog contains characters
that can't be translated to characters in your current charset
(%s).
"""
                )
                % sys.stdout.encoding
            )
        )
        if WIN32 and PY_VERSION < 3.6 and sys.stdout.encoding != "utf-8":
            ## As of PY 3.6, encoding is now ``utf-8`` regardless of
            ## PYTHONIOENCODING
            ## https://www.python.org/dev/peps/pep-0528/
            stderr(
                " You might want to try to fix that by setting "
                "PYTHONIOENCODING to 'utf-8'."
            )
        exit(1)
    except IOError as e:
        if e.errno == 0 and not PY3 and WIN32:
            ## Yes, had a strange IOError Errno 0 after outputing string
            ## that contained UTF-8 chars on Windows and PY2.7
            pass  ## Ignoring exception
        elif (WIN32 and e.errno == 22) or (  ## Invalid argument
            not WIN32 and e.errno == errno.EPIPE
        ):  ## Broken Pipe
            ## Nobody is listening anymore to stdout it seems. Let's bailout.
            if PY3:
                try:
                    ## Called only to generate exception and have a chance at
                    ## ignoring it. Otherwise this happens upon exit, and gets
                    ## some error message printed on stderr.
                    sys.stdout.close()
                except BrokenPipeError:  ## expected outcome on linux
                    pass
                except OSError as e2:
                    if e2.errno != 22:  ## expected outcome on WIN32
                        raise
            ## Yay ! stdout is closed we can now exit safely.
            exit(0)
        else:
            raise
##
## Main
##
def main():
    """Command line entry point: build the changelog and publish it

    Steps, in order: parse the command line, open the git repository of
    the current directory, locate and load the config file, then call
    ``changelog(..)`` and hand its content to the ``publish`` callable of
    the config (``stdout`` by default).

    Exits with status 130 on keyboard interrupt and 255 on any other
    exception raised while building or publishing the changelog.

    """
    global DEBUG

    ## Basic environment infos
    reference_config = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "gitchangelog.rc.reference"
    )

    basename = os.path.basename(sys.argv[0])
    if basename.endswith(".py"):
        basename = basename[:-3]

    ## Debug mode can be set through a ``DEBUG_<EXNAME>`` environment
    ## variable or the ``--debug`` command line option.
    debug_varname = "DEBUG_%s" % basename.upper()
    DEBUG = os.environ.get(debug_varname, False)

    ## Fills the ``%(exname)s`` placeholder of the help messages.
    i = lambda x: x % {"exname": basename}

    opts = parse_cmd_line(
        usage=i(usage_msg),
        description=i(description_msg),
        epilog=i(epilog_msg),
        exname=basename,
        version=__version__,
    )
    DEBUG = DEBUG or opts.debug

    try:
        repository = GitRepos(".")
    except EnvironmentError as e:
        if DEBUG:
            raise
        try:
            die(str(e))
        except Exception as e2:
            die(repr(e2))

    try:
        gc_rc = repository.config.get("gitchangelog.rc-path")
    except ShellError as e:
        stderr(
            "Error parsing git config: %s."
            " Won't be able to read 'rc-path' if defined." % (str(e))
        )
        gc_rc = None

    gc_rc = normpath(gc_rc, cwd=repository.toplevel) if gc_rc else None

    ## config file lookup resolution
    ## (by priority: environment variable, git config ``rc-path``, then
    ## the ``.<exname>.rc`` file at the toplevel of a non-bare repository;
    ## only the first two are required to exist when they are set)
    for enforce_file_existence, fun in [
        (True, lambda: os.environ.get("GITCHANGELOG_CONFIG_FILENAME")),
        (True, lambda: gc_rc),
        (
            False,
            lambda: (
                (os.path.join(repository.toplevel, ".%s.rc" % basename))
                if not repository.bare
                else None
            ),
        ),
    ]:
        changelogrc = fun()
        if changelogrc:
            if not os.path.exists(changelogrc):
                if enforce_file_existence:
                    die("File %r does not exists." % changelogrc)
                else:
                    continue  ## changelogrc valued, but file does not exists
            else:
                break

    ## config file may lookup for templates relative to the toplevel
    ## of git repository
    os.chdir(repository.toplevel)

    ## NOTE(review): when none of the sources above gives a value (e.g.
    ## bare repository without any explicit config), ``changelogrc`` is
    ## ``None`` at this point and is given to ``os.path.expanduser`` --
    ## to be confirmed that this case is handled.
    config = load_config_file(
        os.path.expanduser(changelogrc),
        default_filename=reference_config,
        fail_if_not_present=False,
    )

    config = Config(config)

    log_encoding = get_log_encoding(repository, config)
    revlist = get_revision(repository, config, opts)
    config["unreleased_version_label"] = eval_if_callable(
        config["unreleased_version_label"]
    )
    manage_obsolete_options(config)

    try:
        content = changelog(
            repository=repository,
            revlist=revlist,
            ignore_regexps=config["ignore_regexps"],
            section_regexps=config["section_regexps"],
            unreleased_version_label=config["unreleased_version_label"],
            include_commit_sha=config["include_commit_sha"],
            tag_filter_regexp=config["tag_filter_regexp"],
            output_engine=config.get("output_engine", rest_py),
            include_merge=config.get("include_merge", True),
            body_process=config.get("body_process", noop),
            subject_process=config.get("subject_process", noop),
            log_encoding=log_encoding,
        )

        ## Publishers receive an iterable of chunks: a content given as
        ## one string is split in lines (line endings kept).
        if isinstance(content, basestring):
            content = content.splitlines(True)

        config.get("publish", stdout)(content)
    except KeyboardInterrupt:
        if DEBUG:
            err("Keyboard interrupt received while running '%s':" % (basename,))
            stderr(format_last_exception())
        else:
            err("Keyboard Interrupt. Bailing out.")
        exit(130)  ## Actual SIGINT as bash process convention.
    except Exception as e:  ## pylint: disable=broad-except
        if DEBUG:
            err("Exception while running '%s':" % (basename,))
            stderr(format_last_exception())
        else:
            message = "%s" % e
            err(message)
            stderr(
                " (set %s environment variable, "
                "or use ``--debug`` to see full traceback)" % (debug_varname,)
            )
        exit(255)
##
## Launch program
##
## Run the command line entry point only when this file is executed as a
## script, not when it is imported as a module.
if __name__ == "__main__":
    main()