/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include "gtest/gtest.h"
|
|
#include "mozilla/gtest/MozAssertions.h"
|
|
#include "mozilla/ipc/DataPipe.h"
|
|
#include "mozilla/SpinEventLoopUntil.h"
|
|
#include "nsIAsyncInputStream.h"
|
|
#include "nsComponentManagerUtils.h"
|
|
#include "nsIAsyncOutputStream.h"
|
|
#include "nsIInputStream.h"
|
|
#include "nsIMultiplexInputStream.h"
|
|
#include "nsIPipe.h"
|
|
#include "nsISeekableStream.h"
|
|
#include "nsStreamUtils.h"
|
|
#include "nsThreadUtils.h"
|
|
#include "nsIThread.h"
|
|
#include "Helpers.h"
|
|
|
|
using mozilla::GetCurrentSerialEventTarget;
|
|
using mozilla::SpinEventLoopUntil;
|
|
|
|
TEST(MultiplexInputStream, Seek_SET)
{
  // Contents of the three sub-streams, in the order they are appended. The
  // multiplex stream is expected to expose them as one seekable byte sequence.
  const auto first = "Hello world"_ns;
  const auto second = "The quick brown fox jumped over the lazy dog"_ns;
  const auto third = "Foo bar"_ns;

  nsCOMPtr<nsIInputStream> firstInput;
  nsCOMPtr<nsIInputStream> secondInput;
  nsCOMPtr<nsIInputStream> thirdInput;
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(firstInput), first));
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(secondInput), second));
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(thirdInput), third));

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);
  nsCOMPtr<nsIInputStream> reader = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(reader);

  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(firstInput));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(secondInput));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(thirdInput));

  nsCOMPtr<nsISeekableStream> seeker = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(seeker);

  // Sizes used by the expectations below.
  const uint64_t totalLen = static_cast<uint64_t>(first.Length()) +
                            second.Length() + third.Length();
  const uint32_t thirdStart = first.Length() + second.Length();

  int64_t pos = 0;
  uint64_t avail = 0;
  uint32_t got = 0;
  char chunk[4096];

  // Absolute seek to offset 1, which lies inside the first sub-stream.
  ASSERT_NS_SUCCEEDED(seeker->Seek(nsISeekableStream::NS_SEEK_SET, 1));
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(totalLen - 1, avail);
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 1);

  // Three bytes from offset 1 are "ell"; the position moves on to 4.
  ASSERT_NS_SUCCEEDED(reader->Read(chunk, 3, &got));
  ASSERT_EQ(3u, got);
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(totalLen - 4, avail);
  ASSERT_EQ(0, strncmp(chunk, "ell", got));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 4);

  // Absolute seek to the first byte of the third sub-stream.
  ASSERT_NS_SUCCEEDED(
      seeker->Seek(nsISeekableStream::NS_SEEK_SET, thirdStart));
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(static_cast<uint64_t>(third.Length()), avail);
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, static_cast<int64_t>(thirdStart));

  // Five bytes from there are "Foo b".
  ASSERT_NS_SUCCEEDED(reader->Read(chunk, 5, &got));
  ASSERT_EQ(5u, got);
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(static_cast<uint64_t>(third.Length()) - 5, avail);
  ASSERT_EQ(0, strncmp(chunk, "Foo b", got));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, static_cast<int64_t>(thirdStart + 5));

  // Seek backwards to the start of the second sub-stream (covers bug 1272371).
  ASSERT_NS_SUCCEEDED(
      seeker->Seek(nsISeekableStream::NS_SEEK_SET, first.Length()));
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(static_cast<uint64_t>(second.Length()) + third.Length(), avail);
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, static_cast<int64_t>(first.Length()));

  // Six bytes from there are "The qu".
  ASSERT_NS_SUCCEEDED(reader->Read(chunk, 6, &got));
  ASSERT_EQ(6u, got);
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(static_cast<uint64_t>(second.Length()) - 6 + third.Length(),
            avail);
  ASSERT_EQ(0, strncmp(chunk, "The qu", got));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, static_cast<int64_t>(first.Length() + 6));
}
|
|
|
|
TEST(MultiplexInputStream, Seek_CUR)
{
  // Contents of the three sub-streams, in the order they are appended.
  const auto first = "Hello world"_ns;
  const auto second = "The quick brown fox jumped over the lazy dog"_ns;
  const auto third = "Foo bar"_ns;

  nsCOMPtr<nsIInputStream> firstInput;
  nsCOMPtr<nsIInputStream> secondInput;
  nsCOMPtr<nsIInputStream> thirdInput;
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(firstInput), first));
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(secondInput), second));
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(thirdInput), third));

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);
  nsCOMPtr<nsIInputStream> reader = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(reader);

  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(firstInput));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(secondInput));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(thirdInput));

  nsCOMPtr<nsISeekableStream> seeker = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(seeker);

  const uint64_t totalLen = static_cast<uint64_t>(first.Length()) +
                            second.Length() + third.Length();

  int64_t pos = 0;
  uint64_t avail = 0;
  uint32_t got = 0;
  char chunk[4096];

  // Relative seek of +1 from the start: still inside the first sub-stream.
  ASSERT_NS_SUCCEEDED(seeker->Seek(nsISeekableStream::NS_SEEK_CUR, 1));
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(totalLen - 1, avail);
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 1);

  // Three bytes from offset 1 are "ell"; the position moves on to 4.
  ASSERT_NS_SUCCEEDED(reader->Read(chunk, 3, &got));
  ASSERT_EQ(3u, got);
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(totalLen - 4, avail);
  ASSERT_EQ(0, strncmp(chunk, "ell", got));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 4);

  // +11 from offset 4 crosses into the second sub-stream (offset 15).
  ASSERT_NS_SUCCEEDED(seeker->Seek(nsISeekableStream::NS_SEEK_CUR, 11));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 15);
  ASSERT_NS_SUCCEEDED(reader->Read(chunk, 3, &got));
  ASSERT_EQ(3u, got);
  ASSERT_EQ(0, strncmp(chunk, "qui", got));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 18);

  // -9 from offset 18 goes back into the first sub-stream (offset 9). The
  // following read spans the boundary: "ld" from the first buffer, then "T".
  ASSERT_NS_SUCCEEDED(seeker->Seek(nsISeekableStream::NS_SEEK_CUR, -9));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 9);
  ASSERT_NS_SUCCEEDED(reader->Read(chunk, 3, &got));
  ASSERT_EQ(3u, got);
  ASSERT_EQ(0, strncmp(chunk, "ldT", got));
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 12);
}
|
|
|
|
TEST(MultiplexInputStream, Seek_END)
{
  // Contents of the three sub-streams, in the order they are appended.
  const auto first = "Hello world"_ns;
  const auto second = "The quick brown fox jumped over the lazy dog"_ns;
  const auto third = "Foo bar"_ns;

  nsCOMPtr<nsIInputStream> firstInput;
  nsCOMPtr<nsIInputStream> secondInput;
  nsCOMPtr<nsIInputStream> thirdInput;
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(firstInput), first));
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(secondInput), second));
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(thirdInput), third));

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);
  nsCOMPtr<nsIInputStream> reader = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(reader);

  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(firstInput));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(secondInput));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(thirdInput));

  nsCOMPtr<nsISeekableStream> seeker = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(seeker);

  // Combined length of all three buffers.
  const uint32_t totalLen = first.Length() + second.Length() + third.Length();

  int64_t pos = 0;
  uint64_t avail = 0;

  // NS_SEEK_END only accepts offsets <= 0, so a positive offset must fail.
  ASSERT_NS_FAILED(seeker->Seek(nsISeekableStream::NS_SEEK_END, 1));

  // Offset 0 from the end: nothing left to read, position == total length.
  ASSERT_NS_SUCCEEDED(seeker->Seek(nsISeekableStream::NS_SEEK_END, 0));
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(uint64_t{0}, avail);
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, static_cast<int64_t>(totalLen));

  // One byte before the end.
  ASSERT_NS_SUCCEEDED(seeker->Seek(nsISeekableStream::NS_SEEK_END, -1));
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(uint64_t{1}, avail);
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, static_cast<int64_t>(totalLen - 1));

  // Almost at the beginning: (1 - total) from the end lands on offset 1.
  int64_t nearStart = 1;
  nearStart -= static_cast<int64_t>(totalLen);
  ASSERT_NS_SUCCEEDED(seeker->Seek(nsISeekableStream::NS_SEEK_END, nearStart));
  ASSERT_NS_SUCCEEDED(reader->Available(&avail));
  ASSERT_EQ(avail, totalLen - 1);
  ASSERT_NS_SUCCEEDED(seeker->Tell(&pos));
  ASSERT_EQ(pos, 1);
}
|
|
|
|
// Builds a multiplex input stream over three testing::AsyncStringStream
// sub-streams holding "Hello", " " and "World!", appended in that order, and
// returns it as an nsIInputStream.
//
// If the multiplex stream cannot be instantiated, or a sub-stream cannot be
// appended, a non-fatal gtest failure is recorded and null is returned, so
// callers fail on their own null checks rather than crashing on a null
// dereference. (ASSERT_* cannot be used here because this function does not
// return void.)
static already_AddRefed<nsIInputStream> CreateStreamHelper() {
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  EXPECT_TRUE(multiplexStream);
  if (!multiplexStream) {
    return nullptr;
  }

  static const char* const kParts[] = {"Hello", " ", "World!"};
  for (const char* part : kParts) {
    nsCString buf;
    buf.Assign(part);

    nsCOMPtr<nsIInputStream> inputStream = new testing::AsyncStringStream(buf);
    nsresult rv = multiplexStream->AppendStream(inputStream);
    EXPECT_NS_SUCCEEDED(rv);
    if (NS_FAILED(rv)) {
      return nullptr;
    }
  }

  nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream));
  return stream.forget();
}
|
|
|
|
// AsyncWait - without EventTarget
|
|
TEST(MultiplexInputStream, AsyncWait_withoutEventTarget)
{
  nsCOMPtr<nsIInputStream> input = CreateStreamHelper();
  nsCOMPtr<nsIAsyncInputStream> asyncInput = do_QueryInterface(input);
  ASSERT_TRUE(!!asyncInput);

  RefPtr<testing::InputStreamCallback> callback(
      new testing::InputStreamCallback());

  // With no event target, the test expects the callback to have run by the
  // time AsyncWait() returns.
  const nsresult rv = asyncInput->AsyncWait(callback, 0, 0, nullptr);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_TRUE(callback->Called());
}
|
|
|
|
// AsyncWait - with EventTarget
|
|
TEST(MultiplexInputStream, AsyncWait_withEventTarget)
{
  nsCOMPtr<nsIInputStream> input = CreateStreamHelper();
  nsCOMPtr<nsIAsyncInputStream> asyncInput = do_QueryInterface(input);
  ASSERT_TRUE(!!asyncInput);

  RefPtr<testing::InputStreamCallback> callback(
      new testing::InputStreamCallback());
  nsCOMPtr<nsIThread> currentThread = do_GetCurrentThread();

  // With an event target, the notification is dispatched rather than run
  // inline, so the callback must not have fired yet when AsyncWait() returns.
  const nsresult rv = asyncInput->AsyncWait(callback, 0, 0, currentThread);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_FALSE(callback->Called());

  // Spin the event loop until the dispatched notification arrives.
  MOZ_ALWAYS_TRUE(SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, AsyncWait_withEventTarget)"_ns,
      [&callback] { return callback->Called(); }));
  ASSERT_TRUE(callback->Called());
}
|
|
|
|
// AsyncWait - without EventTarget - closureOnly
|
|
TEST(MultiplexInputStream, AsyncWait_withoutEventTarget_closureOnly)
{
  nsCOMPtr<nsIInputStream> input = CreateStreamHelper();
  nsCOMPtr<nsIAsyncInputStream> asyncInput = do_QueryInterface(input);
  ASSERT_TRUE(!!asyncInput);

  RefPtr<testing::InputStreamCallback> callback(
      new testing::InputStreamCallback());

  // A closure-only wait must stay silent while the stream is still open.
  const nsresult rv = asyncInput->AsyncWait(
      callback, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_FALSE(callback->Called());

  // Closing the stream fires the callback; with no event target the test
  // expects that to have happened by the time CloseWithStatus() returns.
  asyncInput->CloseWithStatus(NS_ERROR_FAILURE);
  ASSERT_TRUE(callback->Called());
}
|
|
|
|
// AsyncWait - with EventTarget - closureOnly
|
|
TEST(MultiplexInputStream, AsyncWait_withEventTarget_closureOnly)
{
  nsCOMPtr<nsIInputStream> input = CreateStreamHelper();
  nsCOMPtr<nsIAsyncInputStream> asyncInput = do_QueryInterface(input);
  ASSERT_TRUE(!!asyncInput);

  RefPtr<testing::InputStreamCallback> callback(
      new testing::InputStreamCallback());
  nsCOMPtr<nsIThread> currentThread = do_GetCurrentThread();

  // A closure-only wait must stay silent while the stream is still open.
  const nsresult rv = asyncInput->AsyncWait(
      callback, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, currentThread);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_FALSE(callback->Called());

  // Closing only queues the notification on the event target, so the
  // callback is still pending right after CloseWithStatus() returns.
  asyncInput->CloseWithStatus(NS_ERROR_FAILURE);
  ASSERT_FALSE(callback->Called());

  // Spin the event loop until the dispatched notification arrives.
  MOZ_ALWAYS_TRUE(SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, AsyncWait_withEventTarget_closureOnly)"_ns,
      [&callback] { return callback->Called(); }));
  ASSERT_TRUE(callback->Called());
}
|
|
|
|
class ClosedStream final : public nsIInputStream {
|
|
public:
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
|
|
ClosedStream() = default;
|
|
|
|
NS_IMETHOD
|
|
Available(uint64_t* aLength) override { return NS_BASE_STREAM_CLOSED; }
|
|
|
|
NS_IMETHOD
|
|
StreamStatus() override { return NS_BASE_STREAM_CLOSED; }
|
|
|
|
NS_IMETHOD
|
|
Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
|
|
MOZ_CRASH("This should not be called!");
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
|
|
uint32_t* aResult) override {
|
|
return NS_ERROR_NOT_IMPLEMENTED;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
Close() override { return NS_OK; }
|
|
|
|
NS_IMETHOD
|
|
IsNonBlocking(bool* aNonBlocking) override {
|
|
*aNonBlocking = true;
|
|
return NS_OK;
|
|
}
|
|
|
|
private:
|
|
~ClosedStream() = default;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(ClosedStream, nsIInputStream)
|
|
|
|
class AsyncStream final : public nsIAsyncInputStream {
|
|
public:
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
|
|
explicit AsyncStream(int64_t aSize) : mState(eBlocked), mSize(aSize) {}
|
|
|
|
void Unblock() { mState = eUnblocked; }
|
|
|
|
NS_IMETHOD
|
|
Available(uint64_t* aLength) override {
|
|
*aLength = mState == eBlocked ? 0 : mSize;
|
|
return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
StreamStatus() override {
|
|
return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
|
|
MOZ_CRASH("This should not be called!");
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
|
|
uint32_t* aResult) override {
|
|
return NS_ERROR_NOT_IMPLEMENTED;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
Close() override {
|
|
mState = eClosed;
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
IsNonBlocking(bool* aNonBlocking) override {
|
|
*aNonBlocking = true;
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
|
|
uint32_t aRequestedCount, nsIEventTarget* aEventTarget) override {
|
|
MOZ_CRASH("This should not be called!");
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
CloseWithStatus(nsresult aStatus) override { return NS_OK; }
|
|
|
|
private:
|
|
~AsyncStream() = default;
|
|
|
|
enum { eBlocked, eUnblocked, eClosed } mState;
|
|
|
|
uint64_t mSize;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(AsyncStream, nsIInputStream, nsIAsyncInputStream)
|
|
|
|
class BlockingStream final : public nsIInputStream {
|
|
public:
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
|
|
BlockingStream() = default;
|
|
|
|
NS_IMETHOD
|
|
Available(uint64_t* aLength) override { return NS_BASE_STREAM_CLOSED; }
|
|
|
|
NS_IMETHOD
|
|
StreamStatus() override { return NS_BASE_STREAM_CLOSED; }
|
|
|
|
NS_IMETHOD
|
|
Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
|
|
// We are actually empty.
|
|
*aReadCount = 0;
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
|
|
uint32_t* aResult) override {
|
|
return NS_ERROR_NOT_IMPLEMENTED;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
Close() override { return NS_OK; }
|
|
|
|
NS_IMETHOD
|
|
IsNonBlocking(bool* aNonBlocking) override {
|
|
*aNonBlocking = false;
|
|
return NS_OK;
|
|
}
|
|
|
|
private:
|
|
~BlockingStream() = default;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(BlockingStream, nsIInputStream)
|
|
|
|
TEST(MultiplexInputStream, Available)
{
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);

  nsCOMPtr<nsIInputStream> s = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!s);

  // With no sub-streams the multiplex stream is not (yet) async.
  nsCOMPtr<nsIAsyncInputStream> as = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!as);

  uint64_t length = 0;

  // The stream returns NS_BASE_STREAM_CLOSED if there are no substreams.
  nsresult rv = s->Available(&length);
  ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);

  rv = multiplexStream->AppendStream(new ClosedStream());
  ASSERT_EQ(NS_OK, rv);

  // Single source of truth for the async sub-stream's size: it is used both
  // to construct the stream and in the expectation further down.
  const uint64_t asyncSize = 2;
  RefPtr<AsyncStream> asyncStream = new AsyncStream(asyncSize);
  rv = multiplexStream->AppendStream(asyncStream);
  ASSERT_EQ(NS_OK, rv);

  nsCString buffer;
  buffer.Assign("World!!!");

  nsCOMPtr<nsIInputStream> stringStream;
  rv = NS_NewCStringInputStream(getter_AddRefs(stringStream), buffer);
  ASSERT_EQ(NS_OK, rv);

  rv = multiplexStream->AppendStream(stringStream);
  ASSERT_EQ(NS_OK, rv);

  // Now we are async.
  as = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!as);

  // Available should skip the closed stream and return 0 because the
  // asyncStream returns 0 and it's async.
  rv = s->Available(&length);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_EQ((uint64_t)0, length);

  asyncStream->Unblock();

  // Now we should return only the size of the async stream because we don't
  // know when this is completed.
  rv = s->Available(&length);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_EQ(asyncSize, length);

  asyncStream->Close();

  // With the async stream closed, only the trailing string stream counts.
  rv = s->Available(&length);
  ASSERT_EQ(NS_OK, rv);
  ASSERT_EQ(buffer.Length(), length);
}
|
|
|
|
class NonBufferableStringStream final : public nsIInputStream {
|
|
nsCOMPtr<nsIInputStream> mStream;
|
|
|
|
public:
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
|
|
explicit NonBufferableStringStream(const nsACString& aBuffer) {
|
|
NS_NewCStringInputStream(getter_AddRefs(mStream), aBuffer);
|
|
}
|
|
|
|
NS_IMETHOD
|
|
Available(uint64_t* aLength) override { return mStream->Available(aLength); }
|
|
|
|
NS_IMETHOD
|
|
StreamStatus() override { return mStream->StreamStatus(); }
|
|
|
|
NS_IMETHOD
|
|
Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
|
|
return mStream->Read(aBuffer, aCount, aReadCount);
|
|
}
|
|
|
|
NS_IMETHOD
|
|
ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
|
|
uint32_t* aResult) override {
|
|
return NS_ERROR_NOT_IMPLEMENTED;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
Close() override { return mStream->Close(); }
|
|
|
|
NS_IMETHOD
|
|
IsNonBlocking(bool* aNonBlocking) override {
|
|
return mStream->IsNonBlocking(aNonBlocking);
|
|
}
|
|
|
|
private:
|
|
~NonBufferableStringStream() = default;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(NonBufferableStringStream, nsIInputStream)
|
|
|
|
TEST(MultiplexInputStream, Bufferable)
{
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");

  nsCOMPtr<nsIInputStream> s = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!s);

  // First sub-stream: a plain string stream.
  nsCString hello;
  hello.AssignLiteral("Hello ");
  nsCOMPtr<nsIInputStream> helloStream;
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(helloStream), hello));

  // Second sub-stream: one whose ReadSegments() is not implemented.
  nsCString world;
  world.AssignLiteral("world");
  nsCOMPtr<nsIInputStream> worldStream = new NonBufferableStringStream(world);

  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(helloStream));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(worldStream));

  nsCOMPtr<nsIInputStream> stream(do_QueryInterface(multiplexStream));
  ASSERT_TRUE(!!stream);

  // ReadSegments() on the multiplex stream must still deliver both parts.
  char output[1024];
  uint32_t copied = 0;
  ASSERT_NS_SUCCEEDED(stream->ReadSegments(NS_CopySegmentToBuffer, output,
                                           sizeof(output), &copied));

  ASSERT_EQ(copied, hello.Length() + world.Length());
  ASSERT_TRUE(!strncmp(output, "Hello world", copied));
}
|
|
|
|
TEST(MultiplexInputStream, QILengthInputStream)
{
  nsCString buf;
  buf.AssignLiteral("Hello world");

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  // Fail the test cleanly instead of dereferencing a null stream below.
  ASSERT_TRUE(multiplexStream);

  // nsMultiplexInputStream doesn't expose nsIInputStreamLength if there are
  // no nsIInputStreamLength sub streams.
  {
    nsCOMPtr<nsIInputStream> inputStream;
    nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf);
    ASSERT_NS_SUCCEEDED(rv);

    rv = multiplexStream->AppendStream(inputStream);
    ASSERT_NS_SUCCEEDED(rv);

    nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!fsis);

    nsCOMPtr<nsIAsyncInputStreamLength> afsis =
        do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!afsis);
  }

  // nsMultiplexInputStream exposes nsIInputStreamLength if there is one or
  // more nsIInputStreamLength sub streams.
  {
    RefPtr<testing::LengthInputStream> inputStream =
        new testing::LengthInputStream(buf, true, false);

    nsresult rv = multiplexStream->AppendStream(inputStream);
    ASSERT_NS_SUCCEEDED(rv);

    nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!!fsis);

    nsCOMPtr<nsIAsyncInputStreamLength> afsis =
        do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!afsis);
  }

  // nsMultiplexInputStream exposes nsIAsyncInputStreamLength if there is one
  // or more nsIAsyncInputStreamLength sub streams.
  {
    RefPtr<testing::LengthInputStream> inputStream =
        new testing::LengthInputStream(buf, true, true);

    nsresult rv = multiplexStream->AppendStream(inputStream);
    ASSERT_NS_SUCCEEDED(rv);

    nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!!fsis);

    nsCOMPtr<nsIAsyncInputStreamLength> afsis =
        do_QueryInterface(multiplexStream);
    ASSERT_TRUE(!!afsis);
  }
}
|
|
|
|
TEST(MultiplexInputStream, LengthInputStream)
{
  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  // Fail the test cleanly instead of dereferencing a null stream below.
  ASSERT_TRUE(multiplexStream);

  // First stream is a simple one.
  nsCString buf;
  buf.AssignLiteral("Hello world");

  nsCOMPtr<nsIInputStream> inputStream;
  nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf);
  ASSERT_NS_SUCCEEDED(rv);

  rv = multiplexStream->AppendStream(inputStream);
  ASSERT_NS_SUCCEEDED(rv);

  // A LengthInputStream, non-async.
  RefPtr<testing::LengthInputStream> lengthStream =
      new testing::LengthInputStream(buf, true, false);

  rv = multiplexStream->AppendStream(lengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!fsis);

  // Size is the sum of the 2 streams.
  int64_t length = 0;
  rv = fsis->Length(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(int64_t(buf.Length() * 2), length);

  // An async LengthInputStream.
  RefPtr<testing::LengthInputStream> asyncLengthStream =
      new testing::LengthInputStream(buf, true, true,
                                     NS_BASE_STREAM_WOULD_BLOCK);

  rv = multiplexStream->AppendStream(asyncLengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIAsyncInputStreamLength> afsis =
      do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!afsis);

  // Now it would block.
  rv = fsis->Length(&length);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  // Let's read the size async.
  RefPtr<testing::LengthCallback> callback = new testing::LengthCallback();
  rv = afsis->AsyncLengthWait(callback, GetCurrentSerialEventTarget());
  ASSERT_EQ(NS_OK, rv);

  MOZ_ALWAYS_TRUE(SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, LengthInputStream) 1"_ns,
      [&]() { return callback->Called(); }));
  // The async result is the sum of all three sub-streams.
  ASSERT_EQ(int64_t(buf.Length() * 3), callback->Size());

  // Now a negative stream
  lengthStream = new testing::LengthInputStream(buf, true, false, NS_OK, true);

  rv = multiplexStream->AppendStream(lengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  // With the negative sub-stream appended, the total is reported as -1.
  rv = fsis->Length(&length);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(-1, length);

  // Another async LengthInputStream.
  asyncLengthStream = new testing::LengthInputStream(
      buf, true, true, NS_BASE_STREAM_WOULD_BLOCK);

  rv = multiplexStream->AppendStream(asyncLengthStream);
  ASSERT_NS_SUCCEEDED(rv);

  afsis = do_QueryInterface(multiplexStream);
  ASSERT_TRUE(!!afsis);

  // Let's read the size async. Two waits are registered back to back; the
  // assertions below expect only the second callback to be notified.
  RefPtr<testing::LengthCallback> callback1 = new testing::LengthCallback();
  rv = afsis->AsyncLengthWait(callback1, GetCurrentSerialEventTarget());
  ASSERT_EQ(NS_OK, rv);

  RefPtr<testing::LengthCallback> callback2 = new testing::LengthCallback();
  rv = afsis->AsyncLengthWait(callback2, GetCurrentSerialEventTarget());
  ASSERT_EQ(NS_OK, rv);

  MOZ_ALWAYS_TRUE(SpinEventLoopUntil(
      "xpcom:TEST(MultiplexInputStream, LengthInputStream) 2"_ns,
      [&]() { return callback2->Called(); }));
  ASSERT_FALSE(callback1->Called());
  ASSERT_TRUE(callback2->Called());
}
|
|
|
|
// Shared body for the ReadWhileWaiting_* tests.
//
// Regression scenario: a sub-stream is read while the "stream is ready"
// notification for it is still in flight. The sub-stream then reports 0 bytes
// available by the time the MultiplexInputStream's callback checks it, and it
// used to be skipped over.
//
// pipeIn  - reading end of a pipe; appended as the first sub-stream.
// pipeOut - writing end of the same pipe; used to feed data to pipeIn.
//
// Uses ASSERT_* macros, so it returns early on the first fatal failure.
void TestMultiplexStreamReadWhileWaiting(nsIAsyncInputStream* pipeIn,
                                         nsIAsyncOutputStream* pipeOut) {
  // Fail cleanly instead of dereferencing null pipe ends below.
  ASSERT_TRUE(pipeIn);
  ASSERT_TRUE(pipeOut);

  nsCOMPtr<nsIThread> mainThread = NS_GetCurrentThread();

  nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
      do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
  ASSERT_TRUE(multiplexStream);
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(pipeIn));

  // Second sub-stream: five bytes ("xxxx" plus an explicit NUL), read back at
  // the very end of the test.
  nsCOMPtr<nsIInputStream> stringStream;
  ASSERT_NS_SUCCEEDED(
      NS_NewCStringInputStream(getter_AddRefs(stringStream), "xxxx\0"_ns));
  ASSERT_NS_SUCCEEDED(multiplexStream->AppendStream(stringStream));

  nsCOMPtr<nsIAsyncInputStream> asyncMultiplex =
      do_QueryInterface(multiplexStream);
  ASSERT_TRUE(asyncMultiplex);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
  ASSERT_NS_SUCCEEDED(asyncMultiplex->AsyncWait(cb, 0, 0, mainThread));
  EXPECT_FALSE(cb->Called());

  // Nothing has been written yet, so processing events must not notify.
  NS_ProcessPendingEvents(mainThread);
  EXPECT_FALSE(cb->Called());

  uint64_t available = 0;
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);

  // Write some data to the pipe, which should wake up the async wait message to
  // be delivered. sizeof() includes the terminating NUL, which is what lets
  // EXPECT_STREQ compare the buffers below.
  char toWrite[] = "1234";
  uint32_t written = 0;
  ASSERT_NS_SUCCEEDED(pipeOut->Write(toWrite, sizeof(toWrite), &written));
  EXPECT_EQ(written, sizeof(toWrite));
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, sizeof(toWrite));

  // Read that data from the stream
  char toRead[sizeof(toWrite)];
  uint32_t read = 0;
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Read(toRead, sizeof(toRead), &read));
  EXPECT_EQ(read, sizeof(toRead));
  EXPECT_STREQ(toRead, toWrite);
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);

  // The multiplex stream will have detected the read and prevented the callback
  // from having been called yet.
  NS_ProcessPendingEvents(mainThread);
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);

  // Write more data and close, then make sure we can read everything else in
  // the stream.
  char toWrite2[] = "56789";
  ASSERT_NS_SUCCEEDED(pipeOut->Write(toWrite2, sizeof(toWrite2), &written));
  EXPECT_EQ(written, sizeof(toWrite2));
  EXPECT_FALSE(cb->Called());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, sizeof(toWrite2));

  ASSERT_NS_SUCCEEDED(pipeOut->Close());
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  // XXX: Theoretically if the multiplex stream could detect it, we could report
  // `sizeof(toWrite2) + 4` because the stream is complete, but there's no way
  // for the multiplex stream to know.
  EXPECT_EQ(available, sizeof(toWrite2));

  // This time the pending notification must be delivered.
  NS_ProcessPendingEvents(mainThread);
  EXPECT_TRUE(cb->Called());

  // Read that final bit of data and make sure we read it.
  char toRead2[sizeof(toWrite2)];
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Read(toRead2, sizeof(toRead2), &read));
  EXPECT_EQ(read, sizeof(toRead2));
  EXPECT_STREQ(toRead2, toWrite2);
  // Only the five bytes of the trailing string stream remain.
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 5u);

  // Read the extra data as well.
  char extraRead[5];
  ASSERT_NS_SUCCEEDED(
      asyncMultiplex->Read(extraRead, sizeof(extraRead), &read));
  EXPECT_EQ(read, sizeof(extraRead));
  EXPECT_STREQ(extraRead, "xxxx");
  ASSERT_NS_SUCCEEDED(asyncMultiplex->Available(&available));
  EXPECT_EQ(available, 0u);
}
|
|
|
|
TEST(MultiplexInputStream, ReadWhileWaiting_nsPipe)
{
  // Run the shared read-while-waiting scenario over a pipe created with
  // NS_NewPipe2().
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;
  NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true, true);

  TestMultiplexStreamReadWhileWaiting(reader, writer);
}
|
|
|
|
TEST(MultiplexInputStream, ReadWhileWaiting_DataPipe)
{
  // Run the shared read-while-waiting scenario over a DataPipe of the default
  // capacity.
  RefPtr<mozilla::ipc::DataPipeReceiver> receiver;
  RefPtr<mozilla::ipc::DataPipeSender> sender;

  const nsresult rv = mozilla::ipc::NewDataPipe(
      mozilla::ipc::kDefaultDataPipeCapacity, getter_AddRefs(sender),
      getter_AddRefs(receiver));
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  TestMultiplexStreamReadWhileWaiting(receiver, sender);
}
|