diff options
Diffstat (limited to 'src/vfs/extfs/helpers/uc1541')
-rwxr-xr-x | src/vfs/extfs/helpers/uc1541 | 702 |
1 files changed, 702 insertions, 0 deletions
diff --git a/src/vfs/extfs/helpers/uc1541 b/src/vfs/extfs/helpers/uc1541 new file mode 100755 index 0000000..1da649f --- /dev/null +++ b/src/vfs/extfs/helpers/uc1541 @@ -0,0 +1,702 @@ +#!/usr/bin/env python +""" +UC1541 Virtual filesystem + +Author: Roman 'gryf' Dobosz <gryf73@gmail.com> +Date: 2019-09-20 +Version: 3.3 +Licence: BSD +source: https://bitbucket.org/gryf/uc1541 +mirror: https://github.com/gryf/uc1541 +""" + +import sys +import re +import os +import gzip +from subprocess import Popen, PIPE + +if os.getenv('UC1541_DEBUG'): + import logging + LOG = logging.getLogger('UC1541') + LOG.setLevel(logging.DEBUG) + FILE_HANDLER = logging.FileHandler("/tmp/uc1541.log") + FILE_FORMATTER = logging.Formatter("%(asctime)s %(levelname)-8s " + "%(lineno)s %(funcName)s - %(message)s") + FILE_HANDLER.setFormatter(FILE_FORMATTER) + FILE_HANDLER.setLevel(logging.DEBUG) + LOG.addHandler(FILE_HANDLER) +else: + class LOG(object): + """ + Dummy logger object. Does nothing. + """ + @classmethod + def debug(*args, **kwargs): + pass + + @classmethod + def info(*args, **kwargs): + pass + + @classmethod + def warning(*args, **kwargs): + pass + + @classmethod + def error(*args, **kwargs): + pass + + @classmethod + def critical(*args, **kwargs): + pass + + +SECLEN = 256 + + +def _ord(string_or_int): + """ + Return an int value for the (possible) string passed in argument. This + function is for compatibility between python2 and python3, where single + element in byte string array is a string or an int respectively. + """ + try: + return ord(string_or_int) + except TypeError: + return string_or_int + + +def _get_raw(dimage): + """ + Try to get contents of the D64 image either it's gzip compressed or not. + """ + raw = None + with gzip.open(dimage, 'rb') as fobj: + # Although the common approach with gzipped files is to check the + # magic number, in this case there is no guarantee that first track + # does not contain exactly the same byte sequence as the magic number. + # So the only way left is to actually try to uncompress the file. + try: + raw = fobj.read() + except (IOError, OSError): + pass + if not raw: + with open(dimage, 'rb') as fobj: + raw = fobj.read() + + return raw + + +def _get_implementation(disk): + """ + Check the file under fname and return right class for creating an object + corresponding for the file + """ + len_map = {822400: D81, # 80 tracks + 819200: D81, # 80 tracks, 3200 error bytes + 349696: D71, # 70 tracks + 351062: D71, # 70 tracks, 1366 error bytes + 174848: D64, # usual d64 disc image, 35 tracks, no errors + 175531: D64, # 35 track, 683 error bytes + 196608: D64, # 40 track, no errors + 197376: D64} # 40 track, 768 error bytes + + if disk[:32].startswith(b'C64'): + return # T64 + + return len_map.get(len(disk))(disk) + + +class Disk(object): + """ + Represent common disk interface + """ + CHAR_MAP = {32: ' ', 33: '!', 34: '"', 35: '#', 37: '%', 38: '&', 39: "'", + 40: '(', 41: ')', 42: '*', 43: '+', 44: ',', 45: '-', 46: '.', + 47: '/', 48: '0', 49: '1', 50: '2', 51: '3', 52: '4', 53: '5', + 54: '6', 55: '7', 56: '8', 57: '9', 59: ';', 60: '<', 61: '=', + 62: '>', 63: '?', 64: '@', 65: 'a', 66: 'b', 67: 'c', 68: 'd', + 69: 'e', 70: 'f', 71: 'g', 72: 'h', 73: 'i', 74: 'j', 75: 'k', + 76: 'l', 77: 'm', 78: 'n', 79: 'o', 80: 'p', 81: 'q', 82: 'r', + 83: 's', 84: 't', 85: 'u', 86: 'v', 87: 'w', 88: 'x', 89: 'y', + 90: 'z', 91: '[', 93: ']', 97: 'A', 98: 'B', 99: 'C', + 100: 'D', 101: 'E', 102: 'F', 103: 'G', 104: 'H', 105: 'I', + 106: 'J', 107: 'K', 108: 'L', 109: 'M', 110: 'N', 111: 'O', + 112: 'P', 113: 'Q', 114: 'R', 115: 'S', 116: 'T', 117: 'U', + 118: 'V', 119: 'W', 120: 'X', 121: 'Y', 122: 'Z', 193: 'A', + 194: 'B', 195: 'C', 196: 'D', 197: 'E', 198: 'F', 199: 'G', + 200: 'H', 201: 'I', 202: 'J', 203: 'K', 204: 'L', 205: 'M', + 206: 'N', 207: 'O', 208: 'P', 209: 'Q', 210: 'R', 211: 'S', + 212: 'T', 213: 'U', 214: 'V', 215: 'W', 216: 'X', 217: 'Y', + 218: 'Z'} + + FILE_TYPES = {0b000: 'del', + 0b001: 'seq', + 0b010: 'prg', + 0b011: 'usr', + 0b100: 'rel'} + + DIR_TRACK = 18 + DIR_SECTOR = 1 + + def __init__(self, raw): + """ + Init + """ + self.raw = raw + self.current_sector_data = None + self.next_sector = 0 + self.next_track = None + self._dir_contents = [] + self._already_done = [] + + def _map_filename(self, string): + """ + Transcode filename to ASCII compatible. Replace not supported + characters with jokers. + """ + + filename = list() + + for chr_ in string: + if _ord(chr_) == 160: # shift+space character; $a0 + break + + character = D64.CHAR_MAP.get(_ord(chr_), '?') + filename.append(character) + + # special cases + if filename[0] == "-": + filename[0] = "?" + + LOG.debug("string: ``%s'' mapped to: ``%s''", string, + "".join(filename)) + return "".join(filename) + + def _go_to_next_sector(self): + """ + Fetch (if exist) next sector from a directory chain + Return False if the chain ends, True otherwise + """ + + # Well, self.next_sector _should_ have value $FF, but apparently there + # are the cases where it is not, therefore checking for that will not + # be performed and value of $00 on the next track will end the + # directory + if self.next_track == 0: + LOG.debug("End of directory") + return False + + if self.next_track is None: + LOG.debug("Going to the track: %s, %s", self.DIR_TRACK, + self.DIR_SECTOR) + offset = self._get_offset(self.DIR_TRACK, self.DIR_SECTOR) + else: + offset = self._get_offset(self.next_track, self.next_sector) + LOG.debug("Going to the track: %s,%s", self.next_track, + self.next_sector) + + self.current_sector_data = self.raw[offset:offset + SECLEN] + + # Guard for reading data out of bound - that happened for discs which + # store only raw data, even on directory track + if not self.current_sector_data: + return False + + self.next_track = _ord(self.current_sector_data[0]) + self.next_sector = _ord(self.current_sector_data[1]) + + if (self.next_track, self.next_sector) in self._already_done: + # Just a failsafe. Endless loop is not what is expected. + LOG.debug("Loop in track/sector pointer at %d,%d", + self.next_track, self.next_sector) + self._already_done = [] + return False + + self._already_done.append((self.next_track, self.next_sector)) + LOG.debug("Next track: %s,%s", self.next_track, self.next_sector) + return True + + def _get_ftype(self, num): + """ + Get filetype as a string + """ + return D64.FILE_TYPES.get(int("%d%d%d" % (num & 4 and 1, + num & 2 and 1, + num & 1), 2), '???') + + def _get_offset(self, track, sector): + """ + Return offset (in bytes) for specified track and sector. + """ + return 0 + + def _harvest_entries(self): + """ + Traverse through sectors and store entries in _dir_contents + """ + sector = self.current_sector_data + for dummy in range(8): + entry = sector[:32] + ftype = _ord(entry[2]) + + if ftype == 0: # deleted + sector = sector[32:] + continue + + type_verbose = self._get_ftype(ftype) + + protect = _ord(entry[2]) & 64 and "<" or " " + fname = entry[5:21] + if ftype == 'rel': + size = _ord(entry[23]) + else: + size = _ord(entry[30]) + _ord(entry[31]) * 226 + + self._dir_contents.append({'fname': self._map_filename(fname), + 'ftype': type_verbose, + 'size': size, + 'protect': protect}) + sector = sector[32:] + + def list_dir(self): + """ + Return directory list as list of dict with keys: + fname, ftype, protect and size + """ + while self._go_to_next_sector(): + self._harvest_entries() + + return self._dir_contents + + +class D64(Disk): + """ + Implement d64 directory reader + """ + + def _get_offset(self, track, sector): + """ + Return offset (in bytes) for specified track and sector. + + Track Sectors/track # Tracks + ----- ------------- --------- + 1-17 21 17 + 18-24 19 7 + 25-30 18 6 + 31-40 17 10 + """ + offset = 0 + truncate_track = 0 + + if track > 17: + offset = 17 * 21 * SECLEN + truncate_track = 17 + + if track > 24: + offset += 7 * 19 * SECLEN + truncate_track = 24 + + if track > 30: + offset += 6 * 18 * SECLEN + truncate_track = 30 + + track = track - truncate_track + offset += track * sector * SECLEN + + return offset + + +class D71(Disk): + """ + Implement d71 directory reader + """ + + def _get_offset(self, track, sector): + """ + Return offset (in bytes) for specified track and sector. + + Track Sec/trk # Tracks + -------------- ------- --------- + 1-17 (side 0) 21 17 + 18-24 (side 0) 19 7 + 25-30 (side 0) 18 6 + 31-35 (side 0) 17 5 + 36-52 (side 1) 21 17 + 53-59 (side 1) 19 7 + 60-65 (side 1) 18 6 + 66-70 (side 1) 17 5 + """ + offset = 0 + truncate_track = 0 + + if track > 17: + offset = 17 * 21 * SECLEN + truncate_track = 17 + + if track > 24: + offset += 7 * 19 * SECLEN + truncate_track = 24 + + if track > 30: + offset += 6 * 18 * SECLEN + truncate_track = 30 + + if track > 35: + offset += 5 * 17 * SECLEN + truncate_track = 35 + + if track > 52: + offset = 17 * 21 * SECLEN + truncate_track = 17 + + if track > 59: + offset += 7 * 19 * SECLEN + truncate_track = 24 + + if track > 65: + offset += 6 * 18 * SECLEN + truncate_track = 30 + + track = track - truncate_track + offset += track * sector * SECLEN + + return offset + + +class D81(Disk): + """ + Implement d81 directory reader + """ + DIR_TRACK = 40 + DIR_SECTOR = 3 + FILE_TYPES = {0b000: 'del', + 0b001: 'seq', + 0b010: 'prg', + 0b011: 'usr', + 0b100: 'rel', + 0b101: 'cbm'} + + def _get_offset(self, track, sector): + """ + Return offset (in bytes) for specified track and sector. In d81 is + easy, since we have 80 tracks with 40 sectors for 256 bytes each. + """ + # we wan to go to the beginning (first sector) of the track, not it's + # max, so that we need to extract its amount. + return (track * 40 - 40) * SECLEN + sector * SECLEN + + +class Uc1541(object): + """ + Class for interact with c1541 program and MC + """ + PRG = re.compile(r'(\d+)\s+"([^"]*)".+?\s(del|prg|rel|seq|usr)([\s<])') + + def __init__(self, archname): + self.arch = archname + self.out = '' + self.err = '' + self._verbose = os.getenv("UC1541_VERBOSE", False) + self._hide_del = os.getenv("UC1541_HIDE_DEL", False) + + self.dirlist = _get_implementation(_get_raw(archname)).list_dir() + self.file_map = {} + self.directory = [] + + def list(self): + """ + Output list contents of D64 image. + Convert filenames to be Unix filesystem friendly + Add suffix to show user what kind of file do he dealing with. + """ + LOG.info("List contents of %s", self.arch) + directory = self._get_dir() + + # If there is an error reading directory, show the reason to the user + if self.out.startswith("Error"): + sys.stderr.write(self.out.split("\n")[0] + "\n") + return 2 + + for entry in directory: + sys.stdout.write("%(perms)s 1 %(uid)-8d %(gid)-8d %(size)8d " + "Jan 01 1980 %(display_name)s\n" % entry) + return 0 + + def rm(self, dst): + """ + Remove file from D64 image + """ + LOG.info("Removing file %s", dst) + dst = self._get_masked_fname(dst) + + if not self._call_command('delete', dst=dst): + return self._show_error() + + return 0 + + def copyin(self, dst, src): + """ + Copy file to the D64 image. Destination filename has to be corrected. + """ + LOG.info("Copy into D64 %s as %s", src, dst) + dst = self._correct_fname(dst) + + if not self._call_command('write', src=src, dst=dst): + return self._show_error() + + return 0 + + def copyout(self, src, dst): + """ + Copy file form the D64 image. Source filename has to be corrected, + since it's representation differ from the real one inside D64 image. + """ + LOG.info("Copy form D64 %s as %s", src, dst) + if not src.endswith(".prg"): + return "cannot read" + + src = self._get_masked_fname(src) + + if not self._call_command('read', src=src, dst=dst): + return self._show_error() + + return 0 + + def mkdir(self, dirname): + """Not supported""" + self.err = "D64 format doesn't support directories" + return self._show_error() + + def run(self, fname): + """Not supported""" + self.err = "Not supported, unless you are using MC on real C64 ;)" + return self._show_error() + + def _correct_fname(self, fname): + """ + Return filename with mapped characters, without .prg extension. + Characters like $, *, + in filenames are perfectly legal, but c1541 + program seem to have issues with it while writing, so it will also be + replaced. + """ + char_map = {'|': "/", + "\\": "/", + "~": " ", + "$": "?", + "*": "?"} + + if fname.lower().endswith(".prg"): + fname = fname[:-4] + + new_fname = [] + for char in fname: + trans = char_map.get(char) + new_fname.append(trans if trans else char) + + return "".join(new_fname) + + def _get_masked_fname(self, fname): + """ + Return masked filename with '?' jokers instead of non ASCII + characters, useful for copying or deleting files with c1541. In case + of several files with same name exists in directory, only first one + will be operative (first as appeared in directory). + + Warning! If there are two different names but the only difference is in + non-ASCII characters (some PET ASCII or control characters) there is + a risk that one can remove both files. + """ + directory = self._get_dir() + + for entry in directory: + if entry['display_name'] == fname: + return entry['pattern_name'] + + def _get_dir(self): + """ + Retrieve directory via c1541 program + """ + directory = [] + + uid = os.getuid() + gid = os.getgid() + + if not self._call_command('list'): + return self._show_error() + + idx = 0 + for line in self.out.split("\n"): + if Uc1541.PRG.match(line): + blocks, fname, ext, rw = Uc1541.PRG.match(line).groups() + + if ext == 'del' and self._hide_del: + continue + + display_name = ".".join([fname, ext]) + pattern_name = self.dirlist[idx]['fname'] + + if '/' in display_name: + display_name = display_name.replace('/', '|') + + # workaround for space and dash at the beggining of the + # filename + char_map = {' ': '~', + '-': '_'} + display_name = "".join([char_map.get(display_name[0], + display_name[0]), + display_name[1:]]) + + if ext == 'del': + perms = "----------" + else: + perms = "-r%s-r--r--" % (rw.strip() and "-" or "w") + + directory.append({'pattern_name': pattern_name, + 'display_name': display_name, + 'uid': uid, + 'gid': gid, + 'size': int(blocks) * SECLEN, + 'perms': perms}) + idx += 1 + return directory + + def _show_error(self): + """ + Pass out error output from c1541 execution + """ + if self._verbose: + return self.err + else: + return 1 + + def _call_command(self, cmd, src=None, dst=None): + """ + Return status of the provided command, which can be one of: + write + read + delete + dir/list + """ + command = ['c1541', '-attach', self.arch, '-%s' % cmd] + if src: + command.append(src) + if dst: + command.append(dst) + + LOG.debug('executing command: %s', ' '.join(command)) + # For some reason using write and delete commands and reading output + # confuses Python3 beneath MC and as a consequence MC report an + # error...therefore for those commands let's not use + # universal_newlines... + universal_newlines = True + if cmd in ['delete', 'write']: + universal_newlines = False + self.out, self.err = Popen(command, + universal_newlines=universal_newlines, + stdout=PIPE, stderr=PIPE).communicate() + + if self.err: + LOG.debug('an err: %s', self.err) + return not self.err + + +CALL_MAP = {'list': lambda a: Uc1541(a.arch).list(), + 'copyin': lambda a: Uc1541(a.arch).copyin(a.src, a.dst), + 'copyout': lambda a: Uc1541(a.arch).copyout(a.src, a.dst), + 'mkdir': lambda a: Uc1541(a.arch).mkdir(a.dst), + 'rm': lambda a: Uc1541(a.arch).rm(a.dst), + 'run': lambda a: Uc1541(a.arch).run(a.dst)} + + +def parse_args(): + """Use ArgumentParser to check for script arguments and execute.""" + parser = ArgumentParser() + subparsers = parser.add_subparsers(help='supported commands', + dest='subcommand') + subparsers.required = True + parser_list = subparsers.add_parser('list', help="List contents of D64 " + "image") + parser_copyin = subparsers.add_parser('copyin', help="Copy file into D64 " + "image") + parser_copyout = subparsers.add_parser('copyout', help="Copy file out of " + "D64 image") + parser_rm = subparsers.add_parser('rm', help="Delete file from D64 image") + parser_mkdir = subparsers.add_parser('mkdir', help="Create directory in " + "archive") + parser_run = subparsers.add_parser('run', help="Execute archived file") + + parser_list.add_argument('arch', help="D64 Image filename") + parser_list.set_defaults(func=CALL_MAP['list']) + + parser_copyin.add_argument('arch', help="D64 Image filename") + parser_copyin.add_argument('src', help="Source filename") + parser_copyin.add_argument('dst', help="Destination filename (to be " + "written into D64 image)") + parser_copyin.set_defaults(func=CALL_MAP['copyin']) + + parser_copyout.add_argument('arch', help="D64 Image filename") + parser_copyout.add_argument('src', help="Source filename (to be read from" + " D64 image") + parser_copyout.add_argument('dst', help="Destination filename") + parser_copyout.set_defaults(func=CALL_MAP['copyout']) + + parser_rm.add_argument('arch', help="D64 Image filename") + parser_rm.add_argument('dst', help="File inside D64 image to be deleted") + parser_rm.set_defaults(func=CALL_MAP['rm']) + + parser_mkdir.add_argument('arch', help="archive filename") + parser_mkdir.add_argument('dst', help="Directory name inside archive to " + "be created") + parser_mkdir.set_defaults(func=CALL_MAP['mkdir']) + + parser_run.add_argument('arch', help="archive filename") + parser_run.add_argument('dst', help="File to be executed") + parser_run.set_defaults(func=CALL_MAP['run']) + + args = parser.parse_args() + return args.func(args) + + +def no_parse(): + """Failsafe argument "parsing". Note, that it blindly takes positional + arguments without checking them. In case of wrong arguments it will + silently exit""" + try: + if sys.argv[1] not in ('list', 'copyin', 'copyout', 'rm', 'mkdir', + "run"): + sys.exit(2) + except IndexError: + sys.exit(2) + + class Arg(object): + """Mimic argparse object""" + dst = None + src = None + arch = None + + arg = Arg() + + try: + arg.arch = sys.argv[2] + if sys.argv[1] in ('copyin', 'copyout'): + arg.src = sys.argv[3] + arg.dst = sys.argv[4] + elif sys.argv[1] in ('rm', 'run', 'mkdir'): + arg.dst = sys.argv[3] + except IndexError: + sys.exit(2) + + return CALL_MAP[sys.argv[1]](arg) + + +if __name__ == "__main__": + LOG.debug("Script params: %s", str(sys.argv)) + try: + from argparse import ArgumentParser + PARSE_FUNC = parse_args + except ImportError: + PARSE_FUNC = no_parse + + sys.exit(PARSE_FUNC()) |