/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "gtest/gtest.h"
#include "mozilla/gtest/MozAssertions.h"
#include "mozilla/ipc/DataPipe.h"
#include "mozilla/SpinEventLoopUntil.h"
#include "nsIAsyncInputStream.h"
#include "nsComponentManagerUtils.h"
#include "nsIAsyncOutputStream.h"
#include "nsIInputStream.h"
#include "nsIMultiplexInputStream.h"
#include "nsIPipe.h"
#include "nsISeekableStream.h"
#include "nsStreamUtils.h"
#include "nsThreadUtils.h"
#include "nsIThread.h"
#include "Helpers.h"

using mozilla::GetCurrentSerialEventTarget;
using mozilla::SpinEventLoopUntil;

// Seeks with NS_SEEK_SET across three concatenated string streams and checks
// Available(), Tell() and Read() after each seek.
TEST(MultiplexInputStream, Seek_SET) {
  nsCString buf1;
  nsCString buf2;
  nsCString buf3;
  buf1.AssignLiteral("Hello world");
  buf2.AssignLiteral("The quick brown fox jumped over the lazy dog");
  buf3.AssignLiteral("Foo bar");

  nsCOMPtr<nsIInputStream> inputStream1;
  nsCOMPtr<nsIInputStream> inputStream2;
  nsCOMPtr<nsIInputStream> inputStream3;
  nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = NS_NewCStringInputStream(getter_AddRefs(inputStream2), buf2);
  ASSERT_NS_SUCCEEDED(rv);
  rv = NS_NewCStringInputStream(getter_AddRefs(inputStream3), buf3);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);
  nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream));
  ASSERT_TRUE(stream);

  rv = multiplexStream->AppendStream(inputStream1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = multiplexStream->AppendStream(inputStream2);
  ASSERT_NS_SUCCEEDED(rv);
  rv = multiplexStream->AppendStream(inputStream3);
  ASSERT_NS_SUCCEEDED(rv);

  int64_t tell;
  uint64_t length;
  uint32_t count;
  char readBuf[4096];
  nsCOMPtr<nsISeekableStream> seekStream = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(seekStream);

  // Seek forward in first input stream
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_SET, 1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 1,
            length);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 1);

  // Check read is correct
  rv = stream->Read(readBuf, 3, &count);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)3, count);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 4,
            length);
  ASSERT_EQ(0, strncmp(readBuf, "ell", count));
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 4);

  // Seek to start of third input stream
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_SET,
                        buf1.Length() + buf2.Length());
  ASSERT_NS_SUCCEEDED(rv);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf3.Length(), length);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length()));

  // Check read is correct
  rv = stream->Read(readBuf, 5, &count);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)5, count);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf3.Length() - 5, length);
  ASSERT_EQ(0, strncmp(readBuf, "Foo b", count));
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length() + 5));

  // Seek back to start of second stream (covers bug 1272371)
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_SET, buf1.Length());
  ASSERT_NS_SUCCEEDED(rv);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf2.Length() + buf3.Length(), length);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, int64_t(buf1.Length()));

  // Check read is correct
  rv = stream->Read(readBuf, 6, &count);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)6, count);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf2.Length() - 6 + buf3.Length(), length);
  ASSERT_EQ(0, strncmp(readBuf, "The qu", count));
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, int64_t(buf1.Length() + 6));
}

