diff options
Diffstat (limited to '')
-rw-r--r-- | src/libs/xpcom18a4/xpcom/io/nsPipe3.cpp | 1276 |
1 files changed, 1276 insertions, 0 deletions
diff --git a/src/libs/xpcom18a4/xpcom/io/nsPipe3.cpp b/src/libs/xpcom18a4/xpcom/io/nsPipe3.cpp new file mode 100644 index 00000000..be06179f --- /dev/null +++ b/src/libs/xpcom18a4/xpcom/io/nsPipe3.cpp @@ -0,0 +1,1276 @@ +/* ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0/LGPL 2.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is Mozilla. + * + * The Initial Developer of the Original Code is + * Netscape Communications Corporation. + * Portions created by the Initial Developer are Copyright (C) 2002 + * the Initial Developer. All Rights Reserved. + * + * Contributor(s): + * Darin Fisher <darin@netscape.com> + * + * Alternatively, the contents of this file may be used under the terms of + * either the GNU General Public License Version 2 or later (the "GPL"), or + * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), + * in which case the provisions of the GPL or the LGPL are applicable instead + * of those above. If you wish to allow use of your version of this file only + * under the terms of either the GPL or the LGPL, and not to allow others to + * use your version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the notice + * and other provisions required by the GPL or the LGPL. If you do not delete + * the provisions above, a recipient may use your version of this file under + * the terms of any one of the MPL, the GPL or the LGPL. + * + * ***** END LICENSE BLOCK ***** */ + +#include "nsIPipe.h" +#include "nsIEventTarget.h" +#include "nsISeekableStream.h" +#include "nsSegmentedBuffer.h" +#include "nsStreamUtils.h" +#include "nsAutoLock.h" +#include "nsCOMPtr.h" +#include "nsCRT.h" +#include "prlog.h" +#include "nsInt64.h" + +#if defined(PR_LOGGING) +// +// set NSPR_LOG_MODULES=nsPipe:5 +// +static PRLogModuleInfo *gPipeLog = nsnull; +#define LOG(args) PR_LOG(gPipeLog, PR_LOG_DEBUG, args) +#else +#define LOG(args) +#endif + +#define DEFAULT_SEGMENT_SIZE 4096 +#define DEFAULT_SEGMENT_COUNT 16 + +class nsPipe; +class nsPipeEvents; +class nsPipeInputStream; +class nsPipeOutputStream; + +//----------------------------------------------------------------------------- + +// this class is used to delay notifications until the end of a particular +// scope. it helps avoid the complexity of issuing callbacks while inside +// a critical section. +class nsPipeEvents +{ +public: + nsPipeEvents() { } + ~nsPipeEvents(); + + inline void NotifyInputReady(nsIAsyncInputStream *stream, + nsIInputStreamCallback *callback) + { + NS_ASSERTION(!mInputCallback, "already have an input event"); + mInputStream = stream; + mInputCallback = callback; + } + + inline void NotifyOutputReady(nsIAsyncOutputStream *stream, + nsIOutputStreamCallback *callback) + { + NS_ASSERTION(!mOutputCallback, "already have an output event"); + mOutputStream = stream; + mOutputCallback = callback; + } + +private: + nsCOMPtr<nsIAsyncInputStream> mInputStream; + nsCOMPtr<nsIInputStreamCallback> mInputCallback; + nsCOMPtr<nsIAsyncOutputStream> mOutputStream; + nsCOMPtr<nsIOutputStreamCallback> mOutputCallback; +}; + +//----------------------------------------------------------------------------- + +// the input end of a pipe (allocated as a member of the pipe). +class nsPipeInputStream : public nsIAsyncInputStream + , public nsISeekableStream + , public nsISearchableInputStream +{ +public: + // since this class will be allocated as a member of the pipe, we do not + // need our own ref count. instead, we share the lifetime (the ref count) + // of the entire pipe. this macro is just convenience since it does not + // declare a mRefCount variable; however, don't let the name fool you... + // we are not inheriting from nsPipe ;-) + NS_DECL_ISUPPORTS_INHERITED + + NS_DECL_NSIINPUTSTREAM + NS_DECL_NSIASYNCINPUTSTREAM + NS_DECL_NSISEEKABLESTREAM + NS_DECL_NSISEARCHABLEINPUTSTREAM + + nsPipeInputStream(nsPipe *pipe) + : mPipe(pipe) + , mReaderRefCnt(0) + , mLogicalOffset(0) + , mBlocking(PR_TRUE) + , mBlocked(PR_FALSE) + , mAvailable(0) + , mCallbackFlags(0) + { } + + nsresult Fill(); + void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; } + + PRUint32 Available() { return mAvailable; } + void ReduceAvailable(PRUint32 avail) { mAvailable -= avail; } + + // synchronously wait for the pipe to become readable. + nsresult Wait(); + + // these functions return true to indicate that the pipe's monitor should + // be notified, to wake up a blocked reader if any. + PRBool OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &); + PRBool OnInputException(nsresult, nsPipeEvents &); + +private: + nsPipe *mPipe; + + // separate refcnt so that we know when to close the consumer + nsrefcnt mReaderRefCnt; + nsInt64 mLogicalOffset; + PRPackedBool mBlocking; + + // these variables can only be accessed while inside the pipe's monitor + PRPackedBool mBlocked; + PRUint32 mAvailable; + nsCOMPtr<nsIInputStreamCallback> mCallback; + PRUint32 mCallbackFlags; +}; + +//----------------------------------------------------------------------------- + +// the output end of a pipe (allocated as a member of the pipe). +class nsPipeOutputStream : public nsIAsyncOutputStream + , public nsISeekableStream +{ +public: + // since this class will be allocated as a member of the pipe, we do not + // need our own ref count. instead, we share the lifetime (the ref count) + // of the entire pipe. this macro is just convenience since it does not + // declare a mRefCount variable; however, don't let the name fool you... + // we are not inheriting from nsPipe ;-) + NS_DECL_ISUPPORTS_INHERITED + + NS_DECL_NSIOUTPUTSTREAM + NS_DECL_NSIASYNCOUTPUTSTREAM + NS_DECL_NSISEEKABLESTREAM + + nsPipeOutputStream(nsPipe *pipe) + : mPipe(pipe) + , mWriterRefCnt(0) + , mLogicalOffset(0) + , mBlocking(PR_TRUE) + , mBlocked(PR_FALSE) + , mWritable(PR_TRUE) + , mCallbackFlags(0) + { } + + void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; } + void SetWritable(PRBool writable) { mWritable = writable; } + + // synchronously wait for the pipe to become writable. + nsresult Wait(); + + // these functions return true to indicate that the pipe's monitor should + // be notified, to wake up a blocked writer if any. + PRBool OnOutputWritable(nsPipeEvents &); + PRBool OnOutputException(nsresult, nsPipeEvents &); + +private: + nsPipe *mPipe; + + // separate refcnt so that we know when to close the producer + nsrefcnt mWriterRefCnt; + nsInt64 mLogicalOffset; + PRPackedBool mBlocking; + + // these variables can only be accessed while inside the pipe's monitor + PRPackedBool mBlocked; + PRPackedBool mWritable; + nsCOMPtr<nsIOutputStreamCallback> mCallback; + PRUint32 mCallbackFlags; +}; + +//----------------------------------------------------------------------------- + +class nsPipe : public nsIPipe +{ +public: + friend class nsPipeInputStream; + friend class nsPipeOutputStream; + + NS_DECL_ISUPPORTS + NS_DECL_NSIPIPE + + // nsPipe methods: + nsPipe(); + +private: + ~nsPipe(); + +public: + // + // methods below may only be called while inside the pipe's monitor + // + + void PeekSegment(PRUint32 n, char *&cursor, char *&limit); + + // + // methods below may be called while outside the pipe's monitor + // + + nsresult GetReadSegment(const char *&segment, PRUint32 &segmentLen); + void AdvanceReadCursor(PRUint32 count); + + nsresult GetWriteSegment(char *&segment, PRUint32 &segmentLen); + void AdvanceWriteCursor(PRUint32 count); + + void OnPipeException(nsresult reason, PRBool outputOnly = PR_FALSE); + +protected: + // We can't inherit from both nsIInputStream and nsIOutputStream + // because they collide on their Close method. Consequently we nest their + // implementations to avoid the extra object allocation. + nsPipeInputStream mInput; + nsPipeOutputStream mOutput; + + PRMonitor* mMonitor; + nsSegmentedBuffer mBuffer; + + char* mReadCursor; + char* mReadLimit; + + PRInt32 mWriteSegment; + char* mWriteCursor; + char* mWriteLimit; + + nsresult mStatus; +}; + +// +// NOTES on buffer architecture: +// +// +-----------------+ - - mBuffer.GetSegment(0) +// | | +// + - - - - - - - - + - - mReadCursor +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// +-----------------+ - - mReadLimit +// | +// +-----------------+ +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// |/////////////////| +// +-----------------+ +// | +// +-----------------+ - - mBuffer.GetSegment(mWriteSegment) +// |/////////////////| +// |/////////////////| +// |/////////////////| +// + - - - - - - - - + - - mWriteCursor +// | | +// | | +// +-----------------+ - - mWriteLimit +// +// (shaded region contains data) +// +// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for +// small allocations (e.g., 64 byte allocations). this means that buffers may +// be allocated back-to-back. in the diagram above, for example, mReadLimit +// would actually be pointing at the beginning of the next segment. when +// making changes to this file, please keep this fact in mind. +// + +//----------------------------------------------------------------------------- +// nsPipe methods: +//----------------------------------------------------------------------------- + +nsPipe::nsPipe() + : mInput(this) + , mOutput(this) + , mMonitor(nsnull) + , mReadCursor(nsnull) + , mReadLimit(nsnull) + , mWriteSegment(-1) + , mWriteCursor(nsnull) + , mWriteLimit(nsnull) + , mStatus(NS_OK) +{ +} + +nsPipe::~nsPipe() +{ + if (mMonitor) + PR_DestroyMonitor(mMonitor); +} + +NS_IMPL_THREADSAFE_ISUPPORTS1(nsPipe, nsIPipe) + +NS_IMETHODIMP +nsPipe::Init(PRBool nonBlockingIn, + PRBool nonBlockingOut, + PRUint32 segmentSize, + PRUint32 segmentCount, + nsIMemory *segmentAlloc) +{ + mMonitor = PR_NewMonitor(); + if (!mMonitor) + return NS_ERROR_OUT_OF_MEMORY; + + if (segmentSize == 0) + segmentSize = DEFAULT_SEGMENT_SIZE; + if (segmentCount == 0) + segmentCount = DEFAULT_SEGMENT_COUNT; + + // protect against overflow + PRUint32 maxCount = PRUint32(-1) / segmentSize; + if (segmentCount > maxCount) + segmentCount = maxCount; + + nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount, segmentAlloc); + if (NS_FAILED(rv)) + return rv; + + mInput.SetNonBlocking(nonBlockingIn); + mOutput.SetNonBlocking(nonBlockingOut); + return NS_OK; +} + +NS_IMETHODIMP +nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream) +{ + NS_ADDREF(*aInputStream = &mInput); + return NS_OK; +} + +NS_IMETHODIMP +nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream) +{ + NS_ADDREF(*aOutputStream = &mOutput); + return NS_OK; +} + +void +nsPipe::PeekSegment(PRUint32 index, char *&cursor, char *&limit) +{ + if (index == 0) { + NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state"); + cursor = mReadCursor; + limit = mReadLimit; + } + else { + PRUint32 numSegments = mBuffer.GetSegmentCount(); + if (index >= numSegments) + cursor = limit = nsnull; + else { + cursor = mBuffer.GetSegment(index); + if (mWriteSegment == (PRInt32) index) + limit = mWriteCursor; + else + limit = cursor + mBuffer.GetSegmentSize(); + } + } +} + +nsresult +nsPipe::GetReadSegment(const char *&segment, PRUint32 &segmentLen) +{ + nsAutoMonitor mon(mMonitor); + + if (mReadCursor == mReadLimit) + return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK; + + segment = mReadCursor; + segmentLen = mReadLimit - mReadCursor; + return NS_OK; +} + +void +nsPipe::AdvanceReadCursor(PRUint32 bytesRead) +{ + NS_ASSERTION(bytesRead, "dont call if no bytes read"); + + nsPipeEvents events; + { + nsAutoMonitor mon(mMonitor); + + LOG(("III advancing read cursor by %u\n", bytesRead)); + NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much"); + + mReadCursor += bytesRead; + NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit"); + + mInput.ReduceAvailable(bytesRead); + + if (mReadCursor == mReadLimit) { + // we've reached the limit of how much we can read from this segment. + // if at the end of this segment, then we must discard this segment. + + // if still writing in this segment then bail because we're not done + // with the segment and have to wait for now... + if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) { + NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state"); + return; + } + + // shift write segment index (-1 indicates an empty buffer). + --mWriteSegment; + + // done with this segment + mBuffer.DeleteFirstSegment(); + LOG(("III deleting first segment\n")); + + if (mWriteSegment == -1) { + // buffer is completely empty + mReadCursor = nsnull; + mReadLimit = nsnull; + mWriteCursor = nsnull; + mWriteLimit = nsnull; + } + else { + // advance read cursor and limit to next buffer segment + mReadCursor = mBuffer.GetSegment(0); + if (mWriteSegment == 0) + mReadLimit = mWriteCursor; + else + mReadLimit = mReadCursor + mBuffer.GetSegmentSize(); + } + + // we've free'd up a segment, so notify output stream that pipe has + // room for a new segment. + if (mOutput.OnOutputWritable(events)) + mon.Notify(); + } + } +} + +nsresult +nsPipe::GetWriteSegment(char *&segment, PRUint32 &segmentLen) +{ + nsAutoMonitor mon(mMonitor); + + if (NS_FAILED(mStatus)) + return mStatus; + + // write cursor and limit may both be null indicating an empty buffer. + if (mWriteCursor == mWriteLimit) { + char *seg = mBuffer.AppendNewSegment(); + // pipe is full + if (seg == nsnull) + return NS_BASE_STREAM_WOULD_BLOCK; + LOG(("OOO appended new segment\n")); + mWriteCursor = seg; + mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize(); + ++mWriteSegment; + } + + // make sure read cursor is initialized + if (mReadCursor == nsnull) { + NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor"); + mReadCursor = mReadLimit = mWriteCursor; + } + + // check to see if we can roll-back our read and write cursors to the + // beginning of the current/first segment. this is purely an optimization. + if (mReadCursor == mWriteCursor && mWriteSegment == 0) { + char *head = mBuffer.GetSegment(0); + LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head)); + mWriteCursor = mReadCursor = mReadLimit = head; + } + + segment = mWriteCursor; + segmentLen = mWriteLimit - mWriteCursor; + return NS_OK; +} + +void +nsPipe::AdvanceWriteCursor(PRUint32 bytesWritten) +{ + NS_ASSERTION(bytesWritten, "dont call if no bytes written"); + + nsPipeEvents events; + { + nsAutoMonitor mon(mMonitor); + + LOG(("OOO advancing write cursor by %u\n", bytesWritten)); + + char *newWriteCursor = mWriteCursor + bytesWritten; + NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit"); + + // update read limit if reading in the same segment + if (mWriteSegment == 0 && mReadLimit == mWriteCursor) + mReadLimit = newWriteCursor; + + mWriteCursor = newWriteCursor; + + NS_ASSERTION(mReadCursor != mWriteCursor, "read cursor is bad"); + + // update the writable flag on the output stream + if (mWriteCursor == mWriteLimit) { + if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) + mOutput.SetWritable(PR_FALSE); + } + + // notify input stream that pipe now contains additional data + if (mInput.OnInputReadable(bytesWritten, events)) + mon.Notify(); + } +} + +void +nsPipe::OnPipeException(nsresult reason, PRBool outputOnly) +{ + LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n", + reason, outputOnly)); + + nsPipeEvents events; + { + nsAutoMonitor mon(mMonitor); + + // if we've already hit an exception, then ignore this one. + if (NS_FAILED(mStatus)) + return; + + mStatus = reason; + + // an output-only exception applies to the input end if the pipe has + // zero bytes available. + if (outputOnly && !mInput.Available()) + outputOnly = PR_FALSE; + + if (!outputOnly) + if (mInput.OnInputException(reason, events)) + mon.Notify(); + + if (mOutput.OnOutputException(reason, events)) + mon.Notify(); + } +} + +//----------------------------------------------------------------------------- +// nsPipeEvents methods: +//----------------------------------------------------------------------------- + +nsPipeEvents::~nsPipeEvents() +{ + // dispatch any pending events + + if (mInputCallback) { + mInputCallback->OnInputStreamReady(mInputStream); + mInputCallback = 0; + mInputStream = 0; + } + if (mOutputCallback) { + mOutputCallback->OnOutputStreamReady(mOutputStream); + mOutputCallback = 0; + mOutputStream = 0; + } +} + +//----------------------------------------------------------------------------- +// nsPipeInputStream methods: +//----------------------------------------------------------------------------- + +nsresult +nsPipeInputStream::Wait() +{ + NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream"); + + nsAutoMonitor mon(mPipe->mMonitor); + + while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) { + LOG(("III pipe input: waiting for data\n")); + + mBlocked = PR_TRUE; + mon.Wait(); + mBlocked = PR_FALSE; + + LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n", + mPipe->mStatus, mAvailable)); + } + + return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; +} + +PRBool +nsPipeInputStream::OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &events) +{ + PRBool result = PR_FALSE; + + mAvailable += bytesWritten; + + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { + events.NotifyInputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = PR_TRUE; + + return result; +} + +PRBool +nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events) +{ + LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n", + this, reason)); + + PRBool result = PR_FALSE; + + NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); + + // force count of available bytes to zero. + mAvailable = 0; + + if (mCallback) { + events.NotifyInputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = PR_TRUE; + + return result; +} + +NS_IMETHODIMP_(nsrefcnt) +nsPipeInputStream::AddRef(void) +{ + ++mReaderRefCnt; + return mPipe->AddRef(); +} + +NS_IMETHODIMP_(nsrefcnt) +nsPipeInputStream::Release(void) +{ + if (--mReaderRefCnt == 0) + Close(); + return mPipe->Release(); +} + +NS_IMPL_QUERY_INTERFACE4(nsPipeInputStream, + nsIInputStream, + nsIAsyncInputStream, + nsISeekableStream, + nsISearchableInputStream) + +NS_IMETHODIMP +nsPipeInputStream::CloseWithStatus(nsresult reason) +{ + LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason)); + + if (NS_SUCCEEDED(reason)) + reason = NS_BASE_STREAM_CLOSED; + + mPipe->OnPipeException(reason); + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Close() +{ + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP +nsPipeInputStream::Available(PRUint32 *result) +{ + nsAutoMonitor mon(mPipe->mMonitor); + + // return error if pipe closed + if (!mAvailable && NS_FAILED(mPipe->mStatus)) + return mPipe->mStatus; + + *result = mAvailable; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer, + void *closure, + PRUint32 count, + PRUint32 *readCount) +{ + LOG(("III ReadSegments [this=%x count=%u]\n", this, count)); + + nsresult rv = NS_OK; + + const char *segment; + PRUint32 segmentLen; + + *readCount = 0; + while (count) { + rv = mPipe->GetReadSegment(segment, segmentLen); + if (NS_FAILED(rv)) { + // ignore this error if we've already read something. + if (*readCount > 0) { + rv = NS_OK; + break; + } + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { + // pipe is empty + if (!mBlocking) + break; + // wait for some data to be written to the pipe + rv = Wait(); + if (NS_SUCCEEDED(rv)) + continue; + } + // ignore this error, just return. + if (rv == NS_BASE_STREAM_CLOSED) { + rv = NS_OK; + break; + } + mPipe->OnPipeException(rv); + break; + } + + // read no more than count + if (segmentLen > count) + segmentLen = count; + + PRUint32 writeCount, originalLen = segmentLen; + while (segmentLen) { + writeCount = 0; + + rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount); + + if (NS_FAILED(rv) || writeCount == 0) { + count = 0; + // any errors returned from the writer end here: do not + // propogate to the caller of ReadSegments. + rv = NS_OK; + break; + } + + NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected"); + segment += writeCount; + segmentLen -= writeCount; + count -= writeCount; + *readCount += writeCount; + mLogicalOffset += writeCount; + } + + if (segmentLen < originalLen) + mPipe->AdvanceReadCursor(originalLen - segmentLen); + } + + return rv; +} + +static NS_METHOD +nsWriteToRawBuffer(nsIInputStream* inStr, + void *closure, + const char *fromRawSegment, + PRUint32 offset, + PRUint32 count, + PRUint32 *writeCount) +{ + char *toBuf = (char*)closure; + memcpy(&toBuf[offset], fromRawSegment, count); + *writeCount = count; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Read(char* toBuf, PRUint32 bufLen, PRUint32 *readCount) +{ + return ReadSegments(nsWriteToRawBuffer, toBuf, bufLen, readCount); +} + +NS_IMETHODIMP +nsPipeInputStream::IsNonBlocking(PRBool *aNonBlocking) +{ + *aNonBlocking = !mBlocking; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback, + PRUint32 flags, + PRUint32 requestedCount, + nsIEventTarget *target) +{ + LOG(("III AsyncWait [this=%x]\n", this)); + + nsPipeEvents pipeEvents; + { + nsAutoMonitor mon(mPipe->mMonitor); + + // replace a pending callback + mCallback = 0; + mCallbackFlags = 0; + + nsCOMPtr<nsIInputStreamCallback> proxy; + if (target) { + nsresult rv = NS_NewInputStreamReadyEvent(getter_AddRefs(proxy), + callback, target); + if (NS_FAILED(rv)) return rv; + callback = proxy; + } + + if (NS_FAILED(mPipe->mStatus) || + (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) { + // stream is already closed or readable; post event. + pipeEvents.NotifyInputReady(this, callback); + } + else { + // queue up callback object to be notified when data becomes available + mCallback = callback; + mCallbackFlags = flags; + } + } + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::Seek(PRInt32 whence, PRInt64 offset) +{ + NS_NOTREACHED("nsPipeInputStream::Seek"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsPipeInputStream::Tell(PRInt64 *offset) +{ + *offset = mLogicalOffset; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeInputStream::SetEOF() +{ + NS_NOTREACHED("nsPipeInputStream::SetEOF"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +#define COMPARE(s1, s2, i) \ + (ignoreCase \ + ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (PRUint32)i) \ + : nsCRT::strncmp((const char *)s1, (const char *)s2, (PRUint32)i)) + +NS_IMETHODIMP +nsPipeInputStream::Search(const char *forString, + PRBool ignoreCase, + PRBool *found, + PRUint32 *offsetSearchedTo) +{ + LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase)); + + nsAutoMonitor mon(mPipe->mMonitor); + + char *cursor1, *limit1; + PRUint32 index = 0, offset = 0; + PRUint32 strLen = strlen(forString); + + mPipe->PeekSegment(0, cursor1, limit1); + if (cursor1 == limit1) { + *found = PR_FALSE; + *offsetSearchedTo = 0; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + + while (PR_TRUE) { + PRUint32 i, len1 = limit1 - cursor1; + + // check if the string is in the buffer segment + for (i = 0; i < len1 - strLen + 1; i++) { + if (COMPARE(&cursor1[i], forString, strLen) == 0) { + *found = PR_TRUE; + *offsetSearchedTo = offset + i; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + } + + // get the next segment + char *cursor2, *limit2; + PRUint32 len2; + + index++; + offset += len1; + + mPipe->PeekSegment(index, cursor2, limit2); + if (cursor2 == limit2) { + *found = PR_FALSE; + *offsetSearchedTo = offset - strLen + 1; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + len2 = limit2 - cursor2; + + // check if the string is straddling the next buffer segment + PRUint32 lim = PR_MIN(strLen, len2 + 1); + for (i = 0; i < lim; ++i) { + PRUint32 strPart1Len = strLen - i - 1; + PRUint32 strPart2Len = strLen - strPart1Len; + const char* strPart2 = &forString[strLen - strPart2Len]; + PRUint32 bufSeg1Offset = len1 - strPart1Len; + if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 && + COMPARE(cursor2, strPart2, strPart2Len) == 0) { + *found = PR_TRUE; + *offsetSearchedTo = offset - strPart1Len; + LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo)); + return NS_OK; + } + } + + // finally continue with the next buffer + cursor1 = cursor2; + limit1 = limit2; + } + + NS_NOTREACHED("can't get here"); + return NS_ERROR_UNEXPECTED; // keep compiler happy +} + +//----------------------------------------------------------------------------- +// nsPipeOutputStream methods: +//----------------------------------------------------------------------------- + +nsresult +nsPipeOutputStream::Wait() +{ + NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream"); + + nsAutoMonitor mon(mPipe->mMonitor); + + if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) { + LOG(("OOO pipe output: waiting for space\n")); + mBlocked = PR_TRUE; + mon.Wait(); + mBlocked = PR_FALSE; + LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n", + mPipe->mStatus, mWritable == PR_TRUE)); + } + + return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; +} + +PRBool +nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events) +{ + PRBool result = PR_FALSE; + + mWritable = PR_TRUE; + + if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { + events.NotifyOutputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = PR_TRUE; + + return result; +} + +PRBool +nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events) +{ + LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n", + this, reason)); + + nsresult result = PR_FALSE; + + NS_ASSERTION(NS_FAILED(reason), "huh? successful exception"); + mWritable = PR_FALSE; + + if (mCallback) { + events.NotifyOutputReady(this, mCallback); + mCallback = 0; + mCallbackFlags = 0; + } + else if (mBlocked) + result = PR_TRUE; + + return result; +} + + +NS_IMETHODIMP_(nsrefcnt) +nsPipeOutputStream::AddRef() +{ + mWriterRefCnt++; + return mPipe->AddRef(); +} + +NS_IMETHODIMP_(nsrefcnt) +nsPipeOutputStream::Release() +{ + if (--mWriterRefCnt == 0) + Close(); + return mPipe->Release(); +} + +NS_IMPL_QUERY_INTERFACE2(nsPipeOutputStream, + nsIOutputStream, + nsIAsyncOutputStream) + +NS_IMETHODIMP +nsPipeOutputStream::CloseWithStatus(nsresult reason) +{ + LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason)); + + if (NS_SUCCEEDED(reason)) + reason = NS_BASE_STREAM_CLOSED; + + // input stream may remain open + mPipe->OnPipeException(reason, PR_TRUE); + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::Close() +{ + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP +nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader, + void* closure, + PRUint32 count, + PRUint32 *writeCount) +{ + LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count)); + + nsresult rv = NS_OK; + + char *segment; + PRUint32 segmentLen; + + *writeCount = 0; + while (count) { + rv = mPipe->GetWriteSegment(segment, segmentLen); + if (NS_FAILED(rv)) { + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { + // pipe is full + if (!mBlocking) { + // ignore this error if we've already written something + if (*writeCount > 0) + rv = NS_OK; + break; + } + // wait for the pipe to have an empty segment. + rv = Wait(); + if (NS_SUCCEEDED(rv)) + continue; + } + mPipe->OnPipeException(rv); + break; + } + + // write no more than count + if (segmentLen > count) + segmentLen = count; + + PRUint32 readCount, originalLen = segmentLen; + while (segmentLen) { + readCount = 0; + + rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount); + + if (NS_FAILED(rv) || readCount == 0) { + count = 0; + // any errors returned from the reader end here: do not + // propogate to the caller of WriteSegments. + rv = NS_OK; + break; + } + + NS_ASSERTION(readCount <= segmentLen, "read more than expected"); + segment += readCount; + segmentLen -= readCount; + count -= readCount; + *writeCount += readCount; + mLogicalOffset += readCount; + } + + if (segmentLen < originalLen) + mPipe->AdvanceWriteCursor(originalLen - segmentLen); + } + + return rv; +} + +static NS_METHOD +nsReadFromRawBuffer(nsIOutputStream* outStr, + void* closure, + char* toRawSegment, + PRUint32 offset, + PRUint32 count, + PRUint32 *readCount) +{ + const char* fromBuf = (const char*)closure; + memcpy(toRawSegment, &fromBuf[offset], count); + *readCount = count; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::Write(const char* fromBuf, + PRUint32 bufLen, + PRUint32 *writeCount) +{ + return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::Flush(void) +{ + // nothing to do + return NS_OK; +} + +static NS_METHOD +nsReadFromInputStream(nsIOutputStream* outStr, + void* closure, + char* toRawSegment, + PRUint32 offset, + PRUint32 count, + PRUint32 *readCount) +{ + nsIInputStream* fromStream = (nsIInputStream*)closure; + return fromStream->Read(toRawSegment, count, readCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream, + PRUint32 count, + PRUint32 *writeCount) +{ + return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount); +} + +NS_IMETHODIMP +nsPipeOutputStream::IsNonBlocking(PRBool *aNonBlocking) +{ + *aNonBlocking = !mBlocking; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback, + PRUint32 flags, + PRUint32 requestedCount, + nsIEventTarget *target) +{ + LOG(("OOO AsyncWait [this=%x]\n", this)); + + nsPipeEvents pipeEvents; + { + nsAutoMonitor mon(mPipe->mMonitor); + + // replace a pending callback + mCallback = 0; + mCallbackFlags = 0; + + nsCOMPtr<nsIOutputStreamCallback> proxy; + if (target) { + nsresult rv = NS_NewOutputStreamReadyEvent(getter_AddRefs(proxy), + callback, target); + if (NS_FAILED(rv)) return rv; + callback = proxy; + } + + if (NS_FAILED(mPipe->mStatus) || + (mWritable && !(flags & WAIT_CLOSURE_ONLY))) { + // stream is already closed or writable; post event. + pipeEvents.NotifyOutputReady(this, callback); + } + else { + // queue up callback object to be notified when data becomes available + mCallback = callback; + mCallbackFlags = flags; + } + } + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::Seek(PRInt32 whence, PRInt64 offset) +{ + NS_NOTREACHED("nsPipeOutputStream::Seek"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +nsPipeOutputStream::Tell(PRInt64 *offset) +{ + *offset = mLogicalOffset; + return NS_OK; +} + +NS_IMETHODIMP +nsPipeOutputStream::SetEOF() +{ + NS_NOTREACHED("nsPipeOutputStream::SetEOF"); + return NS_ERROR_NOT_IMPLEMENTED; +} + +//////////////////////////////////////////////////////////////////////////////// + +NS_COM nsresult +NS_NewPipe2(nsIAsyncInputStream **pipeIn, + nsIAsyncOutputStream **pipeOut, + PRBool nonBlockingInput, + PRBool nonBlockingOutput, + PRUint32 segmentSize, + PRUint32 segmentCount, + nsIMemory *segmentAlloc) +{ + nsresult rv; + +#if defined(PR_LOGGING) + if (!gPipeLog) + gPipeLog = PR_NewLogModule("nsPipe"); +#endif + + nsPipe *pipe = new nsPipe(); + if (!pipe) + return NS_ERROR_OUT_OF_MEMORY; + + rv = pipe->Init(nonBlockingInput, + nonBlockingOutput, + segmentSize, + segmentCount, + segmentAlloc); + if (NS_FAILED(rv)) { + NS_ADDREF(pipe); + NS_RELEASE(pipe); + return rv; + } + + pipe->GetInputStream(pipeIn); + pipe->GetOutputStream(pipeOut); + return NS_OK; +} + +//////////////////////////////////////////////////////////////////////////////// |