summaryrefslogtreecommitdiffstats
path: root/netwerk/base/ThrottleQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'netwerk/base/ThrottleQueue.cpp')
-rw-r--r--netwerk/base/ThrottleQueue.cpp410
1 files changed, 410 insertions, 0 deletions
diff --git a/netwerk/base/ThrottleQueue.cpp b/netwerk/base/ThrottleQueue.cpp
new file mode 100644
index 0000000000..4313a6ecb3
--- /dev/null
+++ b/netwerk/base/ThrottleQueue.cpp
@@ -0,0 +1,410 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "ThrottleQueue.h"
+#include "mozilla/net/InputChannelThrottleQueueParent.h"
+#include "nsISeekableStream.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIOService.h"
+#include "nsSocketTransportService2.h"
+#include "nsStreamUtils.h"
+#include "nsNetUtil.h"
+
+namespace mozilla {
+namespace net {
+
+//-----------------------------------------------------------------------------
+
+class ThrottleInputStream final : public nsIAsyncInputStream,
+ public nsISeekableStream {
+ public:
+ ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
+
+ NS_DECL_THREADSAFE_ISUPPORTS
+ NS_DECL_NSIINPUTSTREAM
+ NS_DECL_NSISEEKABLESTREAM
+ NS_DECL_NSITELLABLESTREAM
+ NS_DECL_NSIASYNCINPUTSTREAM
+
+ void AllowInput();
+
+ private:
+ ~ThrottleInputStream();
+
+ nsCOMPtr<nsIInputStream> mStream;
+ RefPtr<ThrottleQueue> mQueue;
+ nsresult mClosedStatus;
+
+ nsCOMPtr<nsIInputStreamCallback> mCallback;
+ nsCOMPtr<nsIEventTarget> mEventTarget;
+};
+
+NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream,
+ nsITellableStream, nsISeekableStream)
+
+ThrottleInputStream::ThrottleInputStream(nsIInputStream* aStream,
+ ThrottleQueue* aQueue)
+ : mStream(aStream), mQueue(aQueue), mClosedStatus(NS_OK) {
+ MOZ_ASSERT(aQueue != nullptr);
+}
+
+ThrottleInputStream::~ThrottleInputStream() { Close(); }
+
+NS_IMETHODIMP
+ThrottleInputStream::Close() {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ if (mQueue) {
+ mQueue->DequeueStream(this);
+ mQueue = nullptr;
+ mClosedStatus = NS_BASE_STREAM_CLOSED;
+ }
+ return mStream->Close();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Available(uint64_t* aResult) {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ return mStream->Available(aResult);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::StreamStatus() {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ return mStream->StreamStatus();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ uint32_t realCount;
+ nsresult rv = mQueue->Available(aCount, &realCount);
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ if (realCount == 0) {
+ return NS_BASE_STREAM_WOULD_BLOCK;
+ }
+
+ rv = mStream->Read(aBuf, realCount, aResult);
+ if (NS_SUCCEEDED(rv) && *aResult > 0) {
+ mQueue->RecordRead(*aResult);
+ }
+ return rv;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
+ uint32_t aCount, uint32_t* aResult) {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ uint32_t realCount;
+ nsresult rv = mQueue->Available(aCount, &realCount);
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+ MOZ_ASSERT(realCount <= aCount);
+
+ if (realCount == 0) {
+ return NS_BASE_STREAM_WOULD_BLOCK;
+ }
+
+ rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
+ if (NS_SUCCEEDED(rv) && *aResult > 0) {
+ mQueue->RecordRead(*aResult);
+ }
+ return rv;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) {
+ *aNonBlocking = true;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+ if (!sstream) {
+ return NS_ERROR_FAILURE;
+ }
+
+ return sstream->Seek(aWhence, aOffset);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Tell(int64_t* aResult) {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ nsCOMPtr<nsITellableStream> sstream = do_QueryInterface(mStream);
+ if (!sstream) {
+ return NS_ERROR_FAILURE;
+ }
+
+ return sstream->Tell(aResult);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::SetEOF() {
+ if (NS_FAILED(mClosedStatus)) {
+ return mClosedStatus;
+ }
+
+ nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+ if (!sstream) {
+ return NS_ERROR_FAILURE;
+ }
+
+ return sstream->SetEOF();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::CloseWithStatus(nsresult aStatus) {
+ if (NS_FAILED(mClosedStatus)) {
+ // Already closed, ignore.
+ return NS_OK;
+ }
+ if (NS_SUCCEEDED(aStatus)) {
+ aStatus = NS_BASE_STREAM_CLOSED;
+ }
+
+ mClosedStatus = Close();
+ if (NS_SUCCEEDED(mClosedStatus)) {
+ mClosedStatus = aStatus;
+ }
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
+ uint32_t aFlags, uint32_t aRequestedCount,
+ nsIEventTarget* aEventTarget) {
+ if (aFlags != 0) {
+ return NS_ERROR_ILLEGAL_VALUE;
+ }
+
+ mCallback = aCallback;
+ mEventTarget = aEventTarget;
+ if (mCallback) {
+ mQueue->QueueStream(this);
+ } else {
+ mQueue->DequeueStream(this);
+ }
+ return NS_OK;
+}
+
+void ThrottleInputStream::AllowInput() {
+ MOZ_ASSERT(mCallback);
+ nsCOMPtr<nsIInputStreamCallback> callbackEvent = NS_NewInputStreamReadyEvent(
+ "ThrottleInputStream::AllowInput", mCallback, mEventTarget);
+ mCallback = nullptr;
+ mEventTarget = nullptr;
+ callbackEvent->OnInputStreamReady(this);
+}
+
+//-----------------------------------------------------------------------------
+
+// static
+already_AddRefed<nsIInputChannelThrottleQueue> ThrottleQueue::Create() {
+ MOZ_ASSERT(XRE_IsParentProcess());
+
+ nsCOMPtr<nsIInputChannelThrottleQueue> tq;
+ if (nsIOService::UseSocketProcess()) {
+ tq = new InputChannelThrottleQueueParent();
+ } else {
+ tq = new ThrottleQueue();
+ }
+
+ return tq.forget();
+}
+
+NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback,
+ nsINamed)
+
+ThrottleQueue::ThrottleQueue()
+
+{
+ nsresult rv;
+ nsCOMPtr<nsIEventTarget> sts;
+ nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
+ if (NS_SUCCEEDED(rv)) {
+ sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
+ }
+ if (NS_SUCCEEDED(rv)) mTimer = NS_NewTimer(sts);
+}
+
+ThrottleQueue::~ThrottleQueue() {
+ if (mTimer && mTimerArmed) {
+ mTimer->Cancel();
+ }
+ mTimer = nullptr;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::RecordRead(uint32_t aBytesRead) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ ThrottleEntry entry;
+ entry.mTime = TimeStamp::Now();
+ entry.mBytesRead = aBytesRead;
+ mReadEvents.AppendElement(entry);
+ mBytesProcessed += aBytesRead;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ TimeStamp now = TimeStamp::Now();
+ TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
+ size_t i;
+
+ // Remove all stale events.
+ for (i = 0; i < mReadEvents.Length(); ++i) {
+ if (mReadEvents[i].mTime >= oneSecondAgo) {
+ break;
+ }
+ }
+ mReadEvents.RemoveElementsAt(0, i);
+
+ uint32_t totalBytes = 0;
+ for (i = 0; i < mReadEvents.Length(); ++i) {
+ totalBytes += mReadEvents[i].mBytesRead;
+ }
+
+ uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
+ double prob = static_cast<double>(rand()) / RAND_MAX;
+ uint32_t thisSliceBytes =
+ mMeanBytesPerSecond - spread + static_cast<uint32_t>(2 * spread * prob);
+
+ if (totalBytes >= thisSliceBytes) {
+ *aAvailable = 0;
+ } else {
+ *aAvailable = std::min(thisSliceBytes, aRemaining);
+ }
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) {
+ // Can be called on any thread.
+ if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 ||
+ aMaxBytesPerSecond < aMeanBytesPerSecond) {
+ return NS_ERROR_ILLEGAL_VALUE;
+ }
+
+ mMeanBytesPerSecond = aMeanBytesPerSecond;
+ mMaxBytesPerSecond = aMaxBytesPerSecond;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::BytesProcessed(uint64_t* aResult) {
+ *aResult = mBytesProcessed;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::WrapStream(nsIInputStream* aInputStream,
+ nsIAsyncInputStream** aResult) {
+ nsCOMPtr<nsIAsyncInputStream> result =
+ new ThrottleInputStream(aInputStream, this);
+ result.forget(aResult);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Notify(nsITimer* aTimer) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ // A notified reader may need to push itself back on the queue.
+ // Swap out the list of readers so that this works properly.
+ nsTArray<RefPtr<ThrottleInputStream>> events = std::move(mAsyncEvents);
+
+ // Optimistically notify all the waiting readers, and then let them
+ // requeue if there isn't enough bandwidth.
+ for (size_t i = 0; i < events.Length(); ++i) {
+ events[i]->AllowInput();
+ }
+
+ mTimerArmed = false;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::GetName(nsACString& aName) {
+ aName.AssignLiteral("net::ThrottleQueue");
+ return NS_OK;
+}
+
+void ThrottleQueue::QueueStream(ThrottleInputStream* aStream) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ if (mAsyncEvents.IndexOf(aStream) ==
+ nsTArray<RefPtr<mozilla::net::ThrottleInputStream>>::NoIndex) {
+ mAsyncEvents.AppendElement(aStream);
+
+ if (!mTimerArmed) {
+ uint32_t ms = 1000;
+ if (mReadEvents.Length() > 0) {
+ TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
+ TimeStamp now = TimeStamp::Now();
+
+ if (t > now) {
+ ms = static_cast<uint32_t>((t - now).ToMilliseconds());
+ } else {
+ ms = 1;
+ }
+ }
+
+ if (NS_SUCCEEDED(
+ mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
+ mTimerArmed = true;
+ }
+ }
+ }
+}
+
+void ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ mAsyncEvents.RemoveElement(aStream);
+}
+
+NS_IMETHODIMP
+ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond) {
+ NS_ENSURE_ARG(aMeanBytesPerSecond);
+
+ *aMeanBytesPerSecond = mMeanBytesPerSecond;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond) {
+ NS_ENSURE_ARG(aMaxBytesPerSecond);
+
+ *aMaxBytesPerSecond = mMaxBytesPerSecond;
+ return NS_OK;
+}
+
+} // namespace net
+} // namespace mozilla