// Seeks with NS_SEEK_CUR forwards and backwards across sub-stream boundaries
// and checks Tell() and Read() after each seek.
TEST(MultiplexInputStream, Seek_CUR) {
  nsCString buf1;
  nsCString buf2;
  nsCString buf3;
  buf1.AssignLiteral("Hello world");
  buf2.AssignLiteral("The quick brown fox jumped over the lazy dog");
  buf3.AssignLiteral("Foo bar");

  nsCOMPtr<nsIInputStream> inputStream1;
  nsCOMPtr<nsIInputStream> inputStream2;
  nsCOMPtr<nsIInputStream> inputStream3;
  nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = NS_NewCStringInputStream(getter_AddRefs(inputStream2), buf2);
  ASSERT_NS_SUCCEEDED(rv);
  rv = NS_NewCStringInputStream(getter_AddRefs(inputStream3), buf3);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);
  nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream));
  ASSERT_TRUE(stream);

  rv = multiplexStream->AppendStream(inputStream1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = multiplexStream->AppendStream(inputStream2);
  ASSERT_NS_SUCCEEDED(rv);
  rv = multiplexStream->AppendStream(inputStream3);
  ASSERT_NS_SUCCEEDED(rv);

  int64_t tell;
  uint64_t length;
  uint32_t count;
  char readBuf[4096];
  nsCOMPtr<nsISeekableStream> seekStream = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(seekStream);

  // Seek forward in first input stream
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_CUR, 1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 1,
            length);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 1);

  // Check read is correct
  rv = stream->Read(readBuf, 3, &count);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)3, count);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)buf1.Length() + buf2.Length() + buf3.Length() - 4,
            length);
  ASSERT_EQ(0, strncmp(readBuf, "ell", count));
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 4);

  // Let's go to the second stream
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_CUR, 11);
  ASSERT_NS_SUCCEEDED(rv);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 15);
  rv = stream->Read(readBuf, 3, &count);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)3, count);
  ASSERT_EQ(0, strncmp(readBuf, "qui", count));
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 18);

  // Let's go back to the first stream
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_CUR, -9);
  ASSERT_NS_SUCCEEDED(rv);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 9);
  rv = stream->Read(readBuf, 3, &count);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)3, count);
  ASSERT_EQ(0, strncmp(readBuf, "ldT", count));
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 12);
}

// Seeks with NS_SEEK_END: a positive offset must fail, while zero and
// negative offsets position relative to the total length.
TEST(MultiplexInputStream, Seek_END) {
  nsCString buf1;
  nsCString buf2;
  nsCString buf3;
  buf1.AssignLiteral("Hello world");
  buf2.AssignLiteral("The quick brown fox jumped over the lazy dog");
  buf3.AssignLiteral("Foo bar");

  nsCOMPtr<nsIInputStream> inputStream1;
  nsCOMPtr<nsIInputStream> inputStream2;
  nsCOMPtr<nsIInputStream> inputStream3;
  nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = NS_NewCStringInputStream(getter_AddRefs(inputStream2), buf2);
  ASSERT_NS_SUCCEEDED(rv);
  rv = NS_NewCStringInputStream(getter_AddRefs(inputStream3), buf3);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);
  nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream));
  ASSERT_TRUE(stream);

  rv = multiplexStream->AppendStream(inputStream1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = multiplexStream->AppendStream(inputStream2);
  ASSERT_NS_SUCCEEDED(rv);
  rv = multiplexStream->AppendStream(inputStream3);
  ASSERT_NS_SUCCEEDED(rv);

  int64_t tell;
  uint64_t length;
  nsCOMPtr<nsISeekableStream> seekStream = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(seekStream);

  // SEEK_END wants <=0 values
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, 1);
  ASSERT_NS_FAILED(rv);

  // Let's go to the end.
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, 0);
  ASSERT_NS_SUCCEEDED(rv);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)0, length);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length() + buf3.Length()));

  // -1
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, -1);
  ASSERT_NS_SUCCEEDED(rv);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ((uint64_t)1, length);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, int64_t(buf1.Length() + buf2.Length() + buf3.Length() - 1));

  // Almost at the beginning
  tell = 1;
  tell -= buf1.Length();
  tell -= buf2.Length();
  tell -= buf3.Length();
  rv = seekStream->Seek(nsISeekableStream::NS_SEEK_END, tell);
  ASSERT_NS_SUCCEEDED(rv);
  rv = stream->Available(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(length, buf1.Length() + buf2.Length() + buf3.Length() - 1);
  rv = seekStream->Tell(&tell);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(tell, 1);
}

// Builds a multiplex stream made of three testing::AsyncStringStream
// sub-streams ("Hello", " ", "World!") and returns it as an nsIInputStream.
static already_AddRefed<nsIInputStream> CreateStreamHelper() {
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");

  nsCString buf1;
  buf1.AssignLiteral("Hello");
  nsCOMPtr<nsIInputStream> inputStream1 = new testing::AsyncStringStream(buf1);
  multiplexStream->AppendStream(inputStream1);

  nsCString buf2;
  buf2.AssignLiteral(" ");
  nsCOMPtr<nsIInputStream> inputStream2 = new testing::AsyncStringStream(buf2);
  multiplexStream->AppendStream(inputStream2);

  nsCString buf3;
  buf3.AssignLiteral("World!");
  nsCOMPtr<nsIInputStream> inputStream3 = new testing::AsyncStringStream(buf3);
  multiplexStream->AppendStream(inputStream3);

  nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream));
  return stream.forget();
}

