diff options
Diffstat (limited to 'src/libs/xpcom18a4/python/file.py')
-rwxr-xr-x | src/libs/xpcom18a4/python/file.py | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/src/libs/xpcom18a4/python/file.py b/src/libs/xpcom18a4/python/file.py new file mode 100755 index 00000000..59b0da58 --- /dev/null +++ b/src/libs/xpcom18a4/python/file.py @@ -0,0 +1,318 @@ +# ***** BEGIN LICENSE BLOCK ***** +# Version: MPL 1.1/GPL 2.0/LGPL 2.1 +# +# The contents of this file are subject to the Mozilla Public License Version +# 1.1 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.mozilla.org/MPL/ +# +# Software distributed under the License is distributed on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. +# +# The Original Code is the Python XPCOM language bindings. +# +# The Initial Developer of the Original Code is +# ActiveState Tool Corp. +# Portions created by the Initial Developer are Copyright (C) 2000, 2001 +# the Initial Developer. All Rights Reserved. +# +# Contributor(s): +# Mark Hammond <MarkH@ActiveState.com> (original author) +# +# Alternatively, the contents of this file may be used under the terms of +# either the GNU General Public License Version 2 or later (the "GPL"), or +# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), +# in which case the provisions of the GPL or the LGPL are applicable instead +# of those above. If you wish to allow use of your version of this file only +# under the terms of either the GPL or the LGPL, and not to allow others to +# use your version of this file under the terms of the MPL, indicate your +# decision by deleting the provisions above and replace them with the notice +# and other provisions required by the GPL or the LGPL. If you do not delete +# the provisions above, a recipient may use your version of this file under +# the terms of any one of the MPL, the GPL or the LGPL. +# +# ***** END LICENSE BLOCK ***** + +"""Implementation of Python file objects for Mozilla/xpcom. + +Introduction: + This module defines various class that are implemented using + Mozilla streams. This allows you to open Mozilla URI's, and + treat them as Python file object. + +Example: +>>> file = URIFile("chrome://whatever") +>>> data = file.read(5) # Pass no arg to read everything. + +Known Limitations: + * Not all URL schemes will work from "python.exe" - most notably + "chrome://" and "http://" URLs - this is because a simple initialization of + xpcom by Python does not load up the full set of Mozilla URL handlers. + If you can work out how to correctly initialize the chrome registry and + setup a message queue. + +Known Bugs: + * Only read ("r") mode is supported. Although write ("w") mode doesnt make + sense for HTTP type URLs, it potentially does for file:// etc type ones. + * No concept of text mode vs binary mode. It appears Mozilla takes care of + this internally (ie, all "text/???" mime types are text, rest are binary) + +""" + +from xpcom import components, Exception, _xpcom +import os +import threading # for locks. + +NS_RDONLY = 0x01 +NS_WRONLY = 0x02 +NS_RDWR = 0x04 +NS_CREATE_FILE = 0x08 +NS_APPEND = 0x10 +NS_TRUNCATE = 0x20 +NS_SYNC = 0x40 +NS_EXCL = 0x80 + +# A helper function that may come in useful +def LocalFileToURL(localFileName): + "Convert a filename to an XPCOM nsIFileURL object." + # Create an nsILocalFile + localFile = components.classes["@mozilla.org/file/local;1"] \ + .createInstance(components.interfaces.nsILocalFile) + localFile.initWithPath(localFileName) + + # Use the IO Service to create the interface, then QI for a FileURL + io_service = components.classes["@mozilla.org/network/io-service;1"] \ + .getService(components.interfaces.nsIIOService) + url = io_service.newFileURI(localFile).queryInterface(components.interfaces.nsIFileURL) + # Setting the "file" attribute causes initialization... + url.file = localFile + return url + +# A base class for file objects. +class _File: + def __init__(self, name_thingy = None, mode="r"): + self.lockob = threading.Lock() + self.inputStream = self.outputStream = None + if name_thingy is not None: + self.init(name_thingy, mode) + + def __del__(self): + self.close() + + # The Moz file streams are not thread safe. + def _lock(self): + self.lockob.acquire() + def _release(self): + self.lockob.release() + def read(self, n = -1): + assert self.inputStream is not None, "Not setup for read!" + self._lock() + try: + return str(self.inputStream.read(n)) + finally: + self._release() + + def readlines(self): + # Not part of the xpcom interface, but handy for direct Python users. + # Not 100% faithful, but near enough for now! + lines = self.read().split("\n") + if len(lines) and len(lines[-1]) == 0: + lines = lines[:-1] + return [s+"\n" for s in lines ] + + def write(self, data): + assert self.outputStream is not None, "Not setup for write!" + self._lock() + try: + self.outputStream.write(data, len(data)) + finally: + self._release() + + def close(self): + self._lock() + try: + if self.inputStream is not None: + self.inputStream.close() + self.inputStream = None + if self.outputStream is not None: + self.outputStream.close() + self.outputStream = None + self.channel = None + finally: + self._release() + + def flush(self): + self._lock() + try: + if self.outputStream is not None: self.outputStream.flush() + finally: + self._release() + +# A synchronous "file object" used to open a URI. +class URIFile(_File): + def init(self, url, mode="r"): + self.close() + if mode != "r": + raise ValueError("only 'r' mode supported") + io_service = components.classes["@mozilla.org/network/io-service;1"] \ + .getService(components.interfaces.nsIIOService) + if hasattr(url, "queryInterface"): + url_ob = url + else: + url_ob = io_service.newURI(url, None, None) + # Mozilla asserts and starts saying "NULL POINTER" if this is wrong! + if not url_ob.scheme: + raise ValueError("The URI '%s' is invalid (no scheme)" + % (url_ob.spec,)) + self.channel = io_service.newChannelFromURI(url_ob) + self.inputStream = self.channel.open() + +# A "file object" implemented using Netscape's native file support. +# Based on io.js - http://lxr.mozilla.org/seamonkey/source/xpcom/tests/utils/io.js +# You open this file using a local file name (as a string) so it really is pointless - +# you may as well be using a standard Python file object! +class LocalFile(_File): + def __init__(self, *args): + self.fileIO = None + _File.__init__(self, *args) + + def init(self, name, mode = "r"): + name = os.path.abspath(name) # Moz libraries under Linux fail with relative paths. + self.close() + file = components.classes['@mozilla.org/file/local;1'].createInstance("nsILocalFile") + file.initWithPath(name) + if mode in ["w","a"]: + self.fileIO = components.classes["@mozilla.org/network/file-output-stream;1"].createInstance("nsIFileOutputStream") + if mode== "w": + if file.exists(): + file.remove(0) + moz_mode = NS_CREATE_FILE | NS_WRONLY + elif mode=="a": + moz_mode = NS_APPEND + else: + assert 0, "Can't happen!" + self.fileIO.init(file, moz_mode, -1,0) + self.outputStream = self.fileIO + elif mode == "r": + self.fileIO = components.classes["@mozilla.org/network/file-input-stream;1"].createInstance("nsIFileInputStream") + self.fileIO.init(file, NS_RDONLY, -1,0) + self.inputStream = components.classes["@mozilla.org/scriptableinputstream;1"].createInstance("nsIScriptableInputStream") + self.inputStream.init(self.fileIO) + else: + raise ValueError("Unknown mode") + + def close(self): + if self.fileIO is not None: + self.fileIO.close() + self.fileIO = None + _File.close(self) + + def read(self, n = -1): + return _File.read(self, n) + + +########################################################## +## +## Test Code +## +########################################################## +def _DoTestRead(file, expected): + # read in a couple of chunks, just to test that our various arg combinations work. + got = file.read(3) + got = got + file.read(300) + got = got + file.read(0) + got = got + file.read() + if got != expected: + raise RuntimeError("Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))) + +def _DoTestBufferRead(file, expected): + # read in a couple of chunks, just to test that our various arg combinations work. + buffer = _xpcom.AllocateBuffer(50) + got = '' + while 1: + # Note - we need to reach into the file object so we + # can get at the native buffer supported function. + num = file.inputStream.read(buffer) + if num == 0: + break + got = got + str(buffer[:num]) + if got != expected: + raise RuntimeError("Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))) + +def _TestLocalFile(): + import tempfile, os + fname = tempfile.mktemp() + data = "Hello from Python" + test_file = LocalFile(fname, "w") + try: + test_file.write(data) + test_file.close() + # Make sure Python can read it OK. + f = open(fname, "r") + assert f.read() == data, "Eeek - Python could not read the data back correctly!" + f.close() + # For the sake of the test, try a re-init. + test_file.init(fname, "r") + got = str(test_file.read()) + assert got == data, got + test_file.close() + # Try reading in chunks. + test_file = LocalFile(fname, "r") + got = test_file.read(10) + test_file.read() + assert got == data, got + test_file.close() + # Open the same file again for writing - this should delete the old one. + if not os.path.isfile(fname): + raise RuntimeError("The file '%s' does not exist, but we are explicitly testing create semantics when it does" % (fname,)) + test_file = LocalFile(fname, "w") + test_file.write(data) + test_file.close() + # Make sure Python can read it OK. + f = open(fname, "r") + assert f.read() == data, "Eeek - Python could not read the data back correctly after recreating an existing file!" + f.close() + + # XXX - todo - test "a" mode! + finally: + os.unlink(fname) + +def _TestAll(): + # A mini test suite. + # Get a test file, and convert it to a file:// URI. + # check what we read is the same as when + # we read this file "normally" + fname = components.__file__ + if fname[-1] in "cCoO": # fix .pyc/.pyo + fname = fname[:-1] + expected = open(fname, "rb").read() + # convert the fname to a URI. + url = LocalFileToURL(fname) + # First try passing a URL as a string. + _DoTestRead( URIFile( url.spec), expected) + # Now with a URL object. + _DoTestRead( URIFile( url ), expected) + + _DoTestBufferRead( URIFile( url ), expected) + + # For the sake of testing, do our pointless, demo object! + _DoTestRead( LocalFile(fname), expected ) + + # Now do the full test of our pointless, demo object! + _TestLocalFile() + +def _TestURI(url): + test_file = URIFile(url) + print("Opened file is", test_file) + got = test_file.read() + print("Read %d bytes of data from %r" % (len(got), url)) + test_file.close() + +if __name__=='__main__': + import sys + if len(sys.argv) < 2: + print("No URL specified on command line - performing self-test") + _TestAll() + else: + _TestURI(sys.argv[1]) |