summaryrefslogtreecommitdiffstats
path: root/src/libs/xpcom18a4/xpcom/io/nsStreamUtils.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/libs/xpcom18a4/xpcom/io/nsStreamUtils.cpp583
1 files changed, 583 insertions, 0 deletions
diff --git a/src/libs/xpcom18a4/xpcom/io/nsStreamUtils.cpp b/src/libs/xpcom18a4/xpcom/io/nsStreamUtils.cpp
new file mode 100644
index 00000000..aeb52b2a
--- /dev/null
+++ b/src/libs/xpcom18a4/xpcom/io/nsStreamUtils.cpp
@@ -0,0 +1,583 @@
+/* ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0/LGPL 2.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is Mozilla.
+ *
+ * The Initial Developer of the Original Code is
+ * Netscape Communications Corporation.
+ * Portions created by the Initial Developer are Copyright (C) 2002
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ * Darin Fisher <darin@netscape.com>
+ *
+ * Alternatively, the contents of this file may be used under the terms of
+ * either the GNU General Public License Version 2 or later (the "GPL"), or
+ * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
+ * in which case the provisions of the GPL or the LGPL are applicable instead
+ * of those above. If you wish to allow use of your version of this file only
+ * under the terms of either the GPL or the LGPL, and not to allow others to
+ * use your version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the notice
+ * and other provisions required by the GPL or the LGPL. If you do not delete
+ * the provisions above, a recipient may use your version of this file under
+ * the terms of any one of the MPL, the GPL or the LGPL.
+ *
+ * ***** END LICENSE BLOCK ***** */
+
+#include "nsStreamUtils.h"
+#include "nsCOMPtr.h"
+#include "nsIPipe.h"
+#include "nsIEventTarget.h"
+#include "nsAutoLock.h"
+
+//-----------------------------------------------------------------------------
+
+class nsInputStreamReadyEvent : public PLEvent
+ , public nsIInputStreamCallback
+{
+public:
+ NS_DECL_ISUPPORTS
+
+ nsInputStreamReadyEvent(nsIInputStreamCallback *callback,
+ nsIEventTarget *target)
+ : mCallback(callback)
+ , mTarget(target)
+ {
+ }
+
+private:
+ ~nsInputStreamReadyEvent()
+ {
+ if (mCallback) {
+ nsresult rv;
+ //
+ // whoa!! looks like we never posted this event. take care to
+ // release mCallback on the correct thread. if mTarget lives on the
+ // calling thread, then we are ok. otherwise, we have to try to
+ // proxy the Release over the right thread. if that thread is dead,
+ // then there's nothing we can do... better to leak than crash.
+ //
+ PRBool val;
+ rv = mTarget->IsOnCurrentThread(&val);
+ if (NS_FAILED(rv) || !val) {
+ nsCOMPtr<nsIInputStreamCallback> event;
+ NS_NewInputStreamReadyEvent(getter_AddRefs(event), mCallback, mTarget);
+ mCallback = 0;
+ if (event) {
+ rv = event->OnInputStreamReady(nsnull);
+ if (NS_FAILED(rv)) {
+ NS_NOTREACHED("leaking stream event");
+ nsISupports *sup = event;
+ NS_ADDREF(sup);
+ }
+ }
+ }
+ }
+ }
+
+public:
+ NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *stream)
+ {
+ mStream = stream;
+
+ // will be released when event is handled
+ NS_ADDREF_THIS();
+
+ PL_InitEvent(this, nsnull, EventHandler, EventCleanup);
+
+ if (NS_FAILED(mTarget->PostEvent(this))) {
+ NS_WARNING("PostEvent failed");
+ NS_RELEASE_THIS();
+ return NS_ERROR_FAILURE;
+ }
+
+ return NS_OK;
+ }
+
+private:
+ nsCOMPtr<nsIAsyncInputStream> mStream;
+ nsCOMPtr<nsIInputStreamCallback> mCallback;
+ nsCOMPtr<nsIEventTarget> mTarget;
+
+ PR_STATIC_CALLBACK(void *) EventHandler(PLEvent *plevent)
+ {
+ nsInputStreamReadyEvent *ev = (nsInputStreamReadyEvent *) plevent;
+ // bypass event delivery if this is a cleanup event...
+ if (ev->mCallback)
+ ev->mCallback->OnInputStreamReady(ev->mStream);
+ ev->mCallback = 0;
+ return NULL;
+ }
+
+ PR_STATIC_CALLBACK(void) EventCleanup(PLEvent *plevent)
+ {
+ nsInputStreamReadyEvent *ev = (nsInputStreamReadyEvent *) plevent;
+ NS_RELEASE(ev);
+ }
+};
+
+NS_IMPL_THREADSAFE_ISUPPORTS1(nsInputStreamReadyEvent,
+ nsIInputStreamCallback)
+
+//-----------------------------------------------------------------------------
+
+class nsOutputStreamReadyEvent : public PLEvent
+ , public nsIOutputStreamCallback
+{
+public:
+ NS_DECL_ISUPPORTS
+
+ nsOutputStreamReadyEvent(nsIOutputStreamCallback *callback,
+ nsIEventTarget *target)
+ : mCallback(callback)
+ , mTarget(target)
+ {
+ }
+
+private:
+ ~nsOutputStreamReadyEvent()
+ {
+ if (mCallback) {
+ nsresult rv;
+ //
+ // whoa!! looks like we never posted this event. take care to
+ // release mCallback on the correct thread. if mTarget lives on the
+ // calling thread, then we are ok. otherwise, we have to try to
+ // proxy the Release over the right thread. if that thread is dead,
+ // then there's nothing we can do... better to leak than crash.
+ //
+ PRBool val;
+ rv = mTarget->IsOnCurrentThread(&val);
+ if (NS_FAILED(rv) || !val) {
+ nsCOMPtr<nsIOutputStreamCallback> event;
+ NS_NewOutputStreamReadyEvent(getter_AddRefs(event), mCallback, mTarget);
+ mCallback = 0;
+ if (event) {
+ rv = event->OnOutputStreamReady(nsnull);
+ if (NS_FAILED(rv)) {
+ NS_NOTREACHED("leaking stream event");
+ nsISupports *sup = event;
+ NS_ADDREF(sup);
+ }
+ }
+ }
+ }
+ }
+
+public:
+ void Init(nsIOutputStreamCallback *callback, nsIEventTarget *target)
+ {
+ mCallback = callback;
+ mTarget = target;
+
+ PL_InitEvent(this, nsnull, EventHandler, EventCleanup);
+ }
+
+ NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream *stream)
+ {
+ mStream = stream;
+
+ // this will be released when the event is handled
+ NS_ADDREF_THIS();
+
+ PL_InitEvent(this, nsnull, EventHandler, EventCleanup);
+
+ if (NS_FAILED(mTarget->PostEvent(this))) {
+ NS_WARNING("PostEvent failed");
+ NS_RELEASE_THIS();
+ return NS_ERROR_FAILURE;
+ }
+
+ return NS_OK;
+ }
+
+private:
+ nsCOMPtr<nsIAsyncOutputStream> mStream;
+ nsCOMPtr<nsIOutputStreamCallback> mCallback;
+ nsCOMPtr<nsIEventTarget> mTarget;
+
+ PR_STATIC_CALLBACK(void *) EventHandler(PLEvent *plevent)
+ {
+ nsOutputStreamReadyEvent *ev = (nsOutputStreamReadyEvent *) plevent;
+ if (ev->mCallback)
+ ev->mCallback->OnOutputStreamReady(ev->mStream);
+ ev->mCallback = 0;
+ return NULL;
+ }
+
+ PR_STATIC_CALLBACK(void) EventCleanup(PLEvent *ev)
+ {
+ nsOutputStreamReadyEvent *event = (nsOutputStreamReadyEvent *) ev;
+ NS_RELEASE(event);
+ }
+};
+
+NS_IMPL_THREADSAFE_ISUPPORTS1(nsOutputStreamReadyEvent,
+ nsIOutputStreamCallback)
+
+//-----------------------------------------------------------------------------
+
+NS_COM nsresult
+NS_NewInputStreamReadyEvent(nsIInputStreamCallback **event,
+ nsIInputStreamCallback *callback,
+ nsIEventTarget *target)
+{
+ nsInputStreamReadyEvent *ev = new nsInputStreamReadyEvent(callback, target);
+ if (!ev)
+ return NS_ERROR_OUT_OF_MEMORY;
+ NS_ADDREF(*event = ev);
+ return NS_OK;
+}
+
+NS_COM nsresult
+NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback **event,
+ nsIOutputStreamCallback *callback,
+ nsIEventTarget *target)
+{
+ nsOutputStreamReadyEvent *ev = new nsOutputStreamReadyEvent(callback, target);
+ if (!ev)
+ return NS_ERROR_OUT_OF_MEMORY;
+ NS_ADDREF(*event = ev);
+ return NS_OK;
+}
+
+//-----------------------------------------------------------------------------
+// NS_AsyncCopy implementation
+
+// abstract stream copier...
+class nsAStreamCopier : public nsIInputStreamCallback
+ , public nsIOutputStreamCallback
+{
+public:
+ NS_DECL_ISUPPORTS
+
+ nsAStreamCopier()
+ : mLock(nsnull)
+ , mCallback(nsnull)
+ , mClosure(nsnull)
+ , mChunkSize(0)
+ , mEventInProcess(PR_FALSE)
+ , mEventIsPending(PR_FALSE)
+ {
+ }
+
+ // virtual since subclasses call superclass Release()
+ virtual ~nsAStreamCopier()
+ {
+ if (mLock)
+ PR_DestroyLock(mLock);
+ }
+
+ // kick off the async copy...
+ nsresult Start(nsIInputStream *source,
+ nsIOutputStream *sink,
+ nsIEventTarget *target,
+ nsAsyncCopyCallbackFun callback,
+ void *closure,
+ PRUint32 chunksize)
+ {
+ mSource = source;
+ mSink = sink;
+ mTarget = target;
+ mCallback = callback;
+ mClosure = closure;
+ mChunkSize = chunksize;
+
+ mLock = PR_NewLock();
+ if (!mLock)
+ return NS_ERROR_OUT_OF_MEMORY;
+
+ mAsyncSource = do_QueryInterface(mSource);
+ mAsyncSink = do_QueryInterface(mSink);
+
+ return PostContinuationEvent();
+ }
+
+ // implemented by subclasses, returns number of bytes copied and
+ // sets source and sink condition before returning.
+ virtual PRUint32 DoCopy(nsresult *sourceCondition, nsresult *sinkCondition) = 0;
+
+ void Process()
+ {
+ if (!mSource || !mSink)
+ return;
+
+ nsresult sourceCondition, sinkCondition;
+
+ // ok, copy data from source to sink.
+ for (;;) {
+ PRUint32 n = DoCopy(&sourceCondition, &sinkCondition);
+ if (NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0) {
+ if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
+ // need to wait for more data from source. while waiting for
+ // more source data, be sure to observe failures on output end.
+ mAsyncSource->AsyncWait(this, 0, 0, nsnull);
+
+ if (mAsyncSink)
+ mAsyncSink->AsyncWait(this,
+ nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
+ 0, nsnull);
+ }
+ else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
+ // need to wait for more room in the sink. while waiting for
+ // more room in the sink, be sure to observer failures on the
+ // input end.
+ mAsyncSink->AsyncWait(this, 0, 0, nsnull);
+
+ if (mAsyncSource)
+ mAsyncSource->AsyncWait(this,
+ nsIAsyncInputStream::WAIT_CLOSURE_ONLY,
+ 0, nsnull);
+ }
+ else {
+ // close source
+ if (mAsyncSource)
+ mAsyncSource->CloseWithStatus(sinkCondition);
+ else
+ mSource->Close();
+ mAsyncSource = nsnull;
+ mSource = nsnull;
+
+ // close sink
+ if (mAsyncSink)
+ mAsyncSink->CloseWithStatus(sourceCondition);
+ else
+ mSink->Close();
+ mAsyncSink = nsnull;
+ mSink = nsnull;
+
+ // notify state complete...
+ if (mCallback) {
+ nsresult status = sourceCondition;
+ if (NS_SUCCEEDED(status))
+ status = sinkCondition;
+ if (status == NS_BASE_STREAM_CLOSED)
+ status = NS_OK;
+ mCallback(mClosure, status);
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *source)
+ {
+ PostContinuationEvent();
+ return NS_OK;
+ }
+
+ NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream *sink)
+ {
+ PostContinuationEvent();
+ return NS_OK;
+ }
+
+ PR_STATIC_CALLBACK(void*) HandleContinuationEvent(PLEvent *event)
+ {
+ nsAStreamCopier *self = (nsAStreamCopier *) event->owner;
+ self->Process();
+
+ // clear "in process" flag and post any pending continuation event
+ nsAutoLock lock(self->mLock);
+ self->mEventInProcess = PR_FALSE;
+ if (self->mEventIsPending) {
+ self->mEventIsPending = PR_FALSE;
+ self->PostContinuationEvent_Locked();
+ }
+ return nsnull;
+ }
+
+ PR_STATIC_CALLBACK(void) DestroyContinuationEvent(PLEvent *event)
+ {
+ nsAStreamCopier *self = (nsAStreamCopier *) event->owner;
+ NS_RELEASE(self);
+ delete event;
+ }
+
+ nsresult PostContinuationEvent()
+ {
+ // we cannot post a continuation event if there is currently
+ // an event in process. doing so could result in Process being
+ // run simultaneously on multiple threads, so we mark the event
+ // as pending, and if an event is already in process then we
+ // just let that existing event take care of posting the real
+ // continuation event.
+
+ nsAutoLock lock(mLock);
+ return PostContinuationEvent_Locked();
+ }
+
+ nsresult PostContinuationEvent_Locked()
+ {
+ nsresult rv = NS_OK;
+ if (mEventInProcess)
+ mEventIsPending = PR_TRUE;
+ else {
+ PLEvent *event = new PLEvent;
+ if (!event)
+ rv = NS_ERROR_OUT_OF_MEMORY;
+ else {
+ NS_ADDREF_THIS();
+ PL_InitEvent(event, this,
+ HandleContinuationEvent,
+ DestroyContinuationEvent);
+
+ rv = mTarget->PostEvent(event);
+ if (NS_SUCCEEDED(rv))
+ mEventInProcess = PR_TRUE;
+ else {
+ NS_ERROR("unable to post continuation event");
+ PL_DestroyEvent(event);
+ }
+ }
+ }
+ return rv;
+ }
+
+protected:
+ nsCOMPtr<nsIInputStream> mSource;
+ nsCOMPtr<nsIOutputStream> mSink;
+ nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
+ nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
+ nsCOMPtr<nsIEventTarget> mTarget;
+ PRLock *mLock;
+ nsAsyncCopyCallbackFun mCallback;
+ void *mClosure;
+ PRUint32 mChunkSize;
+ PRPackedBool mEventInProcess;
+ PRPackedBool mEventIsPending;
+};
+
+NS_IMPL_THREADSAFE_ISUPPORTS2(nsAStreamCopier,
+ nsIInputStreamCallback,
+ nsIOutputStreamCallback)
+
+class nsStreamCopierIB : public nsAStreamCopier
+{
+public:
+ nsStreamCopierIB() : nsAStreamCopier() {}
+ virtual ~nsStreamCopierIB() {}
+
+ struct ReadSegmentsState {
+ nsIOutputStream *mSink;
+ nsresult mSinkCondition;
+ };
+
+ static NS_METHOD ConsumeInputBuffer(nsIInputStream *inStr,
+ void *closure,
+ const char *buffer,
+ PRUint32 offset,
+ PRUint32 count,
+ PRUint32 *countWritten)
+ {
+ ReadSegmentsState *state = (ReadSegmentsState *) closure;
+
+ nsresult rv = state->mSink->Write(buffer, count, countWritten);
+ if (NS_FAILED(rv))
+ state->mSinkCondition = rv;
+ else if (*countWritten == 0)
+ state->mSinkCondition = NS_BASE_STREAM_CLOSED;
+
+ return state->mSinkCondition;
+ }
+
+ PRUint32 DoCopy(nsresult *sourceCondition, nsresult *sinkCondition)
+ {
+ ReadSegmentsState state;
+ state.mSink = mSink;
+ state.mSinkCondition = NS_OK;
+
+ PRUint32 n;
+ *sourceCondition =
+ mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
+ *sinkCondition = state.mSinkCondition;
+ return n;
+ }
+};
+
+class nsStreamCopierOB : public nsAStreamCopier
+{
+public:
+ nsStreamCopierOB() : nsAStreamCopier() {}
+ virtual ~nsStreamCopierOB() {}
+
+ struct WriteSegmentsState {
+ nsIInputStream *mSource;
+ nsresult mSourceCondition;
+ };
+
+ static NS_METHOD FillOutputBuffer(nsIOutputStream *outStr,
+ void *closure,
+ char *buffer,
+ PRUint32 offset,
+ PRUint32 count,
+ PRUint32 *countRead)
+ {
+ WriteSegmentsState *state = (WriteSegmentsState *) closure;
+
+ nsresult rv = state->mSource->Read(buffer, count, countRead);
+ if (NS_FAILED(rv))
+ state->mSourceCondition = rv;
+ else if (*countRead == 0)
+ state->mSourceCondition = NS_BASE_STREAM_CLOSED;
+
+ return state->mSourceCondition;
+ }
+
+ PRUint32 DoCopy(nsresult *sourceCondition, nsresult *sinkCondition)
+ {
+ WriteSegmentsState state;
+ state.mSource = mSource;
+ state.mSourceCondition = NS_OK;
+
+ PRUint32 n;
+ *sinkCondition =
+ mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
+ *sourceCondition = state.mSourceCondition;
+ return n;
+ }
+};
+
+//-----------------------------------------------------------------------------
+
+NS_COM nsresult
+NS_AsyncCopy(nsIInputStream *source,
+ nsIOutputStream *sink,
+ nsIEventTarget *target,
+ nsAsyncCopyMode mode,
+ PRUint32 chunkSize,
+ nsAsyncCopyCallbackFun callback,
+ void *closure)
+{
+ NS_ASSERTION(target, "non-null target required");
+
+ nsresult rv;
+ nsAStreamCopier *copier;
+
+ if (mode == NS_ASYNCCOPY_VIA_READSEGMENTS)
+ copier = new nsStreamCopierIB();
+ else
+ copier = new nsStreamCopierOB();
+
+ if (!copier)
+ return NS_ERROR_OUT_OF_MEMORY;
+
+ // Start() takes an owning ref to the copier...
+ NS_ADDREF(copier);
+ rv = copier->Start(source, sink, target, callback, closure, chunkSize);
+ NS_RELEASE(copier);
+
+ return rv;
+}