// AsyncWait - without EventTarget
TEST(MultiplexInputStream, AsyncWait_withoutEventTarget) {
  nsCOMPtr<nsIInputStream> is = CreateStreamHelper();

  nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is);
  ASSERT_TRUE(!!ais);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();

  ASSERT_EQ(NS_OK, ais->AsyncWait(cb, 0, 0, nullptr));
  ASSERT_TRUE(cb->Called());
}

// AsyncWait - with EventTarget
TEST(MultiplexInputStream, AsyncWait_withEventTarget) {
  nsCOMPtr<nsIInputStream> is = CreateStreamHelper();

  nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is);
  ASSERT_TRUE(!!ais);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
  nsCOMPtr<nsIThread> thread = do_GetCurrentThread();

  ASSERT_EQ(NS_OK, ais->AsyncWait(cb, 0, 0, thread));

  ASSERT_FALSE(cb->Called());

  // Eventually it is called.
  MOZ_ALWAYS_TRUE(mozilla::SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, AsyncWait_withEventTarget)"_ns,
      [&]() { return cb->Called(); }));
  ASSERT_TRUE(cb->Called());
}

// AsyncWait - without EventTarget - closureOnly
TEST(MultiplexInputStream, AsyncWait_withoutEventTarget_closureOnly) {
  nsCOMPtr<nsIInputStream> is = CreateStreamHelper();

  nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is);
  ASSERT_TRUE(!!ais);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();

  ASSERT_EQ(NS_OK, ais->AsyncWait(cb, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0,
                                  nullptr));
  ASSERT_FALSE(cb->Called());

  ais->CloseWithStatus(NS_ERROR_FAILURE);
  ASSERT_TRUE(cb->Called());
}

// AsyncWait - with EventTarget - closureOnly
TEST(MultiplexInputStream, AsyncWait_withEventTarget_closureOnly) {
  nsCOMPtr<nsIInputStream> is = CreateStreamHelper();

  nsCOMPtr<nsIAsyncInputStream> ais = do_QueryInterface(is);
  ASSERT_TRUE(!!ais);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
  nsCOMPtr<nsIThread> thread = do_GetCurrentThread();

  ASSERT_EQ(NS_OK, ais->AsyncWait(cb, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0,
                                  thread));

  ASSERT_FALSE(cb->Called());
  ais->CloseWithStatus(NS_ERROR_FAILURE);
  ASSERT_FALSE(cb->Called());

  // Eventually it is called.
  MOZ_ALWAYS_TRUE(mozilla::SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, AsyncWait_withEventTarget_closureOnly)"_ns,
      [&]() { return cb->Called(); }));
  ASSERT_TRUE(cb->Called());
}

// Input stream stub that always reports NS_BASE_STREAM_CLOSED from
// Available() and StreamStatus(). Read() crashes if it is ever reached.
class ClosedStream final : public nsIInputStream {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  ClosedStream() = default;

  NS_IMETHOD
  Available(uint64_t* aLength) override { return NS_BASE_STREAM_CLOSED; }

  NS_IMETHOD
  StreamStatus() override { return NS_BASE_STREAM_CLOSED; }

  NS_IMETHOD
  Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
    MOZ_CRASH("This should not be called!");
    return NS_OK;
  }

  NS_IMETHOD
  ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
               uint32_t* aResult) override {
    return NS_ERROR_NOT_IMPLEMENTED;
  }

  NS_IMETHOD
  Close() override { return NS_OK; }

  NS_IMETHOD
  IsNonBlocking(bool* aNonBlocking) override {
    *aNonBlocking = true;
    return NS_OK;
  }

 private:
  ~ClosedStream() = default;
};

NS_IMPL_ISUPPORTS(ClosedStream, nsIInputStream)

