summaryrefslogtreecommitdiffstats
path: root/powerline/lib/vcs/mercurial.py
blob: 09b6e0b1a9e54208d57e9f7f0c16fc4d8213eccb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, division, absolute_import, print_function)

import os

import hglib

from powerline.lib.vcs import get_branch_name, get_file_status
from powerline.lib.path import join
from powerline.lib.encoding import get_preferred_file_contents_encoding


def branch_name_from_config_file(directory, config_file):
	'''Return the branch name stored in ``config_file``.

	The file is read as bytes, decoded using the encoding returned by
	``get_preferred_file_contents_encoding()`` (bytes that cannot be decoded
	are replaced) and stripped of surrounding whitespace.

	``directory`` is accepted but not used by this function.

	If reading or decoding raises any exception, ``'default'`` is returned
	instead.
	'''
	try:
		with open(config_file, 'rb') as branch_file:
			contents = branch_file.read()
		encoding = get_preferred_file_contents_encoding()
		branch = contents.decode(encoding, 'replace')
	except Exception:
		# Best effort: any failure falls back to the fixed name below.
		return 'default'
	return branch.strip()


class Repository(object):
	'''Mercurial repository queried through ``hglib``.

	Holds the absolute path of the repository directory and the
	``create_watcher`` object, which is handed unchanged to
	``get_file_status`` and ``get_branch_name``.
	'''
	__slots__ = ('directory', 'create_watcher')

	# hg status code -> (powerline file status, repo status flag)
	# Flag bits: 1 marks changes to tracked files, 2 marks untracked files,
	# 0 leaves the repository status untouched.
	statuses = {
		b'M': ('M', 1), b'A': ('A', 1), b'R': ('R', 1), b'!': ('D', 1),
		b'?': ('U', 2), b'I': ('I', 0), b'C': ('', 0),
	}
	# Indexed by the bitwise OR of the flags collected from ``statuses``.
	repo_statuses_str = (None, 'D ', ' U', 'DU')

	def __init__(self, directory, create_watcher):
		'''Remember absolute ``directory`` and ``create_watcher``.'''
		self.directory = os.path.abspath(directory)
		self.create_watcher = create_watcher

	def _repo(self, directory):
		'''Open a fresh ``hglib`` client for ``directory``.'''
		# Cannot create this object once and use always: when repository updates
		# functions emit invalid results
		return hglib.open(directory)

	def status(self, path=None):
		'''Return status of repository or file.

		Without path argument: returns status of the repository:

		:'D ': dirty (tracked files were modified, added, removed or deleted),
		:' U': untracked-dirty (there are files which are not tracked),
		:'DU': both of the above,
		:None: clean (nothing of the above was reported).

		With path argument: returns what ``get_file_status`` yields for this
		file; the value itself is computed by ``do_status``: `M`odified,
		`A`dded, `R`emoved, `D`eleted (removed from filesystem, but still
		tracked), `U`nknown, `I`gnored, empty string for a clean file or None
		when nothing was reported for it.
		'''
		if path:
			return get_file_status(
				directory=self.directory,
				dirstate_file=join(self.directory, '.hg', 'dirstate'),
				file_path=path,
				ignore_file_name='.hgignore',
				get_func=self.do_status,
				create_watcher=self.create_watcher,
			)
		return self.do_status(self.directory, path)

	def do_status(self, directory, path):
		'''Query ``hglib`` for the status of ``path`` or of the whole repository.

		:param directory: repository directory to open.
		:param path:
			file path relative to ``directory``; a false value requests the
			repository status instead.

		:return:
			for a file: the first element of the ``statuses`` entry matching
			the first recognised status code, or None if there is none; for
			the repository: one of the ``repo_statuses_str`` values.

		Status codes missing from ``statuses`` are ignored. hglib documents
		a ``b' '`` code marking the origin of a copied file; looking it up
		directly used to raise ``KeyError``.
		'''
		with self._repo(directory) as repo:
			if path:
				path = os.path.join(directory, path)
				for status, paths in repo.status(include=path, all=True):
					if paths and status in self.statuses:
						return self.statuses[status][0]
				return None
			else:
				resulting_status = 0
				for status, paths in repo.status(all=True):
					if paths:
						# Unknown codes contribute no flag bits.
						resulting_status |= self.statuses.get(status, (None, 0))[1]
				return self.repo_statuses_str[resulting_status]

	def branch(self):
		'''Return the branch name obtained from ``.hg/branch``.

		The lookup is delegated to ``get_branch_name``, which receives
		``branch_name_from_config_file`` as the reader function.
		'''
		config_file = join(self.directory, '.hg', 'branch')
		return get_branch_name(
			directory=self.directory,
			config_file=config_file,
			get_func=branch_name_from_config_file,
			create_watcher=self.create_watcher,
		)