1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, division, absolute_import, print_function)
import os
import hglib
from powerline.lib.vcs import get_branch_name, get_file_status
from powerline.lib.path import join
from powerline.lib.encoding import get_preferred_file_contents_encoding
def branch_name_from_config_file(directory, config_file):
try:
with open(config_file, 'rb') as f:
raw = f.read()
return raw.decode(get_preferred_file_contents_encoding(), 'replace').strip()
except Exception:
return 'default'
class Repository(object):
__slots__ = ('directory', 'create_watcher')
# hg status -> (powerline file status, repo status flag)
statuses = {
b'M': ('M', 1), b'A': ('A', 1), b'R': ('R', 1), b'!': ('D', 1),
b'?': ('U', 2), b'I': ('I', 0), b'C': ('', 0),
}
repo_statuses_str = (None, 'D ', ' U', 'DU')
def __init__(self, directory, create_watcher):
self.directory = os.path.abspath(directory)
self.create_watcher = create_watcher
def _repo(self, directory):
# Cannot create this object once and use always: when repository updates
# functions emit invalid results
return hglib.open(directory)
def status(self, path=None):
'''Return status of repository or file.
Without file argument: returns status of the repository:
:'D?': dirty (tracked modified files: added, removed, deleted, modified),
:'?U': untracked-dirty (added, but not tracked files)
:None: clean (status is empty)
With file argument: returns status of this file: `M`odified, `A`dded,
`R`emoved, `D`eleted (removed from filesystem, but still tracked),
`U`nknown, `I`gnored, (None)Clean.
'''
if path:
return get_file_status(
directory=self.directory,
dirstate_file=join(self.directory, '.hg', 'dirstate'),
file_path=path,
ignore_file_name='.hgignore',
get_func=self.do_status,
create_watcher=self.create_watcher,
)
return self.do_status(self.directory, path)
def do_status(self, directory, path):
with self._repo(directory) as repo:
if path:
path = os.path.join(directory, path)
statuses = repo.status(include=path, all=True)
for status, paths in statuses:
if paths:
return self.statuses[status][0]
return None
else:
resulting_status = 0
for status, paths in repo.status(all=True):
if paths:
resulting_status |= self.statuses[status][1]
return self.repo_statuses_str[resulting_status]
def branch(self):
config_file = join(self.directory, '.hg', 'branch')
return get_branch_name(
directory=self.directory,
config_file=config_file,
get_func=branch_name_from_config_file,
create_watcher=self.create_watcher,
)
|