summaryrefslogtreecommitdiffstats
path: root/xpcom/tests/gtest/TestPipes.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'xpcom/tests/gtest/TestPipes.cpp')
-rw-r--r--xpcom/tests/gtest/TestPipes.cpp1041
1 files changed, 1041 insertions, 0 deletions
diff --git a/xpcom/tests/gtest/TestPipes.cpp b/xpcom/tests/gtest/TestPipes.cpp
new file mode 100644
index 0000000000..b00baa17d0
--- /dev/null
+++ b/xpcom/tests/gtest/TestPipes.cpp
@@ -0,0 +1,1041 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include <algorithm>
+#include "gtest/gtest.h"
+#include "Helpers.h"
+#include "mozilla/ReentrantMonitor.h"
+#include "mozilla/Printf.h"
+#include "nsCOMPtr.h"
+#include "nsCRT.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIAsyncOutputStream.h"
+#include "nsIBufferedStreams.h"
+#include "nsIClassInfo.h"
+#include "nsICloneableInputStream.h"
+#include "nsIInputStream.h"
+#include "nsIOutputStream.h"
+#include "nsIPipe.h"
+#include "nsITellableStream.h"
+#include "nsIThread.h"
+#include "nsIRunnable.h"
+#include "nsStreamUtils.h"
+#include "nsString.h"
+#include "nsThreadUtils.h"
+#include "prinrval.h"
+
+using namespace mozilla;
+
+#define ITERATIONS 33333
+char kTestPattern[] = "My hovercraft is full of eels.\n";
+
+bool gTrace = false;
+
+static nsresult WriteAll(nsIOutputStream* os, const char* buf, uint32_t bufLen,
+ uint32_t* lenWritten) {
+ const char* p = buf;
+ *lenWritten = 0;
+ while (bufLen) {
+ uint32_t n;
+ nsresult rv = os->Write(p, bufLen, &n);
+ if (NS_FAILED(rv)) return rv;
+ p += n;
+ bufLen -= n;
+ *lenWritten += n;
+ }
+ return NS_OK;
+}
+
+class nsReceiver final : public Runnable {
+ public:
+ NS_IMETHOD Run() override {
+ nsresult rv;
+ char buf[101];
+ uint32_t count;
+ PRIntervalTime start = PR_IntervalNow();
+ while (true) {
+ rv = mIn->Read(buf, 100, &count);
+ if (NS_FAILED(rv)) {
+ printf("read failed\n");
+ break;
+ }
+ if (count == 0) {
+ // printf("EOF count = %d\n", mCount);
+ break;
+ }
+
+ if (gTrace) {
+ buf[count] = '\0';
+ printf("read: %s\n", buf);
+ }
+ mCount += count;
+ }
+ PRIntervalTime end = PR_IntervalNow();
+ printf("read %d bytes, time = %dms\n", mCount,
+ PR_IntervalToMilliseconds(end - start));
+ return rv;
+ }
+
+ explicit nsReceiver(nsIInputStream* in)
+ : Runnable("nsReceiver"), mIn(in), mCount(0) {}
+
+ uint32_t GetBytesRead() { return mCount; }
+
+ private:
+ ~nsReceiver() = default;
+
+ protected:
+ nsCOMPtr<nsIInputStream> mIn;
+ uint32_t mCount;
+};
+
+static nsresult TestPipe(nsIInputStream* in, nsIOutputStream* out) {
+ RefPtr<nsReceiver> receiver = new nsReceiver(in);
+ nsresult rv;
+
+ nsCOMPtr<nsIThread> thread;
+ rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver);
+ if (NS_FAILED(rv)) return rv;
+
+ uint32_t total = 0;
+ PRIntervalTime start = PR_IntervalNow();
+ for (uint32_t i = 0; i < ITERATIONS; i++) {
+ uint32_t writeCount;
+ SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
+ uint32_t len = strlen(buf.get());
+ rv = WriteAll(out, buf.get(), len, &writeCount);
+ if (gTrace) {
+ printf("wrote: ");
+ for (uint32_t j = 0; j < writeCount; j++) {
+ putc(buf.get()[j], stdout);
+ }
+ printf("\n");
+ }
+ if (NS_FAILED(rv)) return rv;
+ total += writeCount;
+ }
+ rv = out->Close();
+ if (NS_FAILED(rv)) return rv;
+
+ PRIntervalTime end = PR_IntervalNow();
+
+ thread->Shutdown();
+
+ printf("wrote %d bytes, time = %dms\n", total,
+ PR_IntervalToMilliseconds(end - start));
+ EXPECT_EQ(receiver->GetBytesRead(), total);
+
+ return NS_OK;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class nsShortReader final : public Runnable {
+ public:
+ NS_IMETHOD Run() override {
+ nsresult rv;
+ char buf[101];
+ uint32_t count;
+ uint32_t total = 0;
+ while (true) {
+ // if (gTrace)
+ // printf("calling Read\n");
+ rv = mIn->Read(buf, 100, &count);
+ if (NS_FAILED(rv)) {
+ printf("read failed\n");
+ break;
+ }
+ if (count == 0) {
+ break;
+ }
+
+ if (gTrace) {
+ // For next |printf()| call and possible others elsewhere.
+ buf[count] = '\0';
+
+ printf("read %d bytes: %s\n", count, buf);
+ }
+
+ Received(count);
+ total += count;
+ }
+ printf("read %d bytes\n", total);
+ return rv;
+ }
+
+ explicit nsShortReader(nsIInputStream* in)
+ : Runnable("nsShortReader"), mIn(in), mReceived(0) {
+ mMon = new ReentrantMonitor("nsShortReader");
+ }
+
+ void Received(uint32_t count) {
+ ReentrantMonitorAutoEnter mon(*mMon);
+ mReceived += count;
+ mon.Notify();
+ }
+
+ uint32_t WaitForReceipt(const uint32_t aWriteCount) {
+ ReentrantMonitorAutoEnter mon(*mMon);
+ uint32_t result = mReceived;
+
+ while (result < aWriteCount) {
+ mon.Wait();
+
+ EXPECT_TRUE(mReceived > result);
+ result = mReceived;
+ }
+
+ mReceived = 0;
+ return result;
+ }
+
+ private:
+ ~nsShortReader() = default;
+
+ protected:
+ nsCOMPtr<nsIInputStream> mIn;
+ uint32_t mReceived;
+ ReentrantMonitor* mMon;
+};
+
+static nsresult TestShortWrites(nsIInputStream* in, nsIOutputStream* out) {
+ RefPtr<nsShortReader> receiver = new nsShortReader(in);
+ nsresult rv;
+
+ nsCOMPtr<nsIThread> thread;
+ rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread), receiver);
+ if (NS_FAILED(rv)) return rv;
+
+ uint32_t total = 0;
+ for (uint32_t i = 0; i < ITERATIONS; i++) {
+ uint32_t writeCount;
+ SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
+ uint32_t len = strlen(buf.get());
+ len = len * rand() / RAND_MAX;
+ len = std::min(1u, len);
+ rv = WriteAll(out, buf.get(), len, &writeCount);
+ if (NS_FAILED(rv)) return rv;
+ EXPECT_EQ(writeCount, len);
+ total += writeCount;
+
+ if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
+ // printf("calling Flush\n");
+ out->Flush();
+ // printf("calling WaitForReceipt\n");
+
+#ifdef DEBUG
+ const uint32_t received = receiver->WaitForReceipt(writeCount);
+ EXPECT_EQ(received, writeCount);
+#endif
+ }
+ rv = out->Close();
+ if (NS_FAILED(rv)) return rv;
+
+ thread->Shutdown();
+
+ printf("wrote %d bytes\n", total);
+
+ return NS_OK;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class nsPump final : public Runnable {
+ public:
+ NS_IMETHOD Run() override {
+ nsresult rv;
+ uint32_t count;
+ while (true) {
+ rv = mOut->WriteFrom(mIn, ~0U, &count);
+ if (NS_FAILED(rv)) {
+ printf("Write failed\n");
+ break;
+ }
+ if (count == 0) {
+ printf("EOF count = %d\n", mCount);
+ break;
+ }
+
+ if (gTrace) {
+ printf("Wrote: %d\n", count);
+ }
+ mCount += count;
+ }
+ mOut->Close();
+ return rv;
+ }
+
+ nsPump(nsIInputStream* in, nsIOutputStream* out)
+ : Runnable("nsPump"), mIn(in), mOut(out), mCount(0) {}
+
+ private:
+ ~nsPump() = default;
+
+ protected:
+ nsCOMPtr<nsIInputStream> mIn;
+ nsCOMPtr<nsIOutputStream> mOut;
+ uint32_t mCount;
+};
+
+TEST(Pipes, ChainedPipes)
+{
+ nsresult rv;
+ if (gTrace) {
+ printf("TestChainedPipes\n");
+ }
+
+ nsCOMPtr<nsIInputStream> in1;
+ nsCOMPtr<nsIOutputStream> out1;
+ rv = NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);
+ if (NS_FAILED(rv)) return;
+
+ nsCOMPtr<nsIInputStream> in2;
+ nsCOMPtr<nsIOutputStream> out2;
+ rv = NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);
+ if (NS_FAILED(rv)) return;
+
+ RefPtr<nsPump> pump = new nsPump(in1, out2);
+ if (pump == nullptr) return;
+
+ nsCOMPtr<nsIThread> thread;
+ rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump);
+ if (NS_FAILED(rv)) return;
+
+ RefPtr<nsReceiver> receiver = new nsReceiver(in2);
+ if (receiver == nullptr) return;
+
+ nsCOMPtr<nsIThread> receiverThread;
+ rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
+ receiver);
+ if (NS_FAILED(rv)) return;
+
+ uint32_t total = 0;
+ for (uint32_t i = 0; i < ITERATIONS; i++) {
+ uint32_t writeCount;
+ SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
+ uint32_t len = strlen(buf.get());
+ len = len * rand() / RAND_MAX;
+ len = std::max(1u, len);
+ rv = WriteAll(out1, buf.get(), len, &writeCount);
+ if (NS_FAILED(rv)) return;
+ EXPECT_EQ(writeCount, len);
+ total += writeCount;
+
+ if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
+ }
+ if (gTrace) {
+ printf("wrote total of %d bytes\n", total);
+ }
+ rv = out1->Close();
+ if (NS_FAILED(rv)) return;
+
+ thread->Shutdown();
+ receiverThread->Shutdown();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunTests(uint32_t segSize, uint32_t segCount) {
+ nsresult rv;
+ nsCOMPtr<nsIInputStream> in;
+ nsCOMPtr<nsIOutputStream> out;
+ uint32_t bufSize = segSize * segCount;
+ if (gTrace) {
+ printf("Testing New Pipes: segment size %d buffer size %d\n", segSize,
+ bufSize);
+ printf("Testing long writes...\n");
+ }
+ rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
+ EXPECT_TRUE(NS_SUCCEEDED(rv));
+ rv = TestPipe(in, out);
+ EXPECT_TRUE(NS_SUCCEEDED(rv));
+
+ if (gTrace) {
+ printf("Testing short writes...\n");
+ }
+ rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
+ EXPECT_TRUE(NS_SUCCEEDED(rv));
+ rv = TestShortWrites(in, out);
+ EXPECT_TRUE(NS_SUCCEEDED(rv));
+}
+
+TEST(Pipes, Main)
+{
+ RunTests(16, 1);
+ RunTests(4096, 16);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;
+
+// An alternate pipe testing routing that uses NS_ConsumeStream() instead of
+// manual read loop.
+static void TestPipe2(uint32_t aNumBytes,
+ uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
+ nsCOMPtr<nsIInputStream> reader;
+ nsCOMPtr<nsIOutputStream> writer;
+
+ uint32_t maxSize = std::max(aNumBytes, aSegmentSize);
+
+ nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
+ aSegmentSize, maxSize);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> inputData;
+ testing::CreateData(aNumBytes, inputData);
+ testing::WriteAllAndClose(writer, inputData);
+ testing::ConsumeAndValidateStream(reader, inputData);
+}
+
+} // namespace
+
+TEST(Pipes, Blocking_32k)
+{ TestPipe2(32 * 1024); }
+
+TEST(Pipes, Blocking_64k)
+{ TestPipe2(64 * 1024); }
+
+TEST(Pipes, Blocking_128k)
+{ TestPipe2(128 * 1024); }
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+// Utility routine to validate pipe clone before. There are many knobs.
+//
+// aTotalBytes Total number of bytes to write to the pipe.
+// aNumWrites How many separate write calls should be made. Bytes
+// are evenly distributed over these write calls.
+// aNumInitialClones How many clones of the pipe input stream should be
+// made before writing begins.
+// aNumToCloseAfterWrite How many streams should be closed after each write.
+// One stream is always kept open. This verifies that
+// closing one stream does not effect other open
+// streams.
+// aNumToCloneAfterWrite How many clones to create after each write. Occurs
+// after closing any streams. This tests cloning
+// active streams on a pipe that is being written to.
+// aNumStreamToReadPerWrite How many streams to read fully after each write.
+// This tests reading cloned streams at different rates
+// while the pipe is being written to.
+static void TestPipeClone(uint32_t aTotalBytes, uint32_t aNumWrites,
+ uint32_t aNumInitialClones,
+ uint32_t aNumToCloseAfterWrite,
+ uint32_t aNumToCloneAfterWrite,
+ uint32_t aNumStreamsToReadPerWrite,
+ uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
+ nsCOMPtr<nsIInputStream> reader;
+ nsCOMPtr<nsIOutputStream> writer;
+
+ uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);
+
+ // Use async input streams so we can NS_ConsumeStream() the current data
+ // while the pipe is still being written to.
+ nsresult rv =
+ NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
+ maxSize, true, false); // non-blocking - reader, writer
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
+ ASSERT_TRUE(cloneable);
+ ASSERT_TRUE(cloneable->GetCloneable());
+
+ nsTArray<nsCString> outputDataList;
+
+ nsTArray<nsCOMPtr<nsIInputStream>> streamList;
+
+ // first stream is our original reader from the pipe
+ streamList.AppendElement(reader);
+ outputDataList.AppendElement();
+
+ // Clone the initial input stream the specified number of times
+ // before performing any writes.
+ for (uint32_t i = 0; i < aNumInitialClones; ++i) {
+ nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
+ rv = cloneable->Clone(getter_AddRefs(*clone));
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+ ASSERT_TRUE(*clone);
+
+ outputDataList.AppendElement();
+ }
+
+ nsTArray<char> inputData;
+ testing::CreateData(aTotalBytes, inputData);
+
+ const uint32_t bytesPerWrite = ((aTotalBytes - 1) / aNumWrites) + 1;
+ uint32_t offset = 0;
+ uint32_t remaining = aTotalBytes;
+ uint32_t nextStreamToRead = 0;
+
+ while (remaining) {
+ uint32_t numToWrite = std::min(bytesPerWrite, remaining);
+ testing::Write(writer, inputData, offset, numToWrite);
+ offset += numToWrite;
+ remaining -= numToWrite;
+
+ // Close the specified number of streams. This allows us to
+ // test that one closed clone does not break other open clones.
+ for (uint32_t i = 0; i < aNumToCloseAfterWrite && streamList.Length() > 1;
+ ++i) {
+ uint32_t lastIndex = streamList.Length() - 1;
+ streamList[lastIndex]->Close();
+ streamList.RemoveElementAt(lastIndex);
+ outputDataList.RemoveElementAt(lastIndex);
+
+ if (nextStreamToRead >= streamList.Length()) {
+ nextStreamToRead = 0;
+ }
+ }
+
+ // Create the specified number of clones. This lets us verify
+ // that we can create clones in the middle of pipe reading and
+ // writing.
+ for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
+ nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
+ rv = cloneable->Clone(getter_AddRefs(*clone));
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+ ASSERT_TRUE(*clone);
+
+ // Initialize the new output data to make whats been read to data for
+ // the original stream. First stream is always the original stream.
+ nsCString* outputData = outputDataList.AppendElement();
+ *outputData = outputDataList[0];
+ }
+
+ // Read the specified number of streams. This lets us verify that we
+ // can read from the clones at different rates while the pipe is being
+ // written to.
+ for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
+ nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
+ nsCString& outputData = outputDataList[nextStreamToRead];
+
+ // Can't use ConsumeAndValidateStream() here because we're not
+ // guaranteed the exact amount read. It should just be at least
+ // as many as numToWrite.
+ nsAutoCString tmpOutputData;
+ rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
+ ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
+ ASSERT_GE(tmpOutputData.Length(), numToWrite);
+
+ outputData += tmpOutputData;
+
+ nextStreamToRead += 1;
+ if (nextStreamToRead >= streamList.Length()) {
+ // Note: When we wrap around on the streams being read, its possible
+ // we will trigger a segment to be deleted from the pipe. It
+ // would be nice to validate this here, but we don't have any
+ // QI'able interface that would let us check easily.
+
+ nextStreamToRead = 0;
+ }
+ }
+ }
+
+ rv = writer->Close();
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());
+
+ // Finally, read the remaining bytes from each stream. This may be
+ // different amounts of data depending on how much reading we did while
+ // writing. Verify that the end result matches the input data.
+ for (uint32_t i = 0; i < streamList.Length(); ++i) {
+ nsCOMPtr<nsIInputStream>& stream = streamList[i];
+ nsCString& outputData = outputDataList[i];
+
+ nsAutoCString tmpOutputData;
+ rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
+ ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
+ stream->Close();
+
+ // Append to total amount read from the stream
+ outputData += tmpOutputData;
+
+ ASSERT_EQ(inputString.Length(), outputData.Length());
+ ASSERT_TRUE(inputString.Equals(outputData));
+ }
+}
+
+} // namespace
+
+TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
+{
+ TestPipeClone(32 * 1024, // total bytes
+ 16, // num writes
+ 3, // num initial clones
+ 0, // num streams to close after each write
+ 0, // num clones to add after each write
+ 0); // num streams to read after each write
+}
+
+TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
+{
+ // Since this reads all streams on every write, it should trigger the
+ // pipe cursor roll back optimization. Currently we can only verify
+ // this with logging.
+
+ TestPipeClone(32 * 1024, // total bytes
+ 16, // num writes
+ 3, // num initial clones
+ 0, // num streams to close after each write
+ 0, // num clones to add after each write
+ 4); // num streams to read after each write
+}
+
+TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
+{
+ TestPipeClone(32 * 1024, // total bytes
+ 16, // num writes
+ 0, // num initial clones
+ 0, // num streams to close after each write
+ 1, // num clones to add after each write
+ 0); // num streams to read after each write
+}
+
+TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
+{
+ TestPipeClone(32 * 1024, // total bytes
+ 16, // num writes
+ 0, // num initial clones
+ 0, // num streams to close after each write
+ 1, // num clones to add after each write
+ 1); // num streams to read after each write
+}
+
+TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
+{
+ // Since this reads streams faster than we clone new ones, it should
+ // trigger pipe segment deletion periodically. Currently we can
+ // only verify this with logging.
+
+ TestPipeClone(32 * 1024, // total bytes
+ 16, // num writes
+ 1, // num initial clones
+ 1, // num streams to close after each write
+ 2, // num clones to add after each write
+ 3); // num streams to read after each write
+}
+
+TEST(Pipes, Write_AsyncWait)
+{
+ nsCOMPtr<nsIAsyncInputStream> reader;
+ nsCOMPtr<nsIAsyncOutputStream> writer;
+
+ const uint32_t segmentSize = 1024;
+ const uint32_t numSegments = 1;
+
+ nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
+ true, true, // non-blocking - reader, writer
+ segmentSize, numSegments);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> inputData;
+ testing::CreateData(segmentSize, inputData);
+
+ uint32_t numWritten = 0;
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
+
+ RefPtr<testing::OutputStreamCallback> cb =
+ new testing::OutputStreamCallback();
+
+ rv = writer->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_FALSE(cb->Called());
+
+ testing::ConsumeAndValidateStream(reader, inputData);
+
+ ASSERT_TRUE(cb->Called());
+}
+
+TEST(Pipes, Write_AsyncWait_Clone)
+{
+ nsCOMPtr<nsIAsyncInputStream> reader;
+ nsCOMPtr<nsIAsyncOutputStream> writer;
+
+ const uint32_t segmentSize = 1024;
+ const uint32_t numSegments = 1;
+
+ nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
+ true, true, // non-blocking - reader, writer
+ segmentSize, numSegments);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsCOMPtr<nsIInputStream> clone;
+ rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> inputData;
+ testing::CreateData(segmentSize, inputData);
+
+ uint32_t numWritten = 0;
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ // This attempts to write data beyond the original pipe size limit. It
+ // should fail since neither side of the clone has been read yet.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
+
+ RefPtr<testing::OutputStreamCallback> cb =
+ new testing::OutputStreamCallback();
+
+ rv = writer->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_FALSE(cb->Called());
+
+ // Consume data on the original stream, but the clone still has not been read.
+ testing::ConsumeAndValidateStream(reader, inputData);
+
+ // A clone that is not being read should not stall the other input stream
+ // reader. Therefore the writer callback should trigger when the fastest
+ // reader drains the other input stream.
+ ASSERT_TRUE(cb->Called());
+
+ // Attempt to write data. This will buffer data beyond the pipe size limit in
+ // order for the clone stream to still work. This is allowed because the
+ // other input stream has drained its buffered segments and is ready for more
+ // data.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ // Again, this should fail since the origin stream has not been read again.
+ // The pipe size should still restrict how far ahead we can buffer even
+ // when there is a cloned stream not being read.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_FAILED(rv));
+
+ cb = new testing::OutputStreamCallback();
+ rv = writer->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ // The write should again be blocked since we have written data and the
+ // main reader is at its maximum advance buffer.
+ ASSERT_FALSE(cb->Called());
+
+ nsTArray<char> expectedCloneData;
+ expectedCloneData.AppendElements(inputData);
+ expectedCloneData.AppendElements(inputData);
+
+ // We should now be able to consume the entire backlog of buffered data on
+ // the cloned stream.
+ testing::ConsumeAndValidateStream(clone, expectedCloneData);
+
+ // Draining the clone side should also trigger the AsyncWait() writer
+ // callback
+ ASSERT_TRUE(cb->Called());
+
+ // Finally, we should be able to consume the remaining data on the original
+ // reader.
+ testing::ConsumeAndValidateStream(reader, inputData);
+}
+
+TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
+{
+ nsCOMPtr<nsIAsyncInputStream> reader;
+ nsCOMPtr<nsIAsyncOutputStream> writer;
+
+ const uint32_t segmentSize = 1024;
+ const uint32_t numSegments = 1;
+
+ nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
+ true, true, // non-blocking - reader, writer
+ segmentSize, numSegments);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsCOMPtr<nsIInputStream> clone;
+ rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> inputData;
+ testing::CreateData(segmentSize, inputData);
+
+ uint32_t numWritten = 0;
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ // This attempts to write data beyond the original pipe size limit. It
+ // should fail since neither side of the clone has been read yet.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
+
+ RefPtr<testing::OutputStreamCallback> cb =
+ new testing::OutputStreamCallback();
+
+ rv = writer->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_FALSE(cb->Called());
+
+ // Consume data on the original stream, but the clone still has not been read.
+ testing::ConsumeAndValidateStream(reader, inputData);
+
+ // A clone that is not being read should not stall the other input stream
+ // reader. Therefore the writer callback should trigger when the fastest
+ // reader drains the other input stream.
+ ASSERT_TRUE(cb->Called());
+
+ // Attempt to write data. This will buffer data beyond the pipe size limit in
+ // order for the clone stream to still work. This is allowed because the
+ // other input stream has drained its buffered segments and is ready for more
+ // data.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ // Again, this should fail since the origin stream has not been read again.
+ // The pipe size should still restrict how far ahead we can buffer even
+ // when there is a cloned stream not being read.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_FAILED(rv));
+
+ cb = new testing::OutputStreamCallback();
+ rv = writer->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ // The write should again be blocked since we have written data and the
+ // main reader is at its maximum advance buffer.
+ ASSERT_FALSE(cb->Called());
+
+ // Close the original reader input stream. This was the fastest reader,
+ // so we should have a single stream that is buffered beyond our nominal
+ // limit.
+ reader->Close();
+
+ // Because the clone stream is still buffered the writable callback should
+ // not be fired.
+ ASSERT_FALSE(cb->Called());
+
+ // And we should not be able to perform a write.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_FAILED(rv));
+
+ // Create another clone stream. Now we have two streams that exceed our
+ // maximum size limit
+ nsCOMPtr<nsIInputStream> clone2;
+ rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> expectedCloneData;
+ expectedCloneData.AppendElements(inputData);
+ expectedCloneData.AppendElements(inputData);
+
+ // We should now be able to consume the entire backlog of buffered data on
+ // the cloned stream.
+ testing::ConsumeAndValidateStream(clone, expectedCloneData);
+
+ // The pipe should now be writable because we have two open streams, one of
+ // which is completely drained.
+ ASSERT_TRUE(cb->Called());
+
+ // Write again to reach our limit again.
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ // The stream is again non-writeable.
+ cb = new testing::OutputStreamCallback();
+ rv = writer->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+ ASSERT_FALSE(cb->Called());
+
+ // Close the empty stream. This is different from our previous close since
+ // before we were closing a stream with some data still buffered.
+ clone->Close();
+
+ // The pipe should not be writable. The second clone is still fully buffered
+ // over our limit.
+ ASSERT_FALSE(cb->Called());
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_FAILED(rv));
+
+ // Finally consume all of the buffered data on the second clone.
+ expectedCloneData.AppendElements(inputData);
+ testing::ConsumeAndValidateStream(clone2, expectedCloneData);
+
+ // Draining the final clone should make the pipe writable again.
+ ASSERT_TRUE(cb->Called());
+}
+
+TEST(Pipes, Read_AsyncWait)
+{
+ nsCOMPtr<nsIAsyncInputStream> reader;
+ nsCOMPtr<nsIAsyncOutputStream> writer;
+
+ const uint32_t segmentSize = 1024;
+ const uint32_t numSegments = 1;
+
+ nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
+ true, true, // non-blocking - reader, writer
+ segmentSize, numSegments);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> inputData;
+ testing::CreateData(segmentSize, inputData);
+
+ RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
+
+ rv = reader->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_FALSE(cb->Called());
+
+ uint32_t numWritten = 0;
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_TRUE(cb->Called());
+
+ testing::ConsumeAndValidateStream(reader, inputData);
+}
+
+TEST(Pipes, Read_AsyncWait_Clone)
+{
+ nsCOMPtr<nsIAsyncInputStream> reader;
+ nsCOMPtr<nsIAsyncOutputStream> writer;
+
+ const uint32_t segmentSize = 1024;
+ const uint32_t numSegments = 1;
+
+ nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
+ true, true, // non-blocking - reader, writer
+ segmentSize, numSegments);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsCOMPtr<nsIInputStream> clone;
+ rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
+ ASSERT_TRUE(asyncClone);
+
+ nsTArray<char> inputData;
+ testing::CreateData(segmentSize, inputData);
+
+ RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
+
+ RefPtr<testing::InputStreamCallback> cb2 = new testing::InputStreamCallback();
+
+ rv = reader->AsyncWait(cb, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_FALSE(cb->Called());
+
+ rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_FALSE(cb2->Called());
+
+ uint32_t numWritten = 0;
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ ASSERT_TRUE(cb->Called());
+ ASSERT_TRUE(cb2->Called());
+
+ testing::ConsumeAndValidateStream(reader, inputData);
+}
+
+namespace {
+
+nsresult CloseDuringReadFunc(nsIInputStream* aReader, void* aClosure,
+ const char* aFromSegment, uint32_t aToOffset,
+ uint32_t aCount, uint32_t* aWriteCountOut) {
+ MOZ_RELEASE_ASSERT(aReader);
+ MOZ_RELEASE_ASSERT(aClosure);
+ MOZ_RELEASE_ASSERT(aFromSegment);
+ MOZ_RELEASE_ASSERT(aWriteCountOut);
+ MOZ_RELEASE_ASSERT(aToOffset == 0);
+
+ // This is insanity and you probably should not do this under normal
+ // conditions. We want to simulate the case where the pipe is closed
+ // (possibly from other end on another thread) simultaneously with the
+ // read. This is the easiest way to do trigger this case in a synchronous
+ // gtest.
+ MOZ_ALWAYS_SUCCEEDS(aReader->Close());
+
+ nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
+ buffer->AppendElements(aFromSegment, aCount);
+
+ *aWriteCountOut = aCount;
+
+ return NS_OK;
+}
+
+void TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize) {
+ nsCOMPtr<nsIInputStream> reader;
+ nsCOMPtr<nsIOutputStream> writer;
+
+ const uint32_t maxSize = aSegmentSize;
+
+ nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
+ aSegmentSize, maxSize);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> inputData;
+
+ testing::CreateData(aDataSize, inputData);
+
+ uint32_t numWritten = 0;
+ rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsTArray<char> outputData;
+
+ uint32_t numRead = 0;
+ rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
+ inputData.Length(), &numRead);
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+ ASSERT_EQ(inputData.Length(), numRead);
+
+ ASSERT_EQ(inputData, outputData);
+
+ uint64_t available;
+ rv = reader->Available(&available);
+ ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
+}
+
+} // namespace
+
+TEST(Pipes, Close_During_Read_Partial_Segment)
+{ TestCloseDuringRead(1024, 512); }
+
+TEST(Pipes, Close_During_Read_Full_Segment)
+{ TestCloseDuringRead(1024, 1024); }
+
+TEST(Pipes, Interfaces)
+{
+ nsCOMPtr<nsIInputStream> reader;
+ nsCOMPtr<nsIOutputStream> writer;
+
+ nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));
+ ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+ nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
+ ASSERT_TRUE(readerType1);
+
+ nsCOMPtr<nsITellableStream> readerType2 = do_QueryInterface(reader);
+ ASSERT_TRUE(readerType2);
+
+ nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
+ ASSERT_TRUE(readerType3);
+
+ nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
+ ASSERT_TRUE(readerType4);
+
+ nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
+ ASSERT_TRUE(readerType5);
+
+ nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
+ ASSERT_TRUE(readerType6);
+}