summaryrefslogtreecommitdiffstats
path: root/powerline/lib/vcs
diff options
context:
space:
mode:
Diffstat (limited to 'powerline/lib/vcs')
-rw-r--r--powerline/lib/vcs/__init__.py276
-rw-r--r--powerline/lib/vcs/bzr.py108
-rw-r--r--powerline/lib/vcs/git.py208
-rw-r--r--powerline/lib/vcs/mercurial.py88
4 files changed, 680 insertions, 0 deletions
diff --git a/powerline/lib/vcs/__init__.py b/powerline/lib/vcs/__init__.py
new file mode 100644
index 0000000..f862c6b
--- /dev/null
+++ b/powerline/lib/vcs/__init__.py
@@ -0,0 +1,276 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import errno
+
+from threading import Lock
+from collections import defaultdict
+
+from powerline.lib.watcher import create_tree_watcher
+from powerline.lib.unicode import out_u
+from powerline.lib.path import join
+
+
+def generate_directories(path):
+ if os.path.isdir(path):
+ yield path
+ while True:
+ if os.path.ismount(path):
+ break
+ old_path = path
+ path = os.path.dirname(path)
+ if path == old_path or not path:
+ break
+ yield path
+
+
+_file_watcher = None
+
+
+def file_watcher(create_watcher):
+ global _file_watcher
+ if _file_watcher is None:
+ _file_watcher = create_watcher()
+ return _file_watcher
+
+
+_branch_watcher = None
+
+
+def branch_watcher(create_watcher):
+ global _branch_watcher
+ if _branch_watcher is None:
+ _branch_watcher = create_watcher()
+ return _branch_watcher
+
+
+branch_name_cache = {}
+branch_lock = Lock()
+file_status_lock = Lock()
+
+
+def get_branch_name(directory, config_file, get_func, create_watcher):
+ global branch_name_cache
+ with branch_lock:
+ # Check if the repo directory was moved/deleted
+ fw = branch_watcher(create_watcher)
+ is_watched = fw.is_watching(directory)
+ try:
+ changed = fw(directory)
+ except OSError as e:
+ if getattr(e, 'errno', None) != errno.ENOENT:
+ raise
+ changed = True
+ if changed:
+ branch_name_cache.pop(config_file, None)
+ # Remove the watches for this repo
+ if is_watched:
+ fw.unwatch(directory)
+ fw.unwatch(config_file)
+ else:
+ # Check if the config file has changed
+ try:
+ changed = fw(config_file)
+ except OSError as e:
+ if getattr(e, 'errno', None) != errno.ENOENT:
+ raise
+ # Config file does not exist (happens for mercurial)
+ if config_file not in branch_name_cache:
+ branch_name_cache[config_file] = out_u(get_func(directory, config_file))
+ if changed:
+ # Config file has changed or was not tracked
+ branch_name_cache[config_file] = out_u(get_func(directory, config_file))
+ return branch_name_cache[config_file]
+
+
+class FileStatusCache(dict):
+ def __init__(self):
+ self.dirstate_map = defaultdict(set)
+ self.ignore_map = defaultdict(set)
+ self.keypath_ignore_map = {}
+
+ def update_maps(self, keypath, directory, dirstate_file, ignore_file_name, extra_ignore_files):
+ parent = keypath
+ ignore_files = set()
+ while parent != directory:
+ nparent = os.path.dirname(keypath)
+ if nparent == parent:
+ break
+ parent = nparent
+ ignore_files.add(join(parent, ignore_file_name))
+ for f in extra_ignore_files:
+ ignore_files.add(f)
+ self.keypath_ignore_map[keypath] = ignore_files
+ for ignf in ignore_files:
+ self.ignore_map[ignf].add(keypath)
+ self.dirstate_map[dirstate_file].add(keypath)
+
+ def invalidate(self, dirstate_file=None, ignore_file=None):
+ for keypath in self.dirstate_map[dirstate_file]:
+ self.pop(keypath, None)
+ for keypath in self.ignore_map[ignore_file]:
+ self.pop(keypath, None)
+
+ def ignore_files(self, keypath):
+ for ignf in self.keypath_ignore_map[keypath]:
+ yield ignf
+
+
+file_status_cache = FileStatusCache()
+
+
+def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_func, create_watcher, extra_ignore_files=()):
+ global file_status_cache
+ keypath = file_path if os.path.isabs(file_path) else join(directory, file_path)
+ file_status_cache.update_maps(keypath, directory, dirstate_file, ignore_file_name, extra_ignore_files)
+
+ with file_status_lock:
+ # Optimize case of keypath not being cached
+ if keypath not in file_status_cache:
+ file_status_cache[keypath] = ans = get_func(directory, file_path)
+ return ans
+
+ # Check if any relevant files have changed
+ file_changed = file_watcher(create_watcher)
+ changed = False
+ # Check if dirstate has changed
+ try:
+ changed = file_changed(dirstate_file)
+ except OSError as e:
+ if getattr(e, 'errno', None) != errno.ENOENT:
+ raise
+ # The .git index file does not exist for a new git repo
+ return get_func(directory, file_path)
+
+ if changed:
+ # Remove all cached values for files that depend on this
+ # dirstate_file
+ file_status_cache.invalidate(dirstate_file=dirstate_file)
+ else:
+ # Check if the file itself has changed
+ try:
+ changed ^= file_changed(keypath)
+ except OSError as e:
+ if getattr(e, 'errno', None) != errno.ENOENT:
+ raise
+ # Do not call get_func again for a non-existent file
+ if keypath not in file_status_cache:
+ file_status_cache[keypath] = get_func(directory, file_path)
+ return file_status_cache[keypath]
+
+ if changed:
+ file_status_cache.pop(keypath, None)
+ else:
+ # Check if one of the ignore files has changed
+ for ignf in file_status_cache.ignore_files(keypath):
+ try:
+ changed ^= file_changed(ignf)
+ except OSError as e:
+ if getattr(e, 'errno', None) != errno.ENOENT:
+ raise
+ if changed:
+ # Invalidate cache for all files that might be affected
+ # by this ignore file
+ file_status_cache.invalidate(ignore_file=ignf)
+ break
+
+ try:
+ return file_status_cache[keypath]
+ except KeyError:
+ file_status_cache[keypath] = ans = get_func(directory, file_path)
+ return ans
+
+
+class TreeStatusCache(dict):
+ def __init__(self, pl):
+ self.tw = create_tree_watcher(pl)
+ self.pl = pl
+
+ def cache_and_get(self, key, status):
+ ans = self.get(key, self)
+ if ans is self:
+ ans = self[key] = status()
+ return ans
+
+ def __call__(self, repo):
+ key = repo.directory
+ try:
+ if self.tw(key, ignore_event=getattr(repo, 'ignore_event', None)):
+ self.pop(key, None)
+ except OSError as e:
+ self.pl.warn('Failed to check {0} for changes, with error: {1}', key, str(e))
+ return self.cache_and_get(key, repo.status)
+
+
+_tree_status_cache = None
+
+
+def tree_status(repo, pl):
+ global _tree_status_cache
+ if _tree_status_cache is None:
+ _tree_status_cache = TreeStatusCache(pl)
+ return _tree_status_cache(repo)
+
+
+vcs_props = (
+ ('git', '.git', os.path.exists),
+ ('mercurial', '.hg', os.path.isdir),
+ ('bzr', '.bzr', os.path.isdir),
+)
+
+
+vcs_props_bytes = [
+ (vcs, vcs_dir.encode('ascii'), check)
+ for vcs, vcs_dir, check in vcs_props
+]
+
+
+def guess(path, create_watcher):
+ for directory in generate_directories(path):
+ for vcs, vcs_dir, check in (vcs_props_bytes if isinstance(path, bytes) else vcs_props):
+ repo_dir = os.path.join(directory, vcs_dir)
+ if check(repo_dir):
+ if os.path.isdir(repo_dir) and not os.access(repo_dir, os.X_OK):
+ continue
+ try:
+ if vcs not in globals():
+ globals()[vcs] = getattr(__import__(str('powerline.lib.vcs'), fromlist=[str(vcs)]), str(vcs))
+ return globals()[vcs].Repository(directory, create_watcher)
+ except:
+ pass
+ return None
+
+
+def get_fallback_create_watcher():
+ from powerline.lib.watcher import create_file_watcher
+ from powerline import get_fallback_logger
+ from functools import partial
+ return partial(create_file_watcher, get_fallback_logger(), 'auto')
+
+
+def debug():
+ '''Test run guess(), repo.branch() and repo.status()
+
+ To use::
+ python -c 'from powerline.lib.vcs import debug; debug()' some_file_to_watch.
+ '''
+ import sys
+ dest = sys.argv[-1]
+ repo = guess(os.path.abspath(dest), get_fallback_create_watcher)
+ if repo is None:
+ print ('%s is not a recognized vcs repo' % dest)
+ raise SystemExit(1)
+ print ('Watching %s' % dest)
+ print ('Press Ctrl-C to exit.')
+ try:
+ while True:
+ if os.path.isdir(dest):
+ print ('Branch name: %s Status: %s' % (repo.branch(), repo.status()))
+ else:
+ print ('File status: %s' % repo.status(dest))
+ raw_input('Press Enter to check again: ')
+ except KeyboardInterrupt:
+ pass
+ except EOFError:
+ pass
diff --git a/powerline/lib/vcs/bzr.py b/powerline/lib/vcs/bzr.py
new file mode 100644
index 0000000..e47d8b2
--- /dev/null
+++ b/powerline/lib/vcs/bzr.py
@@ -0,0 +1,108 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import re
+
+from io import StringIO
+
+from bzrlib import (workingtree, status, library_state, trace, ui)
+
+from powerline.lib.vcs import get_branch_name, get_file_status
+from powerline.lib.path import join
+from powerline.lib.encoding import get_preferred_file_contents_encoding
+
+
+class CoerceIO(StringIO):
+ def write(self, arg):
+ if isinstance(arg, bytes):
+ arg = arg.decode(get_preferred_file_contents_encoding(), 'replace')
+ return super(CoerceIO, self).write(arg)
+
+
+nick_pat = re.compile(br'nickname\s*=\s*(.+)')
+
+
+def branch_name_from_config_file(directory, config_file):
+ ans = None
+ try:
+ with open(config_file, 'rb') as f:
+ for line in f:
+ m = nick_pat.match(line)
+ if m is not None:
+ ans = m.group(1).strip().decode(get_preferred_file_contents_encoding(), 'replace')
+ break
+ except Exception:
+ pass
+ return ans or os.path.basename(directory)
+
+
+state = None
+
+
+class Repository(object):
+ def __init__(self, directory, create_watcher):
+ self.directory = os.path.abspath(directory)
+ self.create_watcher = create_watcher
+
+ def status(self, path=None):
+ '''Return status of repository or file.
+
+ Without file argument: returns status of the repository:
+
+ :'D?': dirty (tracked modified files: added, removed, deleted, modified),
+ :'?U': untracked-dirty (added, but not tracked files)
+ :None: clean (status is empty)
+
+ With file argument: returns status of this file: The status codes are
+ those returned by bzr status -S
+ '''
+ if path is not None:
+ return get_file_status(
+ directory=self.directory,
+ dirstate_file=join(self.directory, '.bzr', 'checkout', 'dirstate'),
+ file_path=path,
+ ignore_file_name='.bzrignore',
+ get_func=self.do_status,
+ create_watcher=self.create_watcher,
+ )
+ return self.do_status(self.directory, path)
+
+ def do_status(self, directory, path):
+ try:
+ return self._status(self.directory, path)
+ except Exception:
+ pass
+
+ def _status(self, directory, path):
+ global state
+ if state is None:
+ state = library_state.BzrLibraryState(ui=ui.SilentUIFactory, trace=trace.DefaultConfig())
+ buf = CoerceIO()
+ w = workingtree.WorkingTree.open(directory)
+ status.show_tree_status(w, specific_files=[path] if path else None, to_file=buf, short=True)
+ raw = buf.getvalue()
+ if not raw.strip():
+ return
+ if path:
+ ans = raw[:2]
+ if ans == 'I ': # Ignored
+ ans = None
+ return ans
+ dirtied = untracked = ' '
+ for line in raw.splitlines():
+ if len(line) > 1 and line[1] in 'ACDMRIN':
+ dirtied = 'D'
+ elif line and line[0] == '?':
+ untracked = 'U'
+ ans = dirtied + untracked
+ return ans if ans.strip() else None
+
+ def branch(self):
+ config_file = join(self.directory, '.bzr', 'branch', 'branch.conf')
+ return get_branch_name(
+ directory=self.directory,
+ config_file=config_file,
+ get_func=branch_name_from_config_file,
+ create_watcher=self.create_watcher,
+ )
diff --git a/powerline/lib/vcs/git.py b/powerline/lib/vcs/git.py
new file mode 100644
index 0000000..bebc311
--- /dev/null
+++ b/powerline/lib/vcs/git.py
@@ -0,0 +1,208 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import re
+
+from powerline.lib.vcs import get_branch_name, get_file_status
+from powerline.lib.shell import readlines
+from powerline.lib.path import join
+from powerline.lib.encoding import (get_preferred_file_name_encoding,
+ get_preferred_file_contents_encoding)
+from powerline.lib.shell import which
+
+
+_ref_pat = re.compile(br'ref:\s*refs/heads/(.+)')
+
+
+def branch_name_from_config_file(directory, config_file):
+ try:
+ with open(config_file, 'rb') as f:
+ raw = f.read()
+ except EnvironmentError:
+ return os.path.basename(directory)
+ m = _ref_pat.match(raw)
+ if m is not None:
+ return m.group(1).decode(get_preferred_file_contents_encoding(), 'replace')
+ return raw[:7]
+
+
+def git_directory(directory):
+ path = join(directory, '.git')
+ if os.path.isfile(path):
+ with open(path, 'rb') as f:
+ raw = f.read()
+ if not raw.startswith(b'gitdir: '):
+ raise IOError('invalid gitfile format')
+ raw = raw[8:]
+ if raw[-1:] == b'\n':
+ raw = raw[:-1]
+ if not isinstance(path, bytes):
+ raw = raw.decode(get_preferred_file_name_encoding())
+ if not raw:
+ raise IOError('no path in gitfile')
+ return os.path.abspath(os.path.join(directory, raw))
+ else:
+ return path
+
+
+class GitRepository(object):
+ __slots__ = ('directory', 'create_watcher')
+
+ def __init__(self, directory, create_watcher):
+ self.directory = os.path.abspath(directory)
+ self.create_watcher = create_watcher
+
+ def status(self, path=None):
+ '''Return status of repository or file.
+
+ Without file argument: returns status of the repository:
+
+ :First column: working directory status (D: dirty / space)
+ :Second column: index status (I: index dirty / space)
+ :Third column: presence of untracked files (U: untracked files / space)
+ :None: repository clean
+
+ With file argument: returns status of this file. Output is
+ equivalent to the first two columns of ``git status --porcelain``
+ (except for merge statuses as they are not supported by libgit2).
+ '''
+ if path:
+ gitd = git_directory(self.directory)
+ # We need HEAD as without it using fugitive to commit causes the
+ # current file’s status (and only the current file) to not be updated
+ # for some reason I cannot be bothered to figure out.
+ return get_file_status(
+ directory=self.directory,
+ dirstate_file=join(gitd, 'index'),
+ file_path=path,
+ ignore_file_name='.gitignore',
+ get_func=self.do_status,
+ create_watcher=self.create_watcher,
+ extra_ignore_files=tuple(join(gitd, x) for x in ('logs/HEAD', 'info/exclude')),
+ )
+ return self.do_status(self.directory, path)
+
+ def branch(self):
+ directory = git_directory(self.directory)
+ head = join(directory, 'HEAD')
+ return get_branch_name(
+ directory=directory,
+ config_file=head,
+ get_func=branch_name_from_config_file,
+ create_watcher=self.create_watcher,
+ )
+
+
+try:
+ import pygit2 as git
+
+ class Repository(GitRepository):
+ @staticmethod
+ def ignore_event(path, name):
+ return False
+
+ def stash(self):
+ try:
+ stashref = git.Repository(git_directory(self.directory)).lookup_reference('refs/stash')
+ except KeyError:
+ return 0
+ return sum(1 for _ in stashref.log())
+
+ def do_status(self, directory, path):
+ if path:
+ try:
+ status = git.Repository(directory).status_file(path)
+ except (KeyError, ValueError):
+ return None
+
+ if status == git.GIT_STATUS_CURRENT:
+ return None
+ else:
+ if status & git.GIT_STATUS_WT_NEW:
+ return '??'
+ if status & git.GIT_STATUS_IGNORED:
+ return '!!'
+
+ if status & git.GIT_STATUS_INDEX_NEW:
+ index_status = 'A'
+ elif status & git.GIT_STATUS_INDEX_DELETED:
+ index_status = 'D'
+ elif status & git.GIT_STATUS_INDEX_MODIFIED:
+ index_status = 'M'
+ else:
+ index_status = ' '
+
+ if status & git.GIT_STATUS_WT_DELETED:
+ wt_status = 'D'
+ elif status & git.GIT_STATUS_WT_MODIFIED:
+ wt_status = 'M'
+ else:
+ wt_status = ' '
+
+ return index_status + wt_status
+ else:
+ wt_column = ' '
+ index_column = ' '
+ untracked_column = ' '
+ for status in git.Repository(directory).status().values():
+ if status & git.GIT_STATUS_WT_NEW:
+ untracked_column = 'U'
+ continue
+
+ if status & (git.GIT_STATUS_WT_DELETED | git.GIT_STATUS_WT_MODIFIED):
+ wt_column = 'D'
+
+ if status & (
+ git.GIT_STATUS_INDEX_NEW
+ | git.GIT_STATUS_INDEX_MODIFIED
+ | git.GIT_STATUS_INDEX_DELETED
+ ):
+ index_column = 'I'
+ r = wt_column + index_column + untracked_column
+ return r if r != ' ' else None
+except ImportError:
+ class Repository(GitRepository):
+ def __init__(self, *args, **kwargs):
+ if not which('git'):
+ raise OSError('git executable is not available')
+ super(Repository, self).__init__(*args, **kwargs)
+
+ @staticmethod
+ def ignore_event(path, name):
+ # Ignore changes to the index.lock file, since they happen
+ # frequently and don't indicate an actual change in the working tree
+ # status
+ return path.endswith('.git') and name == 'index.lock'
+
+ def _gitcmd(self, directory, *args):
+ return readlines(('git',) + args, directory)
+
+ def stash(self):
+ return sum(1 for _ in self._gitcmd(self.directory, '--no-optional-locks', 'stash', 'list'))
+
+ def do_status(self, directory, path):
+ if path:
+ try:
+ return next(self._gitcmd(directory, '--no-optional-locks', 'status', '--porcelain', '--ignored', '--', path))[:2]
+ except StopIteration:
+ return None
+ else:
+ wt_column = ' '
+ index_column = ' '
+ untracked_column = ' '
+ for line in self._gitcmd(directory, '--no-optional-locks', 'status', '--porcelain'):
+ if line[0] == '?':
+ untracked_column = 'U'
+ continue
+ elif line[0] == '!':
+ continue
+
+ if line[0] != ' ':
+ index_column = 'I'
+
+ if line[1] != ' ':
+ wt_column = 'D'
+
+ r = wt_column + index_column + untracked_column
+ return r if r != ' ' else None
diff --git a/powerline/lib/vcs/mercurial.py b/powerline/lib/vcs/mercurial.py
new file mode 100644
index 0000000..09b6e0b
--- /dev/null
+++ b/powerline/lib/vcs/mercurial.py
@@ -0,0 +1,88 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+
+import hglib
+
+from powerline.lib.vcs import get_branch_name, get_file_status
+from powerline.lib.path import join
+from powerline.lib.encoding import get_preferred_file_contents_encoding
+
+
+def branch_name_from_config_file(directory, config_file):
+ try:
+ with open(config_file, 'rb') as f:
+ raw = f.read()
+ return raw.decode(get_preferred_file_contents_encoding(), 'replace').strip()
+ except Exception:
+ return 'default'
+
+
+class Repository(object):
+ __slots__ = ('directory', 'create_watcher')
+
+ # hg status -> (powerline file status, repo status flag)
+ statuses = {
+ b'M': ('M', 1), b'A': ('A', 1), b'R': ('R', 1), b'!': ('D', 1),
+ b'?': ('U', 2), b'I': ('I', 0), b'C': ('', 0),
+ }
+ repo_statuses_str = (None, 'D ', ' U', 'DU')
+
+ def __init__(self, directory, create_watcher):
+ self.directory = os.path.abspath(directory)
+ self.create_watcher = create_watcher
+
+ def _repo(self, directory):
+ # Cannot create this object once and use always: when repository updates
+ # functions emit invalid results
+ return hglib.open(directory)
+
+ def status(self, path=None):
+ '''Return status of repository or file.
+
+ Without file argument: returns status of the repository:
+
+ :'D?': dirty (tracked modified files: added, removed, deleted, modified),
+ :'?U': untracked-dirty (added, but not tracked files)
+ :None: clean (status is empty)
+
+ With file argument: returns status of this file: `M`odified, `A`dded,
+ `R`emoved, `D`eleted (removed from filesystem, but still tracked),
+ `U`nknown, `I`gnored, (None)Clean.
+ '''
+ if path:
+ return get_file_status(
+ directory=self.directory,
+ dirstate_file=join(self.directory, '.hg', 'dirstate'),
+ file_path=path,
+ ignore_file_name='.hgignore',
+ get_func=self.do_status,
+ create_watcher=self.create_watcher,
+ )
+ return self.do_status(self.directory, path)
+
+ def do_status(self, directory, path):
+ with self._repo(directory) as repo:
+ if path:
+ path = os.path.join(directory, path)
+ statuses = repo.status(include=path, all=True)
+ for status, paths in statuses:
+ if paths:
+ return self.statuses[status][0]
+ return None
+ else:
+ resulting_status = 0
+ for status, paths in repo.status(all=True):
+ if paths:
+ resulting_status |= self.statuses[status][1]
+ return self.repo_statuses_str[resulting_status]
+
+ def branch(self):
+ config_file = join(self.directory, '.hg', 'branch')
+ return get_branch_name(
+ directory=self.directory,
+ config_file=config_file,
+ get_func=branch_name_from_config_file,
+ create_watcher=self.create_watcher,
+ )