summaryrefslogtreecommitdiffstats
path: root/src/vfs/extfs/helpers/uc1541
diff options
context:
space:
mode:
Diffstat (limited to 'src/vfs/extfs/helpers/uc1541')
-rwxr-xr-xsrc/vfs/extfs/helpers/uc1541702
1 files changed, 702 insertions, 0 deletions
diff --git a/src/vfs/extfs/helpers/uc1541 b/src/vfs/extfs/helpers/uc1541
new file mode 100755
index 0000000..1da649f
--- /dev/null
+++ b/src/vfs/extfs/helpers/uc1541
@@ -0,0 +1,702 @@
+#!/usr/bin/env python
+"""
+UC1541 Virtual filesystem
+
+Author: Roman 'gryf' Dobosz <gryf73@gmail.com>
+Date: 2019-09-20
+Version: 3.3
+Licence: BSD
+source: https://bitbucket.org/gryf/uc1541
+mirror: https://github.com/gryf/uc1541
+"""
+
+import sys
+import re
+import os
+import gzip
+from subprocess import Popen, PIPE
+
+if os.getenv('UC1541_DEBUG'):
+ import logging
+ LOG = logging.getLogger('UC1541')
+ LOG.setLevel(logging.DEBUG)
+ FILE_HANDLER = logging.FileHandler("/tmp/uc1541.log")
+ FILE_FORMATTER = logging.Formatter("%(asctime)s %(levelname)-8s "
+ "%(lineno)s %(funcName)s - %(message)s")
+ FILE_HANDLER.setFormatter(FILE_FORMATTER)
+ FILE_HANDLER.setLevel(logging.DEBUG)
+ LOG.addHandler(FILE_HANDLER)
+else:
+ class LOG(object):
+ """
+ Dummy logger object. Does nothing.
+ """
+ @classmethod
+ def debug(*args, **kwargs):
+ pass
+
+ @classmethod
+ def info(*args, **kwargs):
+ pass
+
+ @classmethod
+ def warning(*args, **kwargs):
+ pass
+
+ @classmethod
+ def error(*args, **kwargs):
+ pass
+
+ @classmethod
+ def critical(*args, **kwargs):
+ pass
+
+
+SECLEN = 256
+
+
+def _ord(string_or_int):
+ """
+ Return an int value for the (possible) string passed in argument. This
+ function is for compatibility between python2 and python3, where single
+ element in byte string array is a string or an int respectively.
+ """
+ try:
+ return ord(string_or_int)
+ except TypeError:
+ return string_or_int
+
+
+def _get_raw(dimage):
+ """
+ Try to get contents of the D64 image either it's gzip compressed or not.
+ """
+ raw = None
+ with gzip.open(dimage, 'rb') as fobj:
+ # Although the common approach with gzipped files is to check the
+ # magic number, in this case there is no guarantee that first track
+ # does not contain exactly the same byte sequence as the magic number.
+ # So the only way left is to actually try to uncompress the file.
+ try:
+ raw = fobj.read()
+ except (IOError, OSError):
+ pass
+ if not raw:
+ with open(dimage, 'rb') as fobj:
+ raw = fobj.read()
+
+ return raw
+
+
+def _get_implementation(disk):
+ """
+ Check the file under fname and return right class for creating an object
+ corresponding for the file
+ """
+ len_map = {822400: D81, # 80 tracks
+ 819200: D81, # 80 tracks, 3200 error bytes
+ 349696: D71, # 70 tracks
+ 351062: D71, # 70 tracks, 1366 error bytes
+ 174848: D64, # usual d64 disc image, 35 tracks, no errors
+ 175531: D64, # 35 track, 683 error bytes
+ 196608: D64, # 40 track, no errors
+ 197376: D64} # 40 track, 768 error bytes
+
+ if disk[:32].startswith(b'C64'):
+ return # T64
+
+ return len_map.get(len(disk))(disk)
+
+
+class Disk(object):
+ """
+ Represent common disk interface
+ """
+ CHAR_MAP = {32: ' ', 33: '!', 34: '"', 35: '#', 37: '%', 38: '&', 39: "'",
+ 40: '(', 41: ')', 42: '*', 43: '+', 44: ',', 45: '-', 46: '.',
+ 47: '/', 48: '0', 49: '1', 50: '2', 51: '3', 52: '4', 53: '5',
+ 54: '6', 55: '7', 56: '8', 57: '9', 59: ';', 60: '<', 61: '=',
+ 62: '>', 63: '?', 64: '@', 65: 'a', 66: 'b', 67: 'c', 68: 'd',
+ 69: 'e', 70: 'f', 71: 'g', 72: 'h', 73: 'i', 74: 'j', 75: 'k',
+ 76: 'l', 77: 'm', 78: 'n', 79: 'o', 80: 'p', 81: 'q', 82: 'r',
+ 83: 's', 84: 't', 85: 'u', 86: 'v', 87: 'w', 88: 'x', 89: 'y',
+ 90: 'z', 91: '[', 93: ']', 97: 'A', 98: 'B', 99: 'C',
+ 100: 'D', 101: 'E', 102: 'F', 103: 'G', 104: 'H', 105: 'I',
+ 106: 'J', 107: 'K', 108: 'L', 109: 'M', 110: 'N', 111: 'O',
+ 112: 'P', 113: 'Q', 114: 'R', 115: 'S', 116: 'T', 117: 'U',
+ 118: 'V', 119: 'W', 120: 'X', 121: 'Y', 122: 'Z', 193: 'A',
+ 194: 'B', 195: 'C', 196: 'D', 197: 'E', 198: 'F', 199: 'G',
+ 200: 'H', 201: 'I', 202: 'J', 203: 'K', 204: 'L', 205: 'M',
+ 206: 'N', 207: 'O', 208: 'P', 209: 'Q', 210: 'R', 211: 'S',
+ 212: 'T', 213: 'U', 214: 'V', 215: 'W', 216: 'X', 217: 'Y',
+ 218: 'Z'}
+
+ FILE_TYPES = {0b000: 'del',
+ 0b001: 'seq',
+ 0b010: 'prg',
+ 0b011: 'usr',
+ 0b100: 'rel'}
+
+ DIR_TRACK = 18
+ DIR_SECTOR = 1
+
+ def __init__(self, raw):
+ """
+ Init
+ """
+ self.raw = raw
+ self.current_sector_data = None
+ self.next_sector = 0
+ self.next_track = None
+ self._dir_contents = []
+ self._already_done = []
+
+ def _map_filename(self, string):
+ """
+ Transcode filename to ASCII compatible. Replace not supported
+ characters with jokers.
+ """
+
+ filename = list()
+
+ for chr_ in string:
+ if _ord(chr_) == 160: # shift+space character; $a0
+ break
+
+ character = D64.CHAR_MAP.get(_ord(chr_), '?')
+ filename.append(character)
+
+ # special cases
+ if filename[0] == "-":
+ filename[0] = "?"
+
+ LOG.debug("string: ``%s'' mapped to: ``%s''", string,
+ "".join(filename))
+ return "".join(filename)
+
+ def _go_to_next_sector(self):
+ """
+ Fetch (if exist) next sector from a directory chain
+ Return False if the chain ends, True otherwise
+ """
+
+ # Well, self.next_sector _should_ have value $FF, but apparently there
+ # are the cases where it is not, therefore checking for that will not
+ # be performed and value of $00 on the next track will end the
+ # directory
+ if self.next_track == 0:
+ LOG.debug("End of directory")
+ return False
+
+ if self.next_track is None:
+ LOG.debug("Going to the track: %s, %s", self.DIR_TRACK,
+ self.DIR_SECTOR)
+ offset = self._get_offset(self.DIR_TRACK, self.DIR_SECTOR)
+ else:
+ offset = self._get_offset(self.next_track, self.next_sector)
+ LOG.debug("Going to the track: %s,%s", self.next_track,
+ self.next_sector)
+
+ self.current_sector_data = self.raw[offset:offset + SECLEN]
+
+ # Guard for reading data out of bound - that happened for discs which
+ # store only raw data, even on directory track
+ if not self.current_sector_data:
+ return False
+
+ self.next_track = _ord(self.current_sector_data[0])
+ self.next_sector = _ord(self.current_sector_data[1])
+
+ if (self.next_track, self.next_sector) in self._already_done:
+ # Just a failsafe. Endless loop is not what is expected.
+ LOG.debug("Loop in track/sector pointer at %d,%d",
+ self.next_track, self.next_sector)
+ self._already_done = []
+ return False
+
+ self._already_done.append((self.next_track, self.next_sector))
+ LOG.debug("Next track: %s,%s", self.next_track, self.next_sector)
+ return True
+
+ def _get_ftype(self, num):
+ """
+ Get filetype as a string
+ """
+ return D64.FILE_TYPES.get(int("%d%d%d" % (num & 4 and 1,
+ num & 2 and 1,
+ num & 1), 2), '???')
+
+ def _get_offset(self, track, sector):
+ """
+ Return offset (in bytes) for specified track and sector.
+ """
+ return 0
+
+ def _harvest_entries(self):
+ """
+ Traverse through sectors and store entries in _dir_contents
+ """
+ sector = self.current_sector_data
+ for dummy in range(8):
+ entry = sector[:32]
+ ftype = _ord(entry[2])
+
+ if ftype == 0: # deleted
+ sector = sector[32:]
+ continue
+
+ type_verbose = self._get_ftype(ftype)
+
+ protect = _ord(entry[2]) & 64 and "<" or " "
+ fname = entry[5:21]
+ if ftype == 'rel':
+ size = _ord(entry[23])
+ else:
+ size = _ord(entry[30]) + _ord(entry[31]) * 226
+
+ self._dir_contents.append({'fname': self._map_filename(fname),
+ 'ftype': type_verbose,
+ 'size': size,
+ 'protect': protect})
+ sector = sector[32:]
+
+ def list_dir(self):
+ """
+ Return directory list as list of dict with keys:
+ fname, ftype, protect and size
+ """
+ while self._go_to_next_sector():
+ self._harvest_entries()
+
+ return self._dir_contents
+
+
+class D64(Disk):
+ """
+ Implement d64 directory reader
+ """
+
+ def _get_offset(self, track, sector):
+ """
+ Return offset (in bytes) for specified track and sector.
+
+ Track Sectors/track # Tracks
+ ----- ------------- ---------
+ 1-17 21 17
+ 18-24 19 7
+ 25-30 18 6
+ 31-40 17 10
+ """
+ offset = 0
+ truncate_track = 0
+
+ if track > 17:
+ offset = 17 * 21 * SECLEN
+ truncate_track = 17
+
+ if track > 24:
+ offset += 7 * 19 * SECLEN
+ truncate_track = 24
+
+ if track > 30:
+ offset += 6 * 18 * SECLEN
+ truncate_track = 30
+
+ track = track - truncate_track
+ offset += track * sector * SECLEN
+
+ return offset
+
+
+class D71(Disk):
+ """
+ Implement d71 directory reader
+ """
+
+ def _get_offset(self, track, sector):
+ """
+ Return offset (in bytes) for specified track and sector.
+
+ Track Sec/trk # Tracks
+ -------------- ------- ---------
+ 1-17 (side 0) 21 17
+ 18-24 (side 0) 19 7
+ 25-30 (side 0) 18 6
+ 31-35 (side 0) 17 5
+ 36-52 (side 1) 21 17
+ 53-59 (side 1) 19 7
+ 60-65 (side 1) 18 6
+ 66-70 (side 1) 17 5
+ """
+ offset = 0
+ truncate_track = 0
+
+ if track > 17:
+ offset = 17 * 21 * SECLEN
+ truncate_track = 17
+
+ if track > 24:
+ offset += 7 * 19 * SECLEN
+ truncate_track = 24
+
+ if track > 30:
+ offset += 6 * 18 * SECLEN
+ truncate_track = 30
+
+ if track > 35:
+ offset += 5 * 17 * SECLEN
+ truncate_track = 35
+
+ if track > 52:
+ offset = 17 * 21 * SECLEN
+ truncate_track = 17
+
+ if track > 59:
+ offset += 7 * 19 * SECLEN
+ truncate_track = 24
+
+ if track > 65:
+ offset += 6 * 18 * SECLEN
+ truncate_track = 30
+
+ track = track - truncate_track
+ offset += track * sector * SECLEN
+
+ return offset
+
+
+class D81(Disk):
+ """
+ Implement d81 directory reader
+ """
+ DIR_TRACK = 40
+ DIR_SECTOR = 3
+ FILE_TYPES = {0b000: 'del',
+ 0b001: 'seq',
+ 0b010: 'prg',
+ 0b011: 'usr',
+ 0b100: 'rel',
+ 0b101: 'cbm'}
+
+ def _get_offset(self, track, sector):
+ """
+ Return offset (in bytes) for specified track and sector. In d81 is
+ easy, since we have 80 tracks with 40 sectors for 256 bytes each.
+ """
+ # we wan to go to the beginning (first sector) of the track, not it's
+ # max, so that we need to extract its amount.
+ return (track * 40 - 40) * SECLEN + sector * SECLEN
+
+
+class Uc1541(object):
+ """
+ Class for interact with c1541 program and MC
+ """
+ PRG = re.compile(r'(\d+)\s+"([^"]*)".+?\s(del|prg|rel|seq|usr)([\s<])')
+
+ def __init__(self, archname):
+ self.arch = archname
+ self.out = ''
+ self.err = ''
+ self._verbose = os.getenv("UC1541_VERBOSE", False)
+ self._hide_del = os.getenv("UC1541_HIDE_DEL", False)
+
+ self.dirlist = _get_implementation(_get_raw(archname)).list_dir()
+ self.file_map = {}
+ self.directory = []
+
+ def list(self):
+ """
+ Output list contents of D64 image.
+ Convert filenames to be Unix filesystem friendly
+ Add suffix to show user what kind of file do he dealing with.
+ """
+ LOG.info("List contents of %s", self.arch)
+ directory = self._get_dir()
+
+ # If there is an error reading directory, show the reason to the user
+ if self.out.startswith("Error"):
+ sys.stderr.write(self.out.split("\n")[0] + "\n")
+ return 2
+
+ for entry in directory:
+ sys.stdout.write("%(perms)s 1 %(uid)-8d %(gid)-8d %(size)8d "
+ "Jan 01 1980 %(display_name)s\n" % entry)
+ return 0
+
+ def rm(self, dst):
+ """
+ Remove file from D64 image
+ """
+ LOG.info("Removing file %s", dst)
+ dst = self._get_masked_fname(dst)
+
+ if not self._call_command('delete', dst=dst):
+ return self._show_error()
+
+ return 0
+
+ def copyin(self, dst, src):
+ """
+ Copy file to the D64 image. Destination filename has to be corrected.
+ """
+ LOG.info("Copy into D64 %s as %s", src, dst)
+ dst = self._correct_fname(dst)
+
+ if not self._call_command('write', src=src, dst=dst):
+ return self._show_error()
+
+ return 0
+
+ def copyout(self, src, dst):
+ """
+ Copy file form the D64 image. Source filename has to be corrected,
+ since it's representation differ from the real one inside D64 image.
+ """
+ LOG.info("Copy form D64 %s as %s", src, dst)
+ if not src.endswith(".prg"):
+ return "cannot read"
+
+ src = self._get_masked_fname(src)
+
+ if not self._call_command('read', src=src, dst=dst):
+ return self._show_error()
+
+ return 0
+
+ def mkdir(self, dirname):
+ """Not supported"""
+ self.err = "D64 format doesn't support directories"
+ return self._show_error()
+
+ def run(self, fname):
+ """Not supported"""
+ self.err = "Not supported, unless you are using MC on real C64 ;)"
+ return self._show_error()
+
+ def _correct_fname(self, fname):
+ """
+ Return filename with mapped characters, without .prg extension.
+ Characters like $, *, + in filenames are perfectly legal, but c1541
+ program seem to have issues with it while writing, so it will also be
+ replaced.
+ """
+ char_map = {'|': "/",
+ "\\": "/",
+ "~": " ",
+ "$": "?",
+ "*": "?"}
+
+ if fname.lower().endswith(".prg"):
+ fname = fname[:-4]
+
+ new_fname = []
+ for char in fname:
+ trans = char_map.get(char)
+ new_fname.append(trans if trans else char)
+
+ return "".join(new_fname)
+
+ def _get_masked_fname(self, fname):
+ """
+ Return masked filename with '?' jokers instead of non ASCII
+ characters, useful for copying or deleting files with c1541. In case
+ of several files with same name exists in directory, only first one
+ will be operative (first as appeared in directory).
+
+ Warning! If there are two different names but the only difference is in
+ non-ASCII characters (some PET ASCII or control characters) there is
+ a risk that one can remove both files.
+ """
+ directory = self._get_dir()
+
+ for entry in directory:
+ if entry['display_name'] == fname:
+ return entry['pattern_name']
+
+ def _get_dir(self):
+ """
+ Retrieve directory via c1541 program
+ """
+ directory = []
+
+ uid = os.getuid()
+ gid = os.getgid()
+
+ if not self._call_command('list'):
+ return self._show_error()
+
+ idx = 0
+ for line in self.out.split("\n"):
+ if Uc1541.PRG.match(line):
+ blocks, fname, ext, rw = Uc1541.PRG.match(line).groups()
+
+ if ext == 'del' and self._hide_del:
+ continue
+
+ display_name = ".".join([fname, ext])
+ pattern_name = self.dirlist[idx]['fname']
+
+ if '/' in display_name:
+ display_name = display_name.replace('/', '|')
+
+ # workaround for space and dash at the beggining of the
+ # filename
+ char_map = {' ': '~',
+ '-': '_'}
+ display_name = "".join([char_map.get(display_name[0],
+ display_name[0]),
+ display_name[1:]])
+
+ if ext == 'del':
+ perms = "----------"
+ else:
+ perms = "-r%s-r--r--" % (rw.strip() and "-" or "w")
+
+ directory.append({'pattern_name': pattern_name,
+ 'display_name': display_name,
+ 'uid': uid,
+ 'gid': gid,
+ 'size': int(blocks) * SECLEN,
+ 'perms': perms})
+ idx += 1
+ return directory
+
+ def _show_error(self):
+ """
+ Pass out error output from c1541 execution
+ """
+ if self._verbose:
+ return self.err
+ else:
+ return 1
+
+ def _call_command(self, cmd, src=None, dst=None):
+ """
+ Return status of the provided command, which can be one of:
+ write
+ read
+ delete
+ dir/list
+ """
+ command = ['c1541', '-attach', self.arch, '-%s' % cmd]
+ if src:
+ command.append(src)
+ if dst:
+ command.append(dst)
+
+ LOG.debug('executing command: %s', ' '.join(command))
+ # For some reason using write and delete commands and reading output
+ # confuses Python3 beneath MC and as a consequence MC report an
+ # error...therefore for those commands let's not use
+ # universal_newlines...
+ universal_newlines = True
+ if cmd in ['delete', 'write']:
+ universal_newlines = False
+ self.out, self.err = Popen(command,
+ universal_newlines=universal_newlines,
+ stdout=PIPE, stderr=PIPE).communicate()
+
+ if self.err:
+ LOG.debug('an err: %s', self.err)
+ return not self.err
+
+
+CALL_MAP = {'list': lambda a: Uc1541(a.arch).list(),
+ 'copyin': lambda a: Uc1541(a.arch).copyin(a.src, a.dst),
+ 'copyout': lambda a: Uc1541(a.arch).copyout(a.src, a.dst),
+ 'mkdir': lambda a: Uc1541(a.arch).mkdir(a.dst),
+ 'rm': lambda a: Uc1541(a.arch).rm(a.dst),
+ 'run': lambda a: Uc1541(a.arch).run(a.dst)}
+
+
+def parse_args():
+ """Use ArgumentParser to check for script arguments and execute."""
+ parser = ArgumentParser()
+ subparsers = parser.add_subparsers(help='supported commands',
+ dest='subcommand')
+ subparsers.required = True
+ parser_list = subparsers.add_parser('list', help="List contents of D64 "
+ "image")
+ parser_copyin = subparsers.add_parser('copyin', help="Copy file into D64 "
+ "image")
+ parser_copyout = subparsers.add_parser('copyout', help="Copy file out of "
+ "D64 image")
+ parser_rm = subparsers.add_parser('rm', help="Delete file from D64 image")
+ parser_mkdir = subparsers.add_parser('mkdir', help="Create directory in "
+ "archive")
+ parser_run = subparsers.add_parser('run', help="Execute archived file")
+
+ parser_list.add_argument('arch', help="D64 Image filename")
+ parser_list.set_defaults(func=CALL_MAP['list'])
+
+ parser_copyin.add_argument('arch', help="D64 Image filename")
+ parser_copyin.add_argument('src', help="Source filename")
+ parser_copyin.add_argument('dst', help="Destination filename (to be "
+ "written into D64 image)")
+ parser_copyin.set_defaults(func=CALL_MAP['copyin'])
+
+ parser_copyout.add_argument('arch', help="D64 Image filename")
+ parser_copyout.add_argument('src', help="Source filename (to be read from"
+ " D64 image")
+ parser_copyout.add_argument('dst', help="Destination filename")
+ parser_copyout.set_defaults(func=CALL_MAP['copyout'])
+
+ parser_rm.add_argument('arch', help="D64 Image filename")
+ parser_rm.add_argument('dst', help="File inside D64 image to be deleted")
+ parser_rm.set_defaults(func=CALL_MAP['rm'])
+
+ parser_mkdir.add_argument('arch', help="archive filename")
+ parser_mkdir.add_argument('dst', help="Directory name inside archive to "
+ "be created")
+ parser_mkdir.set_defaults(func=CALL_MAP['mkdir'])
+
+ parser_run.add_argument('arch', help="archive filename")
+ parser_run.add_argument('dst', help="File to be executed")
+ parser_run.set_defaults(func=CALL_MAP['run'])
+
+ args = parser.parse_args()
+ return args.func(args)
+
+
+def no_parse():
+ """Failsafe argument "parsing". Note, that it blindly takes positional
+ arguments without checking them. In case of wrong arguments it will
+ silently exit"""
+ try:
+ if sys.argv[1] not in ('list', 'copyin', 'copyout', 'rm', 'mkdir',
+ "run"):
+ sys.exit(2)
+ except IndexError:
+ sys.exit(2)
+
+ class Arg(object):
+ """Mimic argparse object"""
+ dst = None
+ src = None
+ arch = None
+
+ arg = Arg()
+
+ try:
+ arg.arch = sys.argv[2]
+ if sys.argv[1] in ('copyin', 'copyout'):
+ arg.src = sys.argv[3]
+ arg.dst = sys.argv[4]
+ elif sys.argv[1] in ('rm', 'run', 'mkdir'):
+ arg.dst = sys.argv[3]
+ except IndexError:
+ sys.exit(2)
+
+ return CALL_MAP[sys.argv[1]](arg)
+
+
+if __name__ == "__main__":
+ LOG.debug("Script params: %s", str(sys.argv))
+ try:
+ from argparse import ArgumentParser
+ PARSE_FUNC = parse_args
+ except ImportError:
+ PARSE_FUNC = no_parse
+
+ sys.exit(PARSE_FUNC())