// Async input stream stub with three states. While blocked, Available()
// reports 0 bytes; otherwise it reports the configured size. Once Close() has
// been called, Available() and StreamStatus() return NS_BASE_STREAM_CLOSED.
// Read() and AsyncWait() crash if they are ever reached.
class AsyncStream final : public nsIAsyncInputStream {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  explicit AsyncStream(int64_t aSize) : mState(eBlocked), mSize(aSize) {}

  // Leaves the blocked state so that Available() starts reporting mSize.
  void Unblock() { mState = eUnblocked; }

  NS_IMETHOD
  Available(uint64_t* aLength) override {
    *aLength = mState == eBlocked ? 0 : mSize;
    return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_OK;
  }

  NS_IMETHOD
  StreamStatus() override {
    return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_OK;
  }

  NS_IMETHOD
  Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
    MOZ_CRASH("This should not be called!");
    return NS_OK;
  }

  NS_IMETHOD
  ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
               uint32_t* aResult) override {
    return NS_ERROR_NOT_IMPLEMENTED;
  }

  NS_IMETHOD
  Close() override {
    mState = eClosed;
    return NS_OK;
  }

  NS_IMETHOD
  IsNonBlocking(bool* aNonBlocking) override {
    *aNonBlocking = true;
    return NS_OK;
  }

  NS_IMETHOD
  AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
            uint32_t aRequestedCount, nsIEventTarget* aEventTarget) override {
    MOZ_CRASH("This should not be called!");
    return NS_OK;
  }

  // Note: unlike Close(), this does not change mState.
  NS_IMETHOD
  CloseWithStatus(nsresult aStatus) override { return NS_OK; }

 private:
  ~AsyncStream() = default;

  enum { eBlocked, eUnblocked, eClosed } mState;

  uint64_t mSize;
};

NS_IMPL_ISUPPORTS(AsyncStream, nsIInputStream, nsIAsyncInputStream)

// Input stream stub that reports itself as blocking (IsNonBlocking yields
// false), returns NS_BASE_STREAM_CLOSED from Available()/StreamStatus(), and
// whose Read() succeeds with zero bytes.
class BlockingStream final : public nsIInputStream {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  BlockingStream() = default;

  NS_IMETHOD
  Available(uint64_t* aLength) override { return NS_BASE_STREAM_CLOSED; }

  NS_IMETHOD
  StreamStatus() override { return NS_BASE_STREAM_CLOSED; }

  NS_IMETHOD
  Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
    // We are actually empty.
    *aReadCount = 0;
    return NS_OK;
  }

  NS_IMETHOD
  ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
               uint32_t* aResult) override {
    return NS_ERROR_NOT_IMPLEMENTED;
  }

  NS_IMETHOD
  Close() override { return NS_OK; }

  NS_IMETHOD
  IsNonBlocking(bool* aNonBlocking) override {
    *aNonBlocking = false;
    return NS_OK;
  }

 private:
  ~BlockingStream() = default;
};

NS_IMPL_ISUPPORTS(BlockingStream, nsIInputStream)

// Checks what Available() reports for a multiplex stream made of a closed
// stream, an async stream and a string stream as the async stream moves
// through its blocked, unblocked and closed states.
TEST(MultiplexInputStream, Available) {
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");

  nsCOMPtr<nsIInputStream> s = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!s);

  nsCOMPtr<nsIAsyncInputStream> as = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!as);

  uint64_t length;

  // The stream returns NS_BASE_STREAM_CLOSED if there are no substreams.
  nsresult rv = s->Available(&length);
  ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);

  rv = multiplexStream->AppendStream(new ClosedStream());
  ASSERT_EQ(NS_OK, rv);

  // Use one named size for both construction and the later expectation.
  uint64_t asyncSize = 2;
  RefPtr<AsyncStream> asyncStream = new AsyncStream(asyncSize);
  rv = multiplexStream->AppendStream(asyncStream);
  ASSERT_EQ(NS_OK, rv);

  nsCString buffer;
  buffer.Assign("World!!!");

  nsCOMPtr<nsIInputStream> stringStream;
  rv = NS_NewCStringInputStream(getter_AddRefs(stringStream), buffer);
  ASSERT_EQ(NS_OK, rv);

  rv = multiplexStream->AppendStream(stringStream);
  ASSERT_EQ(NS_OK, rv);

  // Now we are async.
  as = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!as);

  // Available should skip the closed stream and return 0 because the
  // asyncStream returns 0 and it's async.
  rv = s->Available(&length);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_EQ((uint64_t)0, length);

  asyncStream->Unblock();

  // Now we should return only the size of the async stream because we don't
  // know when this is completed.
  rv = s->Available(&length);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_EQ(asyncSize, length);

  asyncStream->Close();

  rv = s->Available(&length);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_EQ(buffer.Length(), length);
}

