diff options
Diffstat (limited to 'bin/convwatch.py')
-rw-r--r-- | bin/convwatch.py | 462 |
1 files changed, 462 insertions, 0 deletions
diff --git a/bin/convwatch.py b/bin/convwatch.py new file mode 100644 index 000000000..d88d43d64 --- /dev/null +++ b/bin/convwatch.py @@ -0,0 +1,462 @@ +# -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*- +# +# This file is part of the LibreOffice project. +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# Conversion watch, initially intended to detect if document layout changed since the last time it was run. +# +# Print a set of docs, compare the pdf against the old run and highlight the differences +# + +import getopt +import os +import subprocess +import sys +import time +import uuid +import datetime +import traceback +import threading +try: + from urllib.parse import quote +except ImportError: + from urllib import quote + +try: + import pyuno + import uno + import unohelper +except ImportError: + print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables") + print("PYTHONPATH=/installation/opt/program") + print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc") + raise + +try: + from com.sun.star.document import XDocumentEventListener +except ImportError: + print("UNO API class not found: try to set URE_BOOTSTRAP variable") + print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc") + raise + +### utilities ### + +def log(*args): + print(*args, flush=True) + +def partition(list, pred): + left = [] + right = [] + for e in list: + if pred(e): + left.append(e) + else: + right.append(e) + return (left, right) + +def filelist(dir, suffix): + if len(dir) == 0: + raise Exception("filelist: empty directory") + if not(dir[-1] == "/"): + dir += "/" + files = [dir + f for f in os.listdir(dir)] +# log(files) + return [f for f in files + if os.path.isfile(f) and os.path.splitext(f)[1] == suffix] + +def getFiles(dirs, suffix): + files = [] + for dir in dirs: + files += filelist(dir, suffix) + return files + +### UNO utilities ### + +class OfficeConnection: + def __init__(self, args): + self.args = args + self.soffice = None + self.socket = None + self.xContext = None + def setUp(self): + (method, sep, rest) = self.args["--soffice"].partition(":") + if sep != ":": + raise Exception("soffice parameter does not specify method") + if method == "path": + self.socket = "pipe,name=pytest" + str(uuid.uuid1()) + try: + userdir = self.args["--userdir"] + except KeyError: + raise Exception("'path' method requires --userdir") + if not(userdir.startswith("file://")): + raise Exception("--userdir must be file URL") + self.soffice = self.bootstrap(rest, userdir, self.socket) + elif method == "connect": + self.socket = rest + else: + raise Exception("unsupported connection method: " + method) + self.xContext = self.connect(self.socket) + + def bootstrap(self, soffice, userdir, socket): + argv = [ soffice, "--accept=" + socket + ";urp", + "-env:UserInstallation=" + userdir, + "--quickstart=no", + "--norestore", "--nologo", "--headless" ] + if "--valgrind" in self.args: + argv.append("--valgrind") + return subprocess.Popen(argv) + + def connect(self, socket): + xLocalContext = uno.getComponentContext() + xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext( + "com.sun.star.bridge.UnoUrlResolver", xLocalContext) + url = "uno:" + socket + ";urp;StarOffice.ComponentContext" + log("OfficeConnection: connecting to: " + url) + while True: + try: + xContext = xUnoResolver.resolve(url) + return xContext +# except com.sun.star.connection.NoConnectException + except pyuno.getClass("com.sun.star.connection.NoConnectException"): + log("NoConnectException: sleeping...") + time.sleep(1) + + def tearDown(self): + if self.soffice: + if self.xContext: + try: + log("tearDown: calling terminate()...") + xMgr = self.xContext.ServiceManager + xDesktop = xMgr.createInstanceWithContext( + "com.sun.star.frame.Desktop", self.xContext) + xDesktop.terminate() + log("...done") +# except com.sun.star.lang.DisposedException: + except pyuno.getClass("com.sun.star.beans.UnknownPropertyException"): + log("caught UnknownPropertyException") + pass # ignore, also means disposed + except pyuno.getClass("com.sun.star.lang.DisposedException"): + log("caught DisposedException") + pass # ignore + else: + self.soffice.terminate() + ret = self.soffice.wait() + self.xContext = None + self.socket = None + self.soffice = None + if ret != 0: + raise Exception("Exit status indicates failure: " + str(ret)) +# return ret + +class WatchDog(threading.Thread): + def __init__(self, connection): + threading.Thread.__init__(self, name="WatchDog " + connection.socket) + self.connection = connection + def run(self): + try: + if self.connection.soffice: # not possible for "connect" + self.connection.soffice.wait(timeout=120) # 2 minutes? + except subprocess.TimeoutExpired: + log("WatchDog: TIMEOUT -> killing soffice") + self.connection.soffice.terminate() # actually killing oosplash... + self.connection.xContext = None + log("WatchDog: killed soffice") + +class PerTestConnection: + def __init__(self, args): + self.args = args + self.connection = None + self.watchdog = None + def getContext(self): + return self.connection.xContext + def setUp(self): + assert(not(self.connection)) + def preTest(self): + conn = OfficeConnection(self.args) + conn.setUp() + self.connection = conn + self.watchdog = WatchDog(self.connection) + self.watchdog.start() + def postTest(self): + if self.connection: + try: + self.connection.tearDown() + finally: + self.connection = None + self.watchdog.join() + def tearDown(self): + assert(not(self.connection)) + +class PersistentConnection: + def __init__(self, args): + self.args = args + self.connection = None + def getContext(self): + return self.connection.xContext + def setUp(self): + conn = OfficeConnection(self.args) + conn.setUp() + self.connection = conn + def preTest(self): + assert(self.connection) + def postTest(self): + assert(self.connection) + def tearDown(self): + if self.connection: + try: + self.connection.tearDown() + finally: + self.connection = None + +def simpleInvoke(connection, test): + try: + connection.preTest() + test.run(connection.getContext()) + finally: + connection.postTest() + +def retryInvoke(connection, test): + tries = 5 + while tries > 0: + try: + tries -= 1 + try: + connection.preTest() + test.run(connection.getContext()) + return + finally: + connection.postTest() + except KeyboardInterrupt: + raise # Ctrl+C should work + except: + log("retryInvoke: caught exception") + raise Exception("FAILED retryInvoke") + +def runConnectionTests(connection, invoker, tests): + try: + connection.setUp() + failed = [] + for test in tests: + try: + invoker(connection, test) + except KeyboardInterrupt: + raise # Ctrl+C should work + except: + failed.append(test.file) + estr = traceback.format_exc() + log("... FAILED with exception:\n" + estr) + return failed + finally: + connection.tearDown() + +class EventListener(XDocumentEventListener,unohelper.Base): + def __init__(self): + self.layoutFinished = False + def documentEventOccured(self, event): +# log(str(event.EventName)) + if event.EventName == "OnLayoutFinished": + self.layoutFinished = True + def disposing(event): + pass + +def mkPropertyValue(name, value): + return uno.createUnoStruct("com.sun.star.beans.PropertyValue", + name, 0, value, 0) + +### tests ### + +def loadFromURL(xContext, url): + xDesktop = xContext.ServiceManager.createInstanceWithContext( + "com.sun.star.frame.Desktop", xContext) + props = [("Hidden", True), ("ReadOnly", True)] # FilterName? + loadProps = tuple([mkPropertyValue(name, value) for (name, value) in props]) + xListener = EventListener() + xGEB = xContext.getValueByName( + "/singletons/com.sun.star.frame.theGlobalEventBroadcaster") + xGEB.addDocumentEventListener(xListener) + xDoc = None + try: + xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps) + if xDoc is None: + raise Exception("No document loaded?") + time_ = 0 + while time_ < 30: + if xListener.layoutFinished: + return xDoc + log("delaying...") + time_ += 1 + time.sleep(1) + log("timeout: no OnLayoutFinished received") + return xDoc + except: + if xDoc: + log("CLOSING") + xDoc.close(True) + raise + finally: + if xListener: + xGEB.removeDocumentEventListener(xListener) + +def printDoc(xContext, xDoc, url): + props = [ mkPropertyValue("FileName", url) ] +# xDoc.print(props) + uno.invoke(xDoc, "print", (tuple(props),)) # damn, that's a keyword! + busy = True + while busy: + log("printing...") + time.sleep(1) + prt = xDoc.getPrinter() + for value in prt: + if value.Name == "IsBusy": + busy = value.Value + log("...done printing") + +class LoadPrintFileTest: + def __init__(self, file, prtsuffix): + self.file = file + self.prtsuffix = prtsuffix + def run(self, xContext): + start = datetime.datetime.now() + log("Time: " + str(start) + " Loading document: " + self.file) + xDoc = None + try: + if os.name == 'nt' and self.file[1] == ':': + url = "file:///" + self.file[0:2] + quote(self.file[2:]) + else: + url = "file://" + quote(self.file) + xDoc = loadFromURL(xContext, url) + printDoc(xContext, xDoc, url + self.prtsuffix) + finally: + if xDoc: + xDoc.close(True) + end = datetime.datetime.now() + log("...done with: " + self.file + " in: " + str(end - start)) + +def runLoadPrintFileTests(opts, dirs, suffix, reference): + if reference: + prtsuffix = ".pdf.reference" + else: + prtsuffix = ".pdf" + files = getFiles(dirs, suffix) + tests = (LoadPrintFileTest(file, prtsuffix) for file in files) +# connection = PersistentConnection(opts) + connection = PerTestConnection(opts) + failed = runConnectionTests(connection, simpleInvoke, tests) + print("all printed: FAILURES: " + str(len(failed))) + for fail in failed: + print(fail) + return failed + +def mkImages(file, resolution): + argv = [ "gs", "-r" + resolution, "-sOutputFile=" + file + ".%04d.jpeg", + "-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file ] + ret = subprocess.check_call(argv) + +def mkAllImages(dirs, suffix, resolution, reference, failed): + if reference: + prtsuffix = ".pdf.reference" + else: + prtsuffix = ".pdf" + for dir in dirs: + files = filelist(dir, suffix) + log(files) + for f in files: + if f in failed: + log("Skipping failed: " + f) + else: + mkImages(f + prtsuffix, resolution) + +def identify(imagefile): + argv = ["identify", "-format", "%k", imagefile] + process = subprocess.Popen(argv, stdout=subprocess.PIPE) + result, _ = process.communicate() + if process.wait() != 0: + raise Exception("identify failed") + if result.partition(b"\n")[0] != b"1": + log("identify result: " + result.decode('utf-8')) + log("DIFFERENCE in " + imagefile) + +def compose(refimagefile, imagefile, diffimagefile): + argv = [ "composite", "-compose", "difference", + refimagefile, imagefile, diffimagefile ] + subprocess.check_call(argv) + +def compareImages(file): + allimages = [f for f in filelist(os.path.dirname(file), ".jpeg") + if f.startswith(file)] +# refimages = [f for f in filelist(os.path.dirname(file), ".jpeg") +# if f.startswith(file + ".reference")] +# log("compareImages: allimages:" + str(allimages)) + (refimages, images) = partition(sorted(allimages), + lambda f: f.startswith(file + ".pdf.reference")) +# log("compareImages: images" + str(images)) + for (image, refimage) in zip(images, refimages): + compose(image, refimage, image + ".diff") + identify(image + ".diff") + if (len(images) != len(refimages)): + log("DIFFERENT NUMBER OF IMAGES FOR: " + file) + +def compareAllImages(dirs, suffix): + log("compareAllImages...") + for dir in dirs: + files = filelist(dir, suffix) +# log("compareAllImages:" + str(files)) + for f in files: + compareImages(f) + log("...compareAllImages done") + + +def parseArgs(argv): + (optlist,args) = getopt.getopt(argv[1:], "hr", + ["help", "soffice=", "userdir=", "reference", "valgrind"]) +# print optlist + return (dict(optlist), args) + +def usage(): + message = """usage: {program} [option]... [directory]..." + -h | --help: print usage information + -r | --reference: generate new reference files (otherwise: compare) + --soffice=method:location + specify soffice instance to connect to + supported methods: 'path', 'connect' + --userdir=URL specify user installation directory for 'path' method + --valgrind pass --valgrind to soffice for 'path' method""" + print(message.format(program = os.path.basename(sys.argv[0]))) + +def checkTools(): + try: + subprocess.check_output(["gs", "--version"]) + except: + print("Cannot execute 'gs'. Please install ghostscript.") + sys.exit(1) + try: + subprocess.check_output(["composite", "-version"]) + subprocess.check_output(["identify", "-version"]) + except: + print("Cannot execute 'composite' or 'identify'.") + print("Please install ImageMagick.") + sys.exit(1) + +if __name__ == "__main__": + checkTools() + (opts,args) = parseArgs(sys.argv) + if len(args) == 0: + usage() + sys.exit(1) + if "-h" in opts or "--help" in opts: + usage() + sys.exit() + elif "--soffice" in opts: + reference = "-r" in opts or "--reference" in opts + failed = runLoadPrintFileTests(opts, args, ".odt", reference) + mkAllImages(args, ".odt", "200", reference, failed) + if not(reference): + compareAllImages(args, ".odt") + else: + usage() + sys.exit(1) + +# vim: set shiftwidth=4 softtabstop=4 expandtab: |