1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# ceph-deploy ftw
import os
try:
from typing import Optional
except ImportError:
pass
PYTHONS = ['python3', 'python2', 'python']
PATH = [
'/usr/bin',
'/usr/local/bin',
'/bin',
'/usr/sbin',
'/usr/local/sbin',
'/sbin',
]
def choose_python():
# type: () -> Optional[str]
for e in PYTHONS:
for b in PATH:
p = os.path.join(b, e)
if os.path.exists(p):
return p
return None
def write_file(path: str, content: bytes, mode: int, uid: int, gid: int,
mkdir_p: bool = True) -> Optional[str]:
try:
if mkdir_p:
dirname = os.path.dirname(path)
if not os.path.exists(dirname):
os.makedirs(dirname)
tmp_path = path + '.new'
with open(tmp_path, 'wb') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), mode)
f.write(content)
os.fsync(f.fileno())
os.rename(tmp_path, path)
except Exception as e:
return str(e)
return None
if __name__ == '__channelexec__':
for item in channel: # type: ignore # noqa: F821
channel.send(eval(item)) # type: ignore # noqa: F821
|