// Wraps a string input stream and forwards every call to it, except
// ReadSegments(), which always returns NS_ERROR_NOT_IMPLEMENTED.
class NonBufferableStringStream final : public nsIInputStream {
  nsCOMPtr<nsIInputStream> mStream;

 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  explicit NonBufferableStringStream(const nsACString& aBuffer) {
    NS_NewCStringInputStream(getter_AddRefs(mStream), aBuffer);
  }

  NS_IMETHOD
  Available(uint64_t* aLength) override { return mStream->Available(aLength); }

  NS_IMETHOD
  StreamStatus() override { return mStream->StreamStatus(); }

  NS_IMETHOD
  Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
    return mStream->Read(aBuffer, aCount, aReadCount);
  }

  NS_IMETHOD
  ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
               uint32_t* aResult) override {
    return NS_ERROR_NOT_IMPLEMENTED;
  }

  NS_IMETHOD
  Close() override { return mStream->Close(); }

  NS_IMETHOD
  IsNonBlocking(bool* aNonBlocking) override {
    return mStream->IsNonBlocking(aNonBlocking);
  }

 private:
  ~NonBufferableStringStream() = default;
};

NS_IMPL_ISUPPORTS(NonBufferableStringStream, nsIInputStream)

// ReadSegments() on the multiplex stream must return all the data even when
// one sub-stream does not implement ReadSegments() itself.
TEST(MultiplexInputStream, Bufferable) {
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");

  nsCOMPtr<nsIInputStream> s = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!s);

  nsCString buf1;
  buf1.AssignLiteral("Hello ");
  nsCOMPtr<nsIInputStream> inputStream1;
  nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream1), buf1);
  ASSERT_NS_SUCCEEDED(rv);

  nsCString buf2;
  buf2.AssignLiteral("world");
  nsCOMPtr<nsIInputStream> inputStream2 = new NonBufferableStringStream(buf2);

  rv = multiplexStream->AppendStream(inputStream1);
  ASSERT_NS_SUCCEEDED(rv);

  rv = multiplexStream->AppendStream(inputStream2);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream));
  ASSERT_TRUE(!!stream);

  char buf3[1024];
  uint32_t size = 0;
  rv = stream->ReadSegments(NS_CopySegmentToBuffer, buf3, sizeof(buf3), &size);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_EQ(size, buf1.Length() + buf2.Length());
  ASSERT_TRUE(!strncmp(buf3, "Hello world", size));
}

// Checks which length interfaces the multiplex stream answers to via
// QueryInterface as sub-streams of different kinds are appended.
TEST(MultiplexInputStream, QILengthInputStream) {
  nsCString buf;
  buf.AssignLiteral("Hello world");

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");

  // nsMultiplexInputStream doesn't expose nsIInputStreamLength if there are
  // no nsIInputStreamLength sub streams.
  {
    nsCOMPtr<nsIInputStream> inputStream;
    nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf);
    ASSERT_NS_SUCCEEDED(rv);

    rv = multiplexStream->AppendStream(inputStream);
    ASSERT_NS_SUCCEEDED(rv);

    nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!fsis);

    nsCOMPtr<nsIAsyncInputStreamLength> afsis =
        do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!afsis);
  }

  // nsMultiplexInputStream exposes nsIInputStreamLength if there is one or
  // more nsIInputStreamLength sub streams.
  {
    RefPtr<testing::LengthInputStream> inputStream =
        new testing::LengthInputStream(buf, true, false);

    nsresult rv = multiplexStream->AppendStream(inputStream);
    ASSERT_NS_SUCCEEDED(rv);

    nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!!fsis);

    nsCOMPtr<nsIAsyncInputStreamLength> afsis =
        do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!afsis);
  }

  // nsMultiplexInputStream exposes nsIAsyncInputStreamLength if there is one
  // or more nsIAsyncInputStreamLength sub streams.
  {
    RefPtr<testing::LengthInputStream> inputStream =
        new testing::LengthInputStream(buf, true, true);

    nsresult rv = multiplexStream->AppendStream(inputStream);
    ASSERT_NS_SUCCEEDED(rv);

    nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!!fsis);

    nsCOMPtr<nsIAsyncInputStreamLength> afsis =
        do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!!afsis);
  }
}

