diff options
Diffstat (limited to 'dom/media/webcodecs/DecoderTemplate.cpp')
-rw-r--r-- | dom/media/webcodecs/DecoderTemplate.cpp | 102 |
1 files changed, 58 insertions, 44 deletions
diff --git a/dom/media/webcodecs/DecoderTemplate.cpp b/dom/media/webcodecs/DecoderTemplate.cpp index 2fc2471a24..896f83b352 100644 --- a/dom/media/webcodecs/DecoderTemplate.cpp +++ b/dom/media/webcodecs/DecoderTemplate.cpp @@ -85,28 +85,26 @@ DecoderTemplate<DecoderType>::ConfigureMessage::Create( template <typename DecoderType> DecoderTemplate<DecoderType>::DecodeMessage::DecodeMessage( - Id aId, ConfigId aConfigId, UniquePtr<InputTypeInternal>&& aData) + SeqId aSeqId, ConfigId aConfigId, UniquePtr<InputTypeInternal>&& aData) : ControlMessage( - nsPrintfCString("decode #%zu (config #%d)", aId, aConfigId)), - mId(aId), + nsPrintfCString("decode #%zu (config #%d)", aSeqId, aConfigId)), + mSeqId(aSeqId), mData(std::move(aData)) {} -template <typename DecoderType> -DecoderTemplate<DecoderType>::FlushMessage::FlushMessage(Id aId, - ConfigId aConfigId, - Promise* aPromise) - : ControlMessage( - nsPrintfCString("flush #%zu (config #%d)", aId, aConfigId)), - mId(aId), - mPromise(aPromise) {} +static int64_t GenerateUniqueId() { + // This needs to be atomic since this can run on the main thread or worker + // thread. + static std::atomic<int64_t> sNextId = 0; + return ++sNextId; +} template <typename DecoderType> -void DecoderTemplate<DecoderType>::FlushMessage::RejectPromiseIfAny( - const nsresult& aReason) { - if (mPromise) { - mPromise->MaybeReject(aReason); - } -} +DecoderTemplate<DecoderType>::FlushMessage::FlushMessage(SeqId aSeqId, + ConfigId aConfigId) + : ControlMessage( + nsPrintfCString("flush #%zu (config #%d)", aSeqId, aConfigId)), + mSeqId(aSeqId), + mUniqueId(GenerateUniqueId()) {} /* * Below are DecoderTemplate implementation @@ -221,10 +219,16 @@ already_AddRefed<Promise> DecoderTemplate<DecoderType>::Flush( mKeyChunkRequired = true; - mControlMessageQueue.emplace(UniquePtr<ControlMessage>( - new FlushMessage(++mFlushCounter, mLatestConfigureId, p))); - LOG("%s %p enqueues %s", DecoderType::Name.get(), this, - mControlMessageQueue.back()->ToString().get()); + auto msg = UniquePtr<ControlMessage>( + new FlushMessage(++mFlushCounter, mLatestConfigureId)); + const auto flushPromiseId = msg->AsFlushMessage()->mUniqueId; + MOZ_ASSERT(!mPendingFlushPromises.Contains(flushPromiseId)); + mPendingFlushPromises.Insert(flushPromiseId, p); + + mControlMessageQueue.emplace(std::move(msg)); + + LOG("%s %p enqueues %s, with unique id %" PRId64, DecoderType::Name.get(), + this, mControlMessageQueue.back()->ToString().get(), flushPromiseId); ProcessControlMessageQueue(); return p.forget(); } @@ -264,7 +268,7 @@ Result<Ok, nsresult> DecoderTemplate<DecoderType>::ResetInternal( mDecodeCounter = 0; mFlushCounter = 0; - CancelPendingControlMessages(aResult); + CancelPendingControlMessagesAndFlushPromises(aResult); DestroyDecoderAgentIfAny(); if (mDecodeQueueSize > 0) { @@ -390,7 +394,7 @@ void DecoderTemplate<DecoderType>::ProcessControlMessageQueue() { } template <typename DecoderType> -void DecoderTemplate<DecoderType>::CancelPendingControlMessages( +void DecoderTemplate<DecoderType>::CancelPendingControlMessagesAndFlushPromises( const nsresult& aResult) { AssertIsOnOwningThread(); @@ -399,11 +403,6 @@ void DecoderTemplate<DecoderType>::CancelPendingControlMessages( LOG("%s %p cancels current %s", DecoderType::Name.get(), this, mProcessingMessage->ToString().get()); mProcessingMessage->Cancel(); - - if (FlushMessage* flush = mProcessingMessage->AsFlushMessage()) { - flush->RejectPromiseIfAny(aResult); - } - mProcessingMessage.reset(); } @@ -411,14 +410,18 @@ void DecoderTemplate<DecoderType>::CancelPendingControlMessages( while (!mControlMessageQueue.empty()) { LOG("%s %p cancels pending %s", DecoderType::Name.get(), this, mControlMessageQueue.front()->ToString().get()); - MOZ_ASSERT(!mControlMessageQueue.front()->IsProcessing()); - if (FlushMessage* flush = mControlMessageQueue.front()->AsFlushMessage()) { - flush->RejectPromiseIfAny(aResult); - } - mControlMessageQueue.pop(); } + + // If there are pending flush promises, reject them. + mPendingFlushPromises.ForEach( + [&](const int64_t& id, const RefPtr<Promise>& p) { + LOG("%s %p, reject the promise for flush %" PRId64 " (unique id)", + DecoderType::Name.get(), this, id); + p->MaybeReject(aResult); + }); + mPendingFlushPromises.Clear(); } template <typename DecoderType> @@ -565,7 +568,6 @@ MessageProcessedResult DecoderTemplate<DecoderType>::ProcessDecodeMessage( mProcessingMessage.reset(); QueueATask("Error during decode", [self = RefPtr{this}]() MOZ_CAN_RUN_SCRIPT_BOUNDARY { - MOZ_ASSERT(self->mState != CodecState::Closed); self->CloseInternal(NS_ERROR_DOM_ENCODING_NOT_SUPPORTED_ERR); }); return MessageProcessedResult::Processed; @@ -696,6 +698,8 @@ MessageProcessedResult DecoderTemplate<DecoderType>::ProcessFlushMessage( msg->Complete(); + const auto flushPromiseId = msg->mUniqueId; + // If flush failed, it means decoder fails to decode the data // sent before, so we treat it like decode error. We reject // the promise first and then queue a task to close @@ -705,14 +709,15 @@ MessageProcessedResult DecoderTemplate<DecoderType>::ProcessFlushMessage( LOGE("%s %p, DecoderAgent #%d failed to flush: %s", DecoderType::Name.get(), self.get(), id, error.Description().get()); - RefPtr<Promise> promise = msg->TakePromise(); // Reject with an EncodingError instead of the error we got // above. self->QueueATask( "Error during flush runnable", - [self = RefPtr{this}, promise]() MOZ_CAN_RUN_SCRIPT_BOUNDARY { - promise->MaybeReject( - NS_ERROR_DOM_ENCODING_NOT_SUPPORTED_ERR); + [self = RefPtr{this}]() MOZ_CAN_RUN_SCRIPT_BOUNDARY { + // If Reset() was invoked before this task executes, the + // promise in mPendingFlushPromises is handled there. + // Otherwise, the promise is going to be rejected by + // CloseInternal() below. self->mProcessingMessage.reset(); MOZ_ASSERT(self->mState != CodecState::Closed); self->CloseInternal( @@ -733,14 +738,23 @@ MessageProcessedResult DecoderTemplate<DecoderType>::ProcessFlushMessage( msgStr.get()); } - RefPtr<Promise> promise = msg->TakePromise(); self->QueueATask( "Flush: output decoding data task", - [self = RefPtr{self}, promise, data = std::move(data)]() - MOZ_CAN_RUN_SCRIPT_BOUNDARY { - self->OutputDecodedData(std::move(data)); - promise->MaybeResolveWithUndefined(); - }); + [self = RefPtr{self}, data = std::move(data), + flushPromiseId]() MOZ_CAN_RUN_SCRIPT_BOUNDARY { + self->OutputDecodedData(std::move(data)); + // If Reset() was invoked before this task executes, or + // during the output callback above in the execution of this + // task, the promise in mPendingFlushPromises is handled + // there. Otherwise, the promise is resolved here. + if (Maybe<RefPtr<Promise>> p = + self->mPendingFlushPromises.Take(flushPromiseId)) { + LOG("%s %p, resolving the promise for flush %" PRId64 + " (unique id)", + DecoderType::Name.get(), self.get(), flushPromiseId); + p.value()->MaybeResolveWithUndefined(); + } + }); self->mProcessingMessage.reset(); self->ProcessControlMessageQueue(); }) |