diff options
Diffstat (limited to 'xpcom/tests/gtest/TestMultiplexInputStream.cpp')
-rw-r--r-- | xpcom/tests/gtest/TestMultiplexInputStream.cpp | 958 |
1 files changed, 958 insertions, 0 deletions
diff --git a/xpcom/tests/gtest/TestMultiplexInputStream.cpp b/xpcom/tests/gtest/TestMultiplexInputStream.cpp new file mode 100644 index 0000000000..7c422b068b --- /dev/null +++ b/xpcom/tests/gtest/TestMultiplexInputStream.cpp @@ -0,0 +1,958 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "gtest/gtest.h" +#include "mozilla/gtest/MozAssertions.h" +#include "mozilla/ipc/DataPipe.h" +#include "mozilla/SpinEventLoopUntil.h" +#include "nsIAsyncInputStream.h" +#include "nsComponentManagerUtils.h" +#include "nsIAsyncOutputStream.h" +#include "nsIInputStream.h" +#include "nsIMultiplexInputStream.h" +#include "nsIPipe.h" +#include "nsISeekableStream.h" +#include "nsStreamUtils.h" +#include "nsThreadUtils.h" +#include "nsIThread.h" +#include "Helpers.h" + +using mozilla::GetCurrentSerialEventTarget; +using mozilla::SpinEventLoopUntil; + +TEST(MultiplexInputStream, Seek_SET) +{ + nsCString buf1; + nsCString buf2; + nsCString buf3; + buf1.AssignLiteral("Hello world"); + buf2.AssignLiteral("The quick brown fox jumped over the lazy dog"); + buf3.AssignLiteral("Foo bar"); + + nsCOMPtr<nsIInputStream> inputStream1; + nsCOMPtr<nsIInputStream> inputStream2; + nsCOMPtr<nsIInputStream> inputStream3; + nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1); + ASSERT_NS_SUCCEEDED(rv); + rv = NS_NewCStringInputStream(getter_AddRefs(inputStream2), buf2); + ASSERT_NS_SUCCEEDED(rv); + rv = NS_NewCStringInputStream(getter_AddRefs(inputStream3), buf3); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + ASSERT_TRUE(multiplexStream); + nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream)); + ASSERT_TRUE(stream); + + rv = multiplexStream->AppendStream(inputStream1); + ASSERT_NS_SUCCEEDED(rv); + rv = multiplexStream->AppendStream(inputStream2); + ASSERT_NS_SUCCEEDED(rv); + rv = multiplexStream->AppendStream(inputStream3); + ASSERT_NS_SUCCEEDED(rv); + + int64_t tell; + uint64_t length; + uint32_t count; + char readBuf[4096]; + nsCOMPtr<nsISeekableStream> seekStream = do_QueryInterface(multiplexStream); + ASSERT_TRUE(seekStream); + + // Seek forward in first input stream + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_SET, 1); + ASSERT_NS_SUCCEEDED(rv); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 1, + length); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 1); + + // Check read is correct + rv = stream->Read(readBuf, 3, &count); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)3, count); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 4, + length); + ASSERT_EQ(0, strncmp(readBuf, "ell", count)); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 4); + + // Seek to start of third input stream + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_SET, + buf1.Length() + buf2.Length()); + ASSERT_NS_SUCCEEDED(rv); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf3.Length(), length); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length())); + + // Check read is correct + rv = stream->Read(readBuf, 5, &count); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)5, count); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf3.Length() - 5, length); + ASSERT_EQ(0, strncmp(readBuf, "Foo b", count)); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length() + 5)); + + // Seek back to start of second stream (covers bug 1272371) + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_SET, buf1.Length()); + ASSERT_NS_SUCCEEDED(rv); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf2.Length() + buf3.Length(), length); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, int64_t(buf1.Length())); + + // Check read is correct + rv = stream->Read(readBuf, 6, &count); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)6, count); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf2.Length() - 6 + buf3.Length(), length); + ASSERT_EQ(0, strncmp(readBuf, "The qu", count)); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, int64_t(buf1.Length() + 6)); +} + +TEST(MultiplexInputStream, Seek_CUR) +{ + nsCString buf1; + nsCString buf2; + nsCString buf3; + buf1.AssignLiteral("Hello world"); + buf2.AssignLiteral("The quick brown fox jumped over the lazy dog"); + buf3.AssignLiteral("Foo bar"); + + nsCOMPtr<nsIInputStream> inputStream1; + nsCOMPtr<nsIInputStream> inputStream2; + nsCOMPtr<nsIInputStream> inputStream3; + nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1); + ASSERT_NS_SUCCEEDED(rv); + rv = NS_NewCStringInputStream(getter_AddRefs(inputStream2), buf2); + ASSERT_NS_SUCCEEDED(rv); + rv = NS_NewCStringInputStream(getter_AddRefs(inputStream3), buf3); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + ASSERT_TRUE(multiplexStream); + nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream)); + ASSERT_TRUE(stream); + + rv = multiplexStream->AppendStream(inputStream1); + ASSERT_NS_SUCCEEDED(rv); + rv = multiplexStream->AppendStream(inputStream2); + ASSERT_NS_SUCCEEDED(rv); + rv = multiplexStream->AppendStream(inputStream3); + ASSERT_NS_SUCCEEDED(rv); + + int64_t tell; + uint64_t length; + uint32_t count; + char readBuf[4096]; + nsCOMPtr<nsISeekableStream> seekStream = do_QueryInterface(multiplexStream); + ASSERT_TRUE(seekStream); + + // Seek forward in first input stream + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_CUR, 1); + ASSERT_NS_SUCCEEDED(rv); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 1, + length); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 1); + + // Check read is correct + rv = stream->Read(readBuf, 3, &count); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)3, count); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 4, + length); + ASSERT_EQ(0, strncmp(readBuf, "ell", count)); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 4); + + // Let's go to the second stream + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_CUR, 11); + ASSERT_NS_SUCCEEDED(rv); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 15); + rv = stream->Read(readBuf, 3, &count); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)3, count); + ASSERT_EQ(0, strncmp(readBuf, "qui", count)); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 18); + + // Let's go back to the first stream + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_CUR, -9); + ASSERT_NS_SUCCEEDED(rv); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 9); + rv = stream->Read(readBuf, 3, &count); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)3, count); + ASSERT_EQ(0, strncmp(readBuf, "ldT", count)); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 12); +} + +TEST(MultiplexInputStream, Seek_END) +{ + nsCString buf1; + nsCString buf2; + nsCString buf3; + buf1.AssignLiteral("Hello world"); + buf2.AssignLiteral("The quick brown fox jumped over the lazy dog"); + buf3.AssignLiteral("Foo bar"); + + nsCOMPtr<nsIInputStream> inputStream1; + nsCOMPtr<nsIInputStream> inputStream2; + nsCOMPtr<nsIInputStream> inputStream3; + nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1); + ASSERT_NS_SUCCEEDED(rv); + rv = NS_NewCStringInputStream(getter_AddRefs(inputStream2), buf2); + ASSERT_NS_SUCCEEDED(rv); + rv = NS_NewCStringInputStream(getter_AddRefs(inputStream3), buf3); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + ASSERT_TRUE(multiplexStream); + nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream)); + ASSERT_TRUE(stream); + + rv = multiplexStream->AppendStream(inputStream1); + ASSERT_NS_SUCCEEDED(rv); + rv = multiplexStream->AppendStream(inputStream2); + ASSERT_NS_SUCCEEDED(rv); + rv = multiplexStream->AppendStream(inputStream3); + ASSERT_NS_SUCCEEDED(rv); + + int64_t tell; + uint64_t length; + nsCOMPtr<nsISeekableStream> seekStream = do_QueryInterface(multiplexStream); + ASSERT_TRUE(seekStream); + + // SEEK_END wants <=0 values + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, 1); + ASSERT_NS_FAILED(rv); + + // Let's go to the end. + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, 0); + ASSERT_NS_SUCCEEDED(rv); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)0, length); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length() + buf3.Length())); + + // -1 + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, -1); + ASSERT_NS_SUCCEEDED(rv); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ((uint64_t)1, length); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length() + buf3.Length() - 1)); + + // Almost at the beginning + tell = 1; + tell -= buf1.Length(); + tell -= buf2.Length(); + tell -= buf3.Length(); + rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, tell); + ASSERT_NS_SUCCEEDED(rv); + rv = stream->Available(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(length, buf1.Length() + buf2.Length() + buf3.Length() - 1); + rv = seekStream->Tell(&tell); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(tell, 1); +} + +static already_AddRefed<nsIInputStream> CreateStreamHelper() { + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + + nsCString buf1; + buf1.AssignLiteral("Hello"); + + nsCOMPtr<nsIInputStream> inputStream1 = new testing::AsyncStringStream(buf1); + multiplexStream->AppendStream(inputStream1); + + nsCString buf2; + buf2.AssignLiteral(" "); + + nsCOMPtr<nsIInputStream> inputStream2 = new testing::AsyncStringStream(buf2); + multiplexStream->AppendStream(inputStream2); + + nsCString buf3; + buf3.AssignLiteral("World!"); + + nsCOMPtr<nsIInputStream> inputStream3 = new testing::AsyncStringStream(buf3); + multiplexStream->AppendStream(inputStream3); + + nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream)); + return stream.forget(); +} + +// AsyncWait - without EventTarget +TEST(MultiplexInputStream, AsyncWait_withoutEventTarget) +{ + nsCOMPtr<nsIInputStream> is = CreateStreamHelper(); + + nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is); + ASSERT_TRUE(!!ais); + + RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback(); + + ASSERT_EQ(NS_OK, ais->AsyncWait(cb, 0, 0, nullptr)); + ASSERT_TRUE(cb->Called()); +} + +// AsyncWait - with EventTarget +TEST(MultiplexInputStream, AsyncWait_withEventTarget) +{ + nsCOMPtr<nsIInputStream> is = CreateStreamHelper(); + + nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is); + ASSERT_TRUE(!!ais); + + RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback(); + nsCOMPtr<nsIThread> thread = do_GetCurrentThread(); + + ASSERT_EQ(NS_OK, ais->AsyncWait(cb, 0, 0, thread)); + + ASSERT_FALSE(cb->Called()); + + // Eventually it is called. + MOZ_ALWAYS_TRUE(mozilla::SpinEventLoopUntil( + "xpcom:TEST(MultiplexInputStream, AsyncWait_withEventTarget)"_ns, + [&]() { return cb->Called(); })); + ASSERT_TRUE(cb->Called()); +} + +// AsyncWait - without EventTarget - closureOnly +TEST(MultiplexInputStream, AsyncWait_withoutEventTarget_closureOnly) +{ + nsCOMPtr<nsIInputStream> is = CreateStreamHelper(); + + nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is); + ASSERT_TRUE(!!ais); + + RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback(); + + ASSERT_EQ(NS_OK, ais->AsyncWait(cb, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, + nullptr)); + ASSERT_FALSE(cb->Called()); + + ais->CloseWithStatus(NS_ERROR_FAILURE); + ASSERT_TRUE(cb->Called()); +} + +// AsyncWait - with EventTarget - closureOnly +TEST(MultiplexInputStream, AsyncWait_withEventTarget_closureOnly) +{ + nsCOMPtr<nsIInputStream> is = CreateStreamHelper(); + + nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is); + ASSERT_TRUE(!!ais); + + RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback(); + nsCOMPtr<nsIThread> thread = do_GetCurrentThread(); + + ASSERT_EQ(NS_OK, ais->AsyncWait(cb, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, + thread)); + + ASSERT_FALSE(cb->Called()); + ais->CloseWithStatus(NS_ERROR_FAILURE); + ASSERT_FALSE(cb->Called()); + + // Eventually it is called. + MOZ_ALWAYS_TRUE(mozilla::SpinEventLoopUntil( + "xpcom:TEST(MultiplexInputStream, AsyncWait_withEventTarget_closureOnly)"_ns, + [&]() { return cb->Called(); })); + ASSERT_TRUE(cb->Called()); +} + +class ClosedStream final : public nsIInputStream { + public: + NS_DECL_THREADSAFE_ISUPPORTS + + ClosedStream() = default; + + NS_IMETHOD + Available(uint64_t* aLength) override { return NS_BASE_STREAM_CLOSED; } + + NS_IMETHOD + StreamStatus() override { return NS_BASE_STREAM_CLOSED; } + + NS_IMETHOD + Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override { + MOZ_CRASH("This should not be called!"); + return NS_OK; + } + + NS_IMETHOD + ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, + uint32_t* aResult) override { + return NS_ERROR_NOT_IMPLEMENTED; + } + + NS_IMETHOD + Close() override { return NS_OK; } + + NS_IMETHOD + IsNonBlocking(bool* aNonBlocking) override { + *aNonBlocking = true; + return NS_OK; + } + + private: + ~ClosedStream() = default; +}; + +NS_IMPL_ISUPPORTS(ClosedStream, nsIInputStream) + +class AsyncStream final : public nsIAsyncInputStream { + public: + NS_DECL_THREADSAFE_ISUPPORTS + + explicit AsyncStream(int64_t aSize) : mState(eBlocked), mSize(aSize) {} + + void Unblock() { mState = eUnblocked; } + + NS_IMETHOD + Available(uint64_t* aLength) override { + *aLength = mState == eBlocked ? 0 : mSize; + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_OK; + } + + NS_IMETHOD + StreamStatus() override { + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_OK; + } + + NS_IMETHOD + Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override { + MOZ_CRASH("This should not be called!"); + return NS_OK; + } + + NS_IMETHOD + ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, + uint32_t* aResult) override { + return NS_ERROR_NOT_IMPLEMENTED; + } + + NS_IMETHOD + Close() override { + mState = eClosed; + return NS_OK; + } + + NS_IMETHOD + IsNonBlocking(bool* aNonBlocking) override { + *aNonBlocking = true; + return NS_OK; + } + + NS_IMETHOD + AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags, + uint32_t aRequestedCount, nsIEventTarget* aEventTarget) override { + MOZ_CRASH("This should not be called!"); + return NS_OK; + } + + NS_IMETHOD + CloseWithStatus(nsresult aStatus) override { return NS_OK; } + + private: + ~AsyncStream() = default; + + enum { eBlocked, eUnblocked, eClosed } mState; + + uint64_t mSize; +}; + +NS_IMPL_ISUPPORTS(AsyncStream, nsIInputStream, nsIAsyncInputStream) + +class BlockingStream final : public nsIInputStream { + public: + NS_DECL_THREADSAFE_ISUPPORTS + + BlockingStream() = default; + + NS_IMETHOD + Available(uint64_t* aLength) override { return NS_BASE_STREAM_CLOSED; } + + NS_IMETHOD + StreamStatus() override { return NS_BASE_STREAM_CLOSED; } + + NS_IMETHOD + Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override { + // We are actually empty. + *aReadCount = 0; + return NS_OK; + } + + NS_IMETHOD + ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, + uint32_t* aResult) override { + return NS_ERROR_NOT_IMPLEMENTED; + } + + NS_IMETHOD + Close() override { return NS_OK; } + + NS_IMETHOD + IsNonBlocking(bool* aNonBlocking) override { + *aNonBlocking = false; + return NS_OK; + } + + private: + ~BlockingStream() = default; +}; + +NS_IMPL_ISUPPORTS(BlockingStream, nsIInputStream) + +TEST(MultiplexInputStream, Available) +{ + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + + nsCOMPtr<nsIInputStream> s = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!s); + + nsCOMPtr<nsIAsyncInputStream> as = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!as); + + uint64_t length; + + // The stream returns NS_BASE_STREAM_CLOSED if there are no substreams. + nsresult rv = s->Available(&length); + ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv); + + rv = multiplexStream->AppendStream(new ClosedStream()); + ASSERT_EQ(NS_OK, rv); + + uint64_t asyncSize = 2; + RefPtr<AsyncStream> asyncStream = new AsyncStream(2); + rv = multiplexStream->AppendStream(asyncStream); + ASSERT_EQ(NS_OK, rv); + + nsCString buffer; + buffer.Assign("World!!!"); + + nsCOMPtr<nsIInputStream> stringStream; + rv = NS_NewCStringInputStream(getter_AddRefs(stringStream), buffer); + ASSERT_EQ(NS_OK, rv); + + rv = multiplexStream->AppendStream(stringStream); + ASSERT_EQ(NS_OK, rv); + + // Now we are async. + as = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!as); + + // Available should skip the closed stream and return 0 because the + // asyncStream returns 0 and it's async. + rv = s->Available(&length); + ASSERT_EQ(NS_OK, rv); + ASSERT_EQ((uint64_t)0, length); + + asyncStream->Unblock(); + + // Now we should return only the size of the async stream because we don't + // know when this is completed. + rv = s->Available(&length); + ASSERT_EQ(NS_OK, rv); + ASSERT_EQ(asyncSize, length); + + asyncStream->Close(); + + rv = s->Available(&length); + ASSERT_EQ(NS_OK, rv); + ASSERT_EQ(buffer.Length(), length); +} + +class NonBufferableStringStream final : public nsIInputStream { + nsCOMPtr<nsIInputStream> mStream; + + public: + NS_DECL_THREADSAFE_ISUPPORTS + + explicit NonBufferableStringStream(const nsACString& aBuffer) { + NS_NewCStringInputStream(getter_AddRefs(mStream), aBuffer); + } + + NS_IMETHOD + Available(uint64_t* aLength) override { return mStream->Available(aLength); } + + NS_IMETHOD + StreamStatus() override { return mStream->StreamStatus(); } + + NS_IMETHOD + Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override { + return mStream->Read(aBuffer, aCount, aReadCount); + } + + NS_IMETHOD + ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, + uint32_t* aResult) override { + return NS_ERROR_NOT_IMPLEMENTED; + } + + NS_IMETHOD + Close() override { return mStream->Close(); } + + NS_IMETHOD + IsNonBlocking(bool* aNonBlocking) override { + return mStream->IsNonBlocking(aNonBlocking); + } + + private: + ~NonBufferableStringStream() = default; +}; + +NS_IMPL_ISUPPORTS(NonBufferableStringStream, nsIInputStream) + +TEST(MultiplexInputStream, Bufferable) +{ + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + + nsCOMPtr<nsIInputStream> s = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!s); + + nsCString buf1; + buf1.AssignLiteral("Hello "); + nsCOMPtr<nsIInputStream> inputStream1; + nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1); + ASSERT_NS_SUCCEEDED(rv); + + nsCString buf2; + buf2.AssignLiteral("world"); + nsCOMPtr<nsIInputStream> inputStream2 = new NonBufferableStringStream(buf2); + + rv = multiplexStream->AppendStream(inputStream1); + ASSERT_NS_SUCCEEDED(rv); + + rv = multiplexStream->AppendStream(inputStream2); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream)); + ASSERT_TRUE(!!stream); + + char buf3[1024]; + uint32_t size = 0; + rv = stream->ReadSegments(NS_CopySegmentToBuffer, buf3, sizeof(buf3), &size); + ASSERT_NS_SUCCEEDED(rv); + + ASSERT_EQ(size, buf1.Length() + buf2.Length()); + ASSERT_TRUE(!strncmp(buf3, "Hello world", size)); +} + +TEST(MultiplexInputStream, QILengthInputStream) +{ + nsCString buf; + buf.AssignLiteral("Hello world"); + + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + + // nsMultiplexInputStream doesn't expose nsIInputStreamLength if there are + // no nsIInputStreamLength sub streams. + { + nsCOMPtr<nsIInputStream> inputStream; + nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf); + ASSERT_NS_SUCCEEDED(rv); + + rv = multiplexStream->AppendStream(inputStream); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!fsis); + + nsCOMPtr<nsIAsyncInputStreamLength> afsis = + do_QueryInterface(multiplexStream); + ASSERT_TRUE(!afsis); + } + + // nsMultiplexInputStream exposes nsIInputStreamLength if there is one or + // more nsIInputStreamLength sub streams. + { + RefPtr<testing::LengthInputStream> inputStream = + new testing::LengthInputStream(buf, true, false); + + nsresult rv = multiplexStream->AppendStream(inputStream); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!fsis); + + nsCOMPtr<nsIAsyncInputStreamLength> afsis = + do_QueryInterface(multiplexStream); + ASSERT_TRUE(!afsis); + } + + // nsMultiplexInputStream exposes nsIAsyncInputStreamLength if there is one + // or more nsIAsyncInputStreamLength sub streams. + { + RefPtr<testing::LengthInputStream> inputStream = + new testing::LengthInputStream(buf, true, true); + + nsresult rv = multiplexStream->AppendStream(inputStream); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!fsis); + + nsCOMPtr<nsIAsyncInputStreamLength> afsis = + do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!afsis); + } +} + +TEST(MultiplexInputStream, LengthInputStream) +{ + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + + // First stream is a a simple one. + nsCString buf; + buf.AssignLiteral("Hello world"); + + nsCOMPtr<nsIInputStream> inputStream; + nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf); + ASSERT_NS_SUCCEEDED(rv); + + rv = multiplexStream->AppendStream(inputStream); + ASSERT_NS_SUCCEEDED(rv); + + // A LengthInputStream, non-async. + RefPtr<testing::LengthInputStream> lengthStream = + new testing::LengthInputStream(buf, true, false); + + rv = multiplexStream->AppendStream(lengthStream); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!fsis); + + // Size is the sum of the 2 streams. + int64_t length; + rv = fsis->Length(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(int64_t(buf.Length() * 2), length); + + // An async LengthInputStream. + RefPtr<testing::LengthInputStream> asyncLengthStream = + new testing::LengthInputStream(buf, true, true, + NS_BASE_STREAM_WOULD_BLOCK); + + rv = multiplexStream->AppendStream(asyncLengthStream); + ASSERT_NS_SUCCEEDED(rv); + + nsCOMPtr<nsIAsyncInputStreamLength> afsis = + do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!afsis); + + // Now it would block. + rv = fsis->Length(&length); + ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); + + // Let's read the size async. + RefPtr<testing::LengthCallback> callback = new testing::LengthCallback(); + rv = afsis->AsyncLengthWait(callback, GetCurrentSerialEventTarget()); + ASSERT_EQ(NS_OK, rv); + + MOZ_ALWAYS_TRUE(SpinEventLoopUntil( + "xpcom:TEST(MultiplexInputStream, LengthInputStream) 1"_ns, + [&]() { return callback->Called(); })); + ASSERT_EQ(int64_t(buf.Length() * 3), callback->Size()); + + // Now a negative stream + lengthStream = new testing::LengthInputStream(buf, true, false, NS_OK, true); + + rv = multiplexStream->AppendStream(lengthStream); + ASSERT_NS_SUCCEEDED(rv); + + rv = fsis->Length(&length); + ASSERT_NS_SUCCEEDED(rv); + ASSERT_EQ(-1, length); + + // Another async LengthInputStream. + asyncLengthStream = new testing::LengthInputStream( + buf, true, true, NS_BASE_STREAM_WOULD_BLOCK); + + rv = multiplexStream->AppendStream(asyncLengthStream); + ASSERT_NS_SUCCEEDED(rv); + + afsis = do_QueryInterface(multiplexStream); + ASSERT_TRUE(!!afsis); + + // Let's read the size async. + RefPtr<testing::LengthCallback> callback1 = new testing::LengthCallback(); + rv = afsis->AsyncLengthWait(callback1, GetCurrentSerialEventTarget()); + ASSERT_EQ(NS_OK, rv); + + RefPtr<testing::LengthCallback> callback2 = new testing::LengthCallback(); + rv = afsis->AsyncLengthWait(callback2, GetCurrentSerialEventTarget()); + ASSERT_EQ(NS_OK, rv); + + MOZ_ALWAYS_TRUE(SpinEventLoopUntil( + "xpcom:TEST(MultiplexInputStream, LengthInputStream) 2"_ns, + [&]() { return callback2->Called(); })); + ASSERT_FALSE(callback1->Called()); + ASSERT_TRUE(callback2->Called()); +} + +void TestMultiplexStreamReadWhileWaiting(nsIAsyncInputStream* pipeIn, + nsIAsyncOutputStream* pipeOut) { + // We had an issue where a stream which was read while a message was in-flight + // to report the stream was ready, meaning that the stream reported 0 bytes + // available when checked in the MultiplexInputStream's callback, and was + // skipped over. + + nsCOMPtr<nsIThread> mainThread = NS_GetCurrentThread(); + + nsCOMPtr<nsIMultiplexInputStream> multiplexStream = + do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1"); + ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(pipeIn)); + + nsCOMPtr<nsIInputStream> stringStream; + ASSERT_TRUE(NS_SUCCEEDED( + NS_NewCStringInputStream(getter_AddRefs(stringStream), "xxxx\0"_ns))); + ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(stringStream)); + + nsCOMPtr<nsIAsyncInputStream> asyncMultiplex = + do_QueryInterface(multiplexStream); + ASSERT_TRUE(asyncMultiplex); + + RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback(); + ASSERT_NS_SUCCEEDED(asyncMultiplex->AsyncWait(cb, 0, 0, mainThread)); + EXPECT_FALSE(cb->Called()); + + NS_ProcessPendingEvents(mainThread); + EXPECT_FALSE(cb->Called()); + + uint64_t available; + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + EXPECT_EQ(available, 0u); + + // Write some data to the pipe, which should wake up the async wait message to + // be delivered. + char toWrite[] = "1234"; + uint32_t written; + ASSERT_NS_SUCCEEDED(pipeOut->Write(toWrite, sizeof(toWrite), &written)); + EXPECT_EQ(written, sizeof(toWrite)); + EXPECT_FALSE(cb->Called()); + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + EXPECT_EQ(available, sizeof(toWrite)); + + // Read that data from the stream + char toRead[sizeof(toWrite)]; + uint32_t read; + ASSERT_TRUE( + NS_SUCCEEDED(asyncMultiplex->Read(toRead, sizeof(toRead), &read))); + EXPECT_EQ(read, sizeof(toRead)); + EXPECT_STREQ(toRead, toWrite); + EXPECT_FALSE(cb->Called()); + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + EXPECT_EQ(available, 0u); + + // The multiplex stream will have detected the read and prevented the callback + // from having been called yet. + NS_ProcessPendingEvents(mainThread); + EXPECT_FALSE(cb->Called()); + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + EXPECT_EQ(available, 0u); + + // Write more data and close, then make sure we can read everything else in + // the stream. + char toWrite2[] = "56789"; + ASSERT_TRUE( + NS_SUCCEEDED(pipeOut->Write(toWrite2, sizeof(toWrite2), &written))); + EXPECT_EQ(written, sizeof(toWrite2)); + EXPECT_FALSE(cb->Called()); + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + EXPECT_EQ(available, sizeof(toWrite2)); + + ASSERT_NS_SUCCEEDED(pipeOut->Close()); + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + // XXX: Theoretically if the multiplex stream could detect it, we could report + // `sizeof(toWrite2) + 4` because the stream is complete, but there's no way + // for the multiplex stream to know. + EXPECT_EQ(available, sizeof(toWrite2)); + + NS_ProcessPendingEvents(mainThread); + EXPECT_TRUE(cb->Called()); + + // Read that final bit of data and make sure we read it. + char toRead2[sizeof(toWrite2)]; + ASSERT_TRUE( + NS_SUCCEEDED(asyncMultiplex->Read(toRead2, sizeof(toRead2), &read))); + EXPECT_EQ(read, sizeof(toRead2)); + EXPECT_STREQ(toRead2, toWrite2); + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + EXPECT_EQ(available, 5u); + + // Read the extra data as well. + char extraRead[5]; + ASSERT_TRUE( + NS_SUCCEEDED(asyncMultiplex->Read(extraRead, sizeof(extraRead), &read))); + EXPECT_EQ(read, sizeof(extraRead)); + EXPECT_STREQ(extraRead, "xxxx"); + ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available)); + EXPECT_EQ(available, 0u); +} + +TEST(MultiplexInputStream, ReadWhileWaiting_nsPipe) +{ + nsCOMPtr<nsIAsyncInputStream> pipeIn; + nsCOMPtr<nsIAsyncOutputStream> pipeOut; + NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true); + TestMultiplexStreamReadWhileWaiting(pipeIn, pipeOut); +} + +TEST(MultiplexInputStream, ReadWhileWaiting_DataPipe) +{ + RefPtr<mozilla::ipc::DataPipeReceiver> pipeIn; + RefPtr<mozilla::ipc::DataPipeSender> pipeOut; + ASSERT_TRUE(NS_SUCCEEDED(mozilla::ipc::NewDataPipe( + mozilla::ipc::kDefaultDataPipeCapacity, getter_AddRefs(pipeOut), + getter_AddRefs(pipeIn)))); + TestMultiplexStreamReadWhileWaiting(pipeIn, pipeOut); +} |