// Checks Length() and AsyncLengthWait() on the multiplex stream as plain,
// sync-length, async-length and negative-length sub-streams are appended.
TEST(MultiplexInputStream, LengthInputStream) {
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");

  // First stream is a simple one.
  nsCString buf;
  buf.AssignLiteral("Hello world");

  nsCOMPtr<nsIInputStream> inputStream;
  nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf);
  ASSERT_NS_SUCCEEDED(rv);

  rv = multiplexStream->AppendStream(inputStream);
  ASSERT_NS_SUCCEEDED(rv);

  // A LengthInputStream, non-async.
  RefPtr<testing::LengthInputStream> lengthStream =
      new testing::LengthInputStream(buf, true, false);

  rv = multiplexStream->AppendStream(lengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!fsis);

  // Size is the sum of the 2 streams.
  int64_t length;
  rv = fsis->Length(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(int64_t(buf.Length() * 2), length);

  // An async LengthInputStream.
  RefPtr<testing::LengthInputStream> asyncLengthStream =
      new testing::LengthInputStream(buf, true, true,
                                     NS_BASE_STREAM_WOULD_BLOCK);

  rv = multiplexStream->AppendStream(asyncLengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIAsyncInputStreamLength> afsis =
      do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!afsis);

  // Now it would block.
  rv = fsis->Length(&length);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  // Let's read the size async.
  RefPtr<testing::LengthCallback> callback = new testing::LengthCallback();
  rv = afsis->AsyncLengthWait(callback, GetCurrentSerialEventTarget());
  ASSERT_EQ(NS_OK, rv);

  MOZ_ALWAYS_TRUE(SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, LengthInputStream) 1"_ns,
      [&]() { return callback->Called(); }));
  ASSERT_EQ(int64_t(buf.Length() * 3), callback->Size());

  // Now a negative stream
  lengthStream = new testing::LengthInputStream(buf, true, false, NS_OK, true);

  rv = multiplexStream->AppendStream(lengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  rv = fsis->Length(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(-1, length);

  // Another async LengthInputStream.
  asyncLengthStream = new testing::LengthInputStream(
      buf, true, true, NS_BASE_STREAM_WOULD_BLOCK);

  rv = multiplexStream->AppendStream(asyncLengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  afsis = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!afsis);

  // Let's read the size async.
  RefPtr<testing::LengthCallback> callback1 = new testing::LengthCallback();
  rv = afsis->AsyncLengthWait(callback1, GetCurrentSerialEventTarget());
  ASSERT_EQ(NS_OK, rv);

  RefPtr<testing::LengthCallback> callback2 = new testing::LengthCallback();
  rv = afsis->AsyncLengthWait(callback2, GetCurrentSerialEventTarget());
  ASSERT_EQ(NS_OK, rv);

  // Only the second callback is expected to have been called.
  MOZ_ALWAYS_TRUE(SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, LengthInputStream) 2"_ns,
      [&]() { return callback2->Called(); }));
  ASSERT_FALSE(callback1->Called());
  ASSERT_TRUE(callback2->Called());
}

