First checkin of the Python XPCOM bindings.
This commit is contained in:
298
extensions/python/xpcom/file.py
Normal file
298
extensions/python/xpcom/file.py
Normal file
@@ -0,0 +1,298 @@
|
||||
# Copyright (c) 2000-2001 ActiveState Tool Corporation.
|
||||
# See the file LICENSE.txt for licensing information.
|
||||
|
||||
"""Implementation of Python file objects for Mozilla/xpcom.
|
||||
|
||||
Introduction:
|
||||
This module defines various class that are implemented using
|
||||
Mozilla streams. This allows you to open Mozilla URI's, and
|
||||
treat them as Python file object.
|
||||
|
||||
Example:
|
||||
>>> file = URIFile("chrome://whatever")
|
||||
>>> data = file.read(5) # Pass no arg to read everything.
|
||||
|
||||
Known Limitations:
|
||||
* Not all URL schemes will work from "python.exe" - most notably
|
||||
"chrome://" and "http://" URLs - this is because a simple initialization of
|
||||
xpcom by Python does not load up the full set of Mozilla URL handlers.
|
||||
If you can work out how to correctly initialize the chrome registry and
|
||||
setup a message queue.
|
||||
|
||||
Known Bugs:
|
||||
* Only read ("r") mode is supported. Although write ("w") mode doesnt make
|
||||
sense for HTTP type URLs, it potentially does for file:// etc type ones.
|
||||
* No concept of text mode vs binary mode. It appears Mozilla takes care of
|
||||
this internally (ie, all "text/???" mime types are text, rest are binary)
|
||||
|
||||
"""
|
||||
|
||||
from xpcom import components, Exception, _xpcom
|
||||
import os
|
||||
import threading # for locks.
|
||||
|
||||
NS_RDONLY = components.interfaces.nsIFileChannel.NS_RDONLY
|
||||
NS_WRONLY = components.interfaces.nsIFileChannel.NS_WRONLY
|
||||
NS_RDWR = components.interfaces.nsIFileChannel.NS_RDWR
|
||||
NS_CREATE_FILE = components.interfaces.nsIFileChannel.NS_CREATE_FILE
|
||||
NS_APPEND = components.interfaces.nsIFileChannel.NS_APPEND
|
||||
NS_TRUNCATE = components.interfaces.nsIFileChannel.NS_TRUNCATE
|
||||
NS_SYNC = components.interfaces.nsIFileChannel.NS_SYNC
|
||||
NS_EXCL = components.interfaces.nsIFileChannel.NS_EXCL
|
||||
|
||||
# A helper function - you pass a progID, an interface, plus optionally the
|
||||
# name of the "constructor" and its args.
|
||||
def _construct(progid, interface, ctor = None, *args):
|
||||
assert (ctor is None and not args) or (ctor is not None and args), \
|
||||
"no point having a ctor with no args, or if you provide args, there must be a ctor!"
|
||||
instance = components.classes[progid] \
|
||||
.createInstance(interface)
|
||||
if ctor is not None:
|
||||
ctor = getattr(instance, ctor)
|
||||
apply(ctor, args)
|
||||
return instance
|
||||
|
||||
# A helper function that may come in useful
|
||||
def LocalFileToURL(localFileName):
|
||||
"Convert a filename to an XPCOM nsIFileURL object."
|
||||
# Create an nsILocalFile
|
||||
localFile = components.classes["@mozilla.org/file/local;1"] \
|
||||
.createInstance(components.interfaces.nsILocalFile)
|
||||
localFile.initWithPath(localFileName)
|
||||
|
||||
# Create a standard URL, but we QI for the FileURL interface!
|
||||
url = components.classes["@mozilla.org/network/standard-url;1"] \
|
||||
.createInstance(components.interfaces.nsIFileURL)
|
||||
# Setting the "file" attribute causes initialization...
|
||||
url.file = localFile
|
||||
return url
|
||||
|
||||
# A base class for file objects.
|
||||
class _File:
|
||||
def __init__(self, name_thingy = None, mode="r"):
|
||||
self.lockob = threading.Lock()
|
||||
self.inputStream = self.outputStream = None
|
||||
if name_thingy is not None:
|
||||
self.init(name_thingy, mode)
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
# The Moz file streams are not thread safe.
|
||||
def _lock(self):
|
||||
self.lockob.acquire()
|
||||
def _release(self):
|
||||
self.lockob.release()
|
||||
def read(self, n = -1):
|
||||
assert self.inputStream is not None, "Not setup for read!"
|
||||
self._lock()
|
||||
try:
|
||||
return str(self.inputStream.read(n))
|
||||
finally:
|
||||
self._release()
|
||||
|
||||
def readlines(self):
|
||||
# Not part of the xpcom interface, but handy for direct Python users.
|
||||
# Not 100% faithful, but near enough for now!
|
||||
lines = self.read().split("\n")
|
||||
if len(lines) and len(lines[-1]) == 0:
|
||||
lines = lines[:-1]
|
||||
return [s+"\n" for s in lines ]
|
||||
|
||||
def write(self, data):
|
||||
assert self.outputStream is not None, "Not setup for write!"
|
||||
self._lock()
|
||||
try:
|
||||
self.outputStream.write(data, len(data))
|
||||
finally:
|
||||
self._release()
|
||||
|
||||
def close(self):
|
||||
self._lock()
|
||||
try:
|
||||
if self.inputStream is not None:
|
||||
self.inputStream.close()
|
||||
self.inputStream = None
|
||||
if self.outputStream is not None:
|
||||
self.outputStream.close()
|
||||
self.outputStream = None
|
||||
self.channel = None
|
||||
# leave self.fileInst alone - it should be available after close.
|
||||
finally:
|
||||
self._release()
|
||||
|
||||
def flush(self):
|
||||
self._lock()
|
||||
try:
|
||||
if self.outputStream is not None: self.outputStream.flush()
|
||||
finally:
|
||||
self._release()
|
||||
|
||||
# A synchronous "file object" used to open a URI.
|
||||
class URIFile(_File):
|
||||
def init(self, url, mode="r"):
|
||||
self.close()
|
||||
if mode != "r":
|
||||
raise ValueError, "only 'r' mode supported'"
|
||||
io_service = components.classes["@mozilla.org/network/io-service;1"] \
|
||||
.createInstance("nsIIOService")
|
||||
if hasattr(url, "queryInterface"):
|
||||
url_ob = url
|
||||
else:
|
||||
url_ob = components.classes["@mozilla.org/network/standard-url;1"] \
|
||||
.createInstance(components.interfaces.nsIURL)
|
||||
url_ob.spec = url
|
||||
# Mozilla asserts and starts saying "NULL POINTER" if this is wrong!
|
||||
if not url_ob.scheme:
|
||||
raise ValueError, ("The URI '%s' is invalid (no scheme)"
|
||||
% (url_ob.spec,))
|
||||
self.channel = io_service.newChannelFromURI(url_ob)
|
||||
self.inputStream = self.channel.openInputStream()
|
||||
|
||||
# A "file object" implemented using Netscape's native file support.
|
||||
# Based on io.js - http://lxr.mozilla.org/seamonkey/source/xpcom/tests/utils/io.js
|
||||
# You open this file using a local file name (as a string) so it really is pointless -
|
||||
# you may as well be using a standard Python file object!
|
||||
class LocalFile(_File):
|
||||
def init(self, name, mode = "r"):
|
||||
name = os.path.abspath(name) # Moz libraries under Linux fail with relative paths.
|
||||
self.close()
|
||||
self.fileInst = _construct('@mozilla.org/file/local;1', "nsILocalFile", "initWithPath", name)
|
||||
self.inputStream = None
|
||||
self.channel = None
|
||||
if mode in ["w","a"]:
|
||||
if mode== "w":
|
||||
if self.fileInst.exists():
|
||||
self.fileInst.delete(0)
|
||||
moz_mode = NS_CREATE_FILE | NS_WRONLY
|
||||
elif mode=="a":
|
||||
moz_mode = NS_APPEND
|
||||
else:
|
||||
assert 0, "Can't happen!"
|
||||
perms = 0644
|
||||
if not self.fileInst.exists():
|
||||
self.fileInst.create(components.interfaces.nsILocalFile.NORMAL_FILE_TYPE, 0644)
|
||||
self.channel = _construct('@mozilla.org/network/local-file-channel;1', "nsIFileChannel", "init",
|
||||
self.fileInst, moz_mode, perms)
|
||||
self.outputStream = self.channel.openOutputStream()
|
||||
self.fileInst.permissions = perms
|
||||
|
||||
elif mode == "r":
|
||||
self.channel = _construct('@mozilla.org/network/local-file-channel;1', "nsIFileChannel", "init",
|
||||
self.fileInst, NS_RDONLY, self.fileInst.permissions)
|
||||
self.inputStream = self.channel.openInputStream()
|
||||
else:
|
||||
raise ValueError, "Unknown mode"
|
||||
|
||||
def read(self, n = -1):
|
||||
if n == -1:
|
||||
n = self.fileInst.fileSize
|
||||
return _File.read(self, n)
|
||||
|
||||
|
||||
##########################################################
|
||||
##
|
||||
## Test Code
|
||||
##
|
||||
##########################################################
|
||||
def _DoTestRead(file, expected):
|
||||
# read in a couple of chunks, just to test that our various arg combinations work.
|
||||
got = file.read(3)
|
||||
got = got + file.read(300)
|
||||
got = got + file.read(0)
|
||||
got = got + file.read()
|
||||
if got != expected:
|
||||
raise RuntimeError, "Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))
|
||||
|
||||
def _DoTestBufferRead(file, expected):
|
||||
# read in a couple of chunks, just to test that our various arg combinations work.
|
||||
buffer = _xpcom.AllocateBuffer(50)
|
||||
got = ''
|
||||
while 1:
|
||||
# Note - we need to reach into the file object so we
|
||||
# can get at the native buffer supported function.
|
||||
num = file.inputStream.read(buffer)
|
||||
if num == 0:
|
||||
break
|
||||
got = got + str(buffer[:num])
|
||||
if got != expected:
|
||||
raise RuntimeError, "Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))
|
||||
|
||||
def _TestLocalFile():
|
||||
import tempfile, os
|
||||
fname = tempfile.mktemp()
|
||||
data = "Hello from Python"
|
||||
test_file = LocalFile(fname, "w")
|
||||
try:
|
||||
test_file.write(data)
|
||||
test_file.close()
|
||||
# Make sure Python can read it OK.
|
||||
f = open(fname, "r")
|
||||
if f.read() != data:
|
||||
print "Eeek - Python could not read the data back correctly!"
|
||||
f.close()
|
||||
# For the sake of the test, try a re-init.
|
||||
test_file.init(fname, "r")
|
||||
got = str(test_file.read())
|
||||
if got != data:
|
||||
print "Read the wrong data back - %r" % (got,)
|
||||
else:
|
||||
print "Read the correct data."
|
||||
test_file.close()
|
||||
# Try reading in chunks.
|
||||
test_file = LocalFile(fname, "r")
|
||||
got = test_file.read(10) + test_file.read()
|
||||
if got != data:
|
||||
print "Chunks the wrong data back - %r" % (got,)
|
||||
else:
|
||||
print "Chunks read the correct data."
|
||||
test_file.close()
|
||||
# XXX - todo - test "a" mode!
|
||||
finally:
|
||||
try:
|
||||
os.unlink(fname)
|
||||
except OSError, details:
|
||||
print "Error removing temp test file:", details
|
||||
|
||||
def _TestAll():
|
||||
# A mini test suite.
|
||||
# Get a test file, and convert it to a file:// URI.
|
||||
# check what we read is the same as when
|
||||
# we read this file "normally"
|
||||
fname = components.__file__
|
||||
if fname[-1] in "cCoO": # fix .pyc/.pyo
|
||||
fname = fname[:-1]
|
||||
expected = open(fname, "rb").read()
|
||||
# convert the fname to a URI.
|
||||
url = LocalFileToURL(fname)
|
||||
# First try passing a URL as a string.
|
||||
_DoTestRead( URIFile( url.spec), expected)
|
||||
print "Open as string test worked."
|
||||
# Now with a URL object.
|
||||
_DoTestRead( URIFile( url ), expected)
|
||||
print "Open as URL test worked."
|
||||
|
||||
_DoTestBufferRead( URIFile( url ), expected)
|
||||
print "File test using buffers worked."
|
||||
|
||||
# For the sake of testing, do our pointless, demo object!
|
||||
_DoTestRead( LocalFile(fname), expected )
|
||||
print "Local file read test worked."
|
||||
|
||||
# Now do the full test of our pointless, demo object!
|
||||
_TestLocalFile()
|
||||
|
||||
def _TestURI(url):
|
||||
test_file = URIFile(url)
|
||||
print "Opened file is", test_file
|
||||
got = test_file.read()
|
||||
print "Read %d bytes of data from %r" % (len(got), url)
|
||||
test_file.close()
|
||||
|
||||
if __name__=='__main__':
|
||||
import sys
|
||||
if len(sys.argv) < 2:
|
||||
print "No URL specified on command line - performing self-test"
|
||||
_TestAll()
|
||||
else:
|
||||
_TestURI(sys.argv[1])
|
||||
Reference in New Issue
Block a user