// Shared body for the ReadWhileWaiting_* tests. `pipeIn` is appended to a
// multiplex stream ahead of a string stream, and `pipeOut` is the writing end
// used to feed data into it.
void TestMultiplexStreamReadWhileWaiting(nsIAsyncInputStream* pipeIn,
                                         nsIAsyncOutputStream* pipeOut) {
  // We had an issue where a stream which was read while a message was in-flight
  // to report the stream was ready, meaning that the stream reported 0 bytes
  // available when checked in the MultiplexInputStream's callback, and was
  // skipped over.

  nsCOMPtr<nsIThread> mainThread = NS_GetCurrentThread();

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(pipeIn));

  nsCOMPtr<nsIInputStream> stringStream;
  ASSERT_TRUE(NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(stringStream), "xxxx\0"_ns)));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(stringStream));

  nsCOMPtr<nsIAsyncInputStream> asyncMultiplex =
      do_QueryInterface(multiplexStream);
  ASSERT_TRUE(asyncMultiplex);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
  ASSERT_NS_SUCCEEDED(asyncMultiplex->AsyncWait(cb, 0, 0, mainThread));
  EXPECT_FALSE(cb->Called());

  NS_ProcessPendingEvents(mainThread);
  EXPECT_FALSE(cb->Called());

  uint64_t available;
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);

  // Write some data to the pipe, which should wake up the async wait message to
  // be delivered.
  char toWrite[] = "1234";
  uint32_t written;
  ASSERT_NS_SUCCEEDED(pipeOut->Write(toWrite, sizeof(toWrite), &written));
  EXPECT_EQ(written, sizeof(toWrite));
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, sizeof(toWrite));

  // Read that data from the stream
  char toRead[sizeof(toWrite)];
  uint32_t read;
  ASSERT_TRUE(
      NS_SUCCEEDED(asyncMultiplex->Read(toRead, sizeof(toRead), &read)));
  EXPECT_EQ(read, sizeof(toRead));
  EXPECT_STREQ(toRead, toWrite);
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);

  // The multiplex stream will have detected the read and prevented the callback
  // from having been called yet.
  NS_ProcessPendingEvents(mainThread);
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);

  // Write more data and close, then make sure we can read everything else in
  // the stream.
  char toWrite2[] = "56789";
  ASSERT_TRUE(
      NS_SUCCEEDED(pipeOut->Write(toWrite2, sizeof(toWrite2), &written)));
  EXPECT_EQ(written, sizeof(toWrite2));
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, sizeof(toWrite2));

  ASSERT_NS_SUCCEEDED(pipeOut->Close());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  // XXX: Theoretically if the multiplex stream could detect it, we could report
  // `sizeof(toWrite2) + 4` because the stream is complete, but there's no way
  // for the multiplex stream to know.
  EXPECT_EQ(available, sizeof(toWrite2));

  NS_ProcessPendingEvents(mainThread);
  EXPECT_TRUE(cb->Called());

  // Read that final bit of data and make sure we read it.
  char toRead2[sizeof(toWrite2)];
  ASSERT_TRUE(
      NS_SUCCEEDED(asyncMultiplex->Read(toRead2, sizeof(toRead2), &read)));
  EXPECT_EQ(read, sizeof(toRead2));
  EXPECT_STREQ(toRead2, toWrite2);
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 5u);

  // Read the extra data as well.
  char extraRead[5];
  ASSERT_TRUE(
      NS_SUCCEEDED(asyncMultiplex->Read(extraRead, sizeof(extraRead), &read)));
  EXPECT_EQ(read, sizeof(extraRead));
  EXPECT_STREQ(extraRead, "xxxx");
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);
}

// Runs the read-while-waiting scenario over a pipe created by NS_NewPipe2.
TEST(MultiplexInputStream, ReadWhileWaiting_nsPipe) {
  nsCOMPtr<nsIAsyncInputStream> pipeIn;
  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
  NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true);
  TestMultiplexStreamReadWhileWaiting(pipeIn, pipeOut);
}

// Runs the read-while-waiting scenario over a mozilla::ipc DataPipe.
TEST(MultiplexInputStream, ReadWhileWaiting_DataPipe) {
  RefPtr<mozilla::ipc::DataPipeReceiver> pipeIn;
  RefPtr<mozilla::ipc::DataPipeSender> pipeOut;
  ASSERT_TRUE(NS_SUCCEEDED(mozilla::ipc::NewDataPipe(
      mozilla::ipc::kDefaultDataPipeCapacity, getter_AddRefs(pipeOut),
      getter_AddRefs(pipeIn))));
  TestMultiplexStreamReadWhileWaiting(pipeIn, pipeOut);
}