Bug 1135424 - Switch the MDSM to a task queue. r=mattwoodrow

This commit is contained in:
Bobby Holley
2015-03-10 09:59:30 -07:00
parent d173d75a25
commit df2a904e12
11 changed files with 201 additions and 394 deletions

View File

@@ -13,7 +13,7 @@
#include <stdint.h>
#include "MediaDecoderStateMachine.h"
#include "MediaDecoderStateMachineScheduler.h"
#include "MediaTimer.h"
#include "AudioSink.h"
#include "nsTArray.h"
#include "MediaDecoder.h"
@@ -201,9 +201,9 @@ MediaDecoderStateMachine::MediaDecoderStateMachine(MediaDecoder* aDecoder,
MediaDecoderReader* aReader,
bool aRealTime) :
mDecoder(aDecoder),
mScheduler(new MediaDecoderStateMachineScheduler(
aDecoder->GetReentrantMonitor(),
&MediaDecoderStateMachine::TimeoutExpired, this, aRealTime)),
mRealTime(aRealTime),
mDispatchedStateMachine(false),
mDelayedScheduler(this),
mState(DECODER_STATE_DECODING_NONE),
mPlayDuration(0),
mStartTime(-1),
@@ -248,6 +248,11 @@ MediaDecoderStateMachine::MediaDecoderStateMachine(MediaDecoder* aDecoder,
MOZ_COUNT_CTOR(MediaDecoderStateMachine);
NS_ASSERTION(NS_IsMainThread(), "Should be on main thread.");
// Set up our task queue.
RefPtr<SharedThreadPool> threadPool(
SharedThreadPool::Get(NS_LITERAL_CSTRING("Media State Machine"), 1));
mTaskQueue = new MediaTaskQueue(threadPool.forget());
static bool sPrefCacheInit = false;
if (!sPrefCacheInit) {
sPrefCacheInit = true;
@@ -387,7 +392,7 @@ void MediaDecoderStateMachine::SendStreamData()
{
MOZ_ASSERT(OnStateMachineThread(), "Should be on state machine thread");
AssertCurrentThreadInMonitor();
MOZ_ASSERT(!mAudioSink, "Should've been stopped in CallRunStateMachine()");
MOZ_ASSERT(!mAudioSink, "Should've been stopped in RunStateMachine()");
DecodedStreamData* stream = mDecoder->GetDecodedStream();
@@ -405,7 +410,7 @@ void MediaDecoderStateMachine::SendStreamData()
mediaStream->AddAudioTrack(audioTrackId, mInfo.mAudio.mRate, 0, audio,
SourceMediaStream::ADDTRACK_QUEUED);
stream->mStream->DispatchWhenNotEnoughBuffered(audioTrackId,
GetStateMachineThread(), GetWakeDecoderRunnable());
TaskQueue(), GetWakeDecoderRunnable());
stream->mNextAudioTime = mStartTime + stream->mInitialTime;
}
if (mInfo.HasVideo()) {
@@ -414,7 +419,7 @@ void MediaDecoderStateMachine::SendStreamData()
mediaStream->AddTrack(videoTrackId, 0, video,
SourceMediaStream::ADDTRACK_QUEUED);
stream->mStream->DispatchWhenNotEnoughBuffered(videoTrackId,
GetStateMachineThread(), GetWakeDecoderRunnable());
TaskQueue(), GetWakeDecoderRunnable());
// TODO: We can't initialize |mNextVideoTime| until |mStartTime|
// is set. This is a good indication that DecodedStreamData is in
@@ -570,7 +575,7 @@ bool MediaDecoderStateMachine::HaveEnoughDecodedAudio(int64_t aAmpleAudioUSecs)
return false;
}
stream->mStream->DispatchWhenNotEnoughBuffered(audioTrackId,
GetStateMachineThread(), GetWakeDecoderRunnable());
TaskQueue(), GetWakeDecoderRunnable());
}
return true;
@@ -593,7 +598,7 @@ bool MediaDecoderStateMachine::HaveEnoughDecodedVideo()
return false;
}
stream->mStream->DispatchWhenNotEnoughBuffered(videoTrackId,
GetStateMachineThread(), GetWakeDecoderRunnable());
TaskQueue(), GetWakeDecoderRunnable());
}
return true;
@@ -855,7 +860,7 @@ MediaDecoderStateMachine::OnNotDecoded(MediaData::Type aType,
"Readers that send WAITING_FOR_DATA need to implement WaitForData");
WaitRequestRef(aType).Begin(ProxyMediaCall(DecodeTaskQueue(), mReader.get(), __func__,
&MediaDecoderReader::WaitForData, aType)
->RefableThen(mScheduler.get(), __func__, this,
->RefableThen(TaskQueue(), __func__, this,
&MediaDecoderStateMachine::OnWaitForDataResolved,
&MediaDecoderStateMachine::OnWaitForDataRejected));
return;
@@ -1084,7 +1089,7 @@ MediaDecoderStateMachine::CheckIfSeekComplete()
mDecodeToSeekTarget = false;
RefPtr<nsIRunnable> task(
NS_NewRunnableMethod(this, &MediaDecoderStateMachine::SeekCompleted));
nsresult rv = GetStateMachineThread()->Dispatch(task, NS_DISPATCH_NORMAL);
nsresult rv = TaskQueue()->Dispatch(task);
if (NS_FAILED(rv)) {
DecodeError();
}
@@ -1146,10 +1151,7 @@ nsresult MediaDecoderStateMachine::Init(MediaDecoderStateMachine* aCloneDonor)
cloneReader = aCloneDonor->mReader;
}
nsresult rv = mScheduler->Init();
NS_ENSURE_SUCCESS(rv, rv);
rv = mReader->Init(cloneReader);
nsresult rv = mReader->Init(cloneReader);
NS_ENSURE_SUCCESS(rv, rv);
return NS_OK;
@@ -1343,7 +1345,7 @@ int64_t MediaDecoderStateMachine::GetCurrentTimeUs() const
}
bool MediaDecoderStateMachine::IsRealTime() const {
return mScheduler->IsRealTime();
return mRealTime;
}
int64_t MediaDecoderStateMachine::GetDuration()
@@ -1513,9 +1515,8 @@ void MediaDecoderStateMachine::Shutdown()
// Change state before issuing shutdown request to threads so those
// threads can start exiting cleanly during the Shutdown call.
DECODER_LOG("Changed state to SHUTDOWN");
ScheduleStateMachine();
SetState(DECODER_STATE_SHUTDOWN);
mScheduler->ScheduleAndShutdown();
if (mAudioSink) {
mAudioSink->PrepareToShutdown();
}
@@ -1774,7 +1775,7 @@ MediaDecoderStateMachine::EnqueueDecodeFirstFrameTask()
RefPtr<nsIRunnable> task(
NS_NewRunnableMethod(this, &MediaDecoderStateMachine::CallDecodeFirstFrame));
nsresult rv = GetStateMachineThread()->Dispatch(task, NS_DISPATCH_NORMAL);
nsresult rv = TaskQueue()->Dispatch(task);
NS_ENSURE_SUCCESS(rv, rv);
return NS_OK;
}
@@ -1921,7 +1922,7 @@ MediaDecoderStateMachine::InitiateSeek()
mSeekRequest.Begin(ProxyMediaCall(DecodeTaskQueue(), mReader.get(), __func__,
&MediaDecoderReader::Seek, mCurrentSeek.mTarget.mTime,
GetEndTime())
->RefableThen(mScheduler.get(), __func__, this,
->RefableThen(TaskQueue(), __func__, this,
&MediaDecoderStateMachine::OnSeekCompleted,
&MediaDecoderStateMachine::OnSeekFailed));
}
@@ -1969,7 +1970,7 @@ MediaDecoderStateMachine::EnsureAudioDecodeTaskQueued()
mAudioDataRequest.Begin(ProxyMediaCall(DecodeTaskQueue(), mReader.get(),
__func__, &MediaDecoderReader::RequestAudioData)
->RefableThen(mScheduler.get(), __func__, this,
->RefableThen(TaskQueue(), __func__, this,
&MediaDecoderStateMachine::OnAudioDecoded,
&MediaDecoderStateMachine::OnAudioNotDecoded));
@@ -2029,7 +2030,7 @@ MediaDecoderStateMachine::EnsureVideoDecodeTaskQueued()
mVideoDataRequest.Begin(ProxyMediaCall(DecodeTaskQueue(), mReader.get(), __func__,
&MediaDecoderReader::RequestVideoData,
skipToNextKeyFrame, currentTime)
->RefableThen(mScheduler.get(), __func__, this,
->RefableThen(TaskQueue(), __func__, this,
&MediaDecoderStateMachine::OnVideoDecoded,
&MediaDecoderStateMachine::OnVideoNotDecoded));
return NS_OK;
@@ -2167,9 +2168,9 @@ MediaDecoderStateMachine::DecodeError()
// Change state to shutdown before sending error report to MediaDecoder
// and the HTMLMediaElement, so that our pipeline can start exiting
// cleanly during the sync dispatch below.
DECODER_WARN("Decode error, changed state to SHUTDOWN due to error");
ScheduleStateMachine();
SetState(DECODER_STATE_SHUTDOWN);
mScheduler->ScheduleAndShutdown();
DECODER_WARN("Decode error, changed state to SHUTDOWN due to error");
mDecoder->GetReentrantMonitor().NotifyAll();
// Dispatch the event to call DecodeError synchronously. This ensures
@@ -2322,12 +2323,12 @@ MediaDecoderStateMachine::DecodeFirstFrame()
if (HasAudio()) {
RefPtr<nsIRunnable> decodeTask(
NS_NewRunnableMethod(this, &MediaDecoderStateMachine::DispatchAudioDecodeTaskIfNeeded));
AudioQueue().AddPopListener(decodeTask, GetStateMachineThread());
AudioQueue().AddPopListener(decodeTask, TaskQueue());
}
if (HasVideo()) {
RefPtr<nsIRunnable> decodeTask(
NS_NewRunnableMethod(this, &MediaDecoderStateMachine::DispatchVideoDecodeTaskIfNeeded));
VideoQueue().AddPopListener(decodeTask, GetStateMachineThread());
VideoQueue().AddPopListener(decodeTask, TaskQueue());
}
if (IsRealTime()) {
@@ -2345,7 +2346,7 @@ MediaDecoderStateMachine::DecodeFirstFrame()
if (HasAudio()) {
mAudioDataRequest.Begin(ProxyMediaCall(DecodeTaskQueue(), mReader.get(),
__func__, &MediaDecoderReader::RequestAudioData)
->RefableThen(mScheduler.get(), __func__, this,
->RefableThen(TaskQueue(), __func__, this,
&MediaDecoderStateMachine::OnAudioDecoded,
&MediaDecoderStateMachine::OnAudioNotDecoded));
}
@@ -2353,7 +2354,7 @@ MediaDecoderStateMachine::DecodeFirstFrame()
mVideoDecodeStartTime = TimeStamp::Now();
mVideoDataRequest.Begin(ProxyMediaCall(DecodeTaskQueue(), mReader.get(),
__func__, &MediaDecoderReader::RequestVideoData, false, int64_t(0))
->RefableThen(mScheduler.get(), __func__, this,
->RefableThen(TaskQueue(), __func__, this,
&MediaDecoderStateMachine::OnVideoDecoded,
&MediaDecoderStateMachine::OnVideoNotDecoded));
}
@@ -2560,6 +2561,8 @@ public:
NS_ASSERTION(NS_IsMainThread(), "Must be on main thread.");
MOZ_ASSERT(mStateMachine);
MOZ_ASSERT(mDecoder);
mStateMachine->TaskQueue()->BeginShutdown();
mStateMachine->TaskQueue()->AwaitShutdownAndIdle();
mStateMachine->BreakCycles();
mDecoder->BreakCycles();
mStateMachine = nullptr;
@@ -2594,7 +2597,7 @@ void
MediaDecoderStateMachine::ShutdownReader()
{
MOZ_ASSERT(OnDecodeThread());
mReader->Shutdown()->Then(mScheduler.get(), __func__, this,
mReader->Shutdown()->Then(TaskQueue(), __func__, this,
&MediaDecoderStateMachine::FinishShutdown,
&MediaDecoderStateMachine::FinishShutdown);
}
@@ -2630,15 +2633,24 @@ MediaDecoderStateMachine::FinishShutdown()
// finished and released its monitor/references. That event then will
// dispatch an event to the main thread to release the decoder and
// state machine.
GetStateMachineThread()->Dispatch(
new nsDispatchDisposeEvent(mDecoder, this), NS_DISPATCH_NORMAL);
RefPtr<nsIRunnable> task = new nsDispatchDisposeEvent(mDecoder, this);
TaskQueue()->Dispatch(task);
DECODER_LOG("Dispose Event Dispatched");
}
nsresult MediaDecoderStateMachine::RunStateMachine()
{
AssertCurrentThreadInMonitor();
MOZ_ASSERT(OnStateMachineThread());
ReentrantMonitorAutoEnter mon(mDecoder->GetReentrantMonitor());
mDelayedScheduler.Reset(); // Must happen on state machine thread.
mDispatchedStateMachine = false;
// If audio is being captured, stop the audio sink if it's running
if (mAudioCaptured) {
StopAudioThread();
}
MediaResource* resource = mDecoder->GetResource();
NS_ENSURE_TRUE(resource, NS_ERROR_NULL_POINTER);
@@ -2739,7 +2751,7 @@ nsresult MediaDecoderStateMachine::RunStateMachine()
DECODER_LOG("Buffering: wait %ds, timeout in %.3lfs %s",
mBufferingWait, mBufferingWait - elapsed.ToSeconds(),
(mQuickBuffering ? "(quick exit)" : ""));
ScheduleStateMachine(USECS_PER_S);
ScheduleStateMachineIn(USECS_PER_S);
return NS_OK;
}
} else if (OutOfDecodedAudio() || OutOfDecodedVideo()) {
@@ -3063,7 +3075,7 @@ void MediaDecoderStateMachine::AdvanceFrame()
// Don't go straight back to the state machine loop since that might
// cause us to start decoding again and we could flip-flop between
// decoding and quick-buffering.
ScheduleStateMachine(USECS_PER_S);
ScheduleStateMachineIn(USECS_PER_S);
return;
}
}
@@ -3125,7 +3137,12 @@ void MediaDecoderStateMachine::AdvanceFrame()
// ready state. Post an update to do so.
UpdateReadyState();
ScheduleStateMachine(remainingTime / mPlaybackRate);
int64_t delay = remainingTime / mPlaybackRate;
if (delay > 0) {
ScheduleStateMachineIn(delay);
} else {
ScheduleStateMachine();
}
}
nsresult
@@ -3371,33 +3388,59 @@ void MediaDecoderStateMachine::SetPlayStartTime(const TimeStamp& aTimeStamp)
}
}
nsresult MediaDecoderStateMachine::CallRunStateMachine()
{
AssertCurrentThreadInMonitor();
NS_ASSERTION(OnStateMachineThread(), "Should be on state machine thread.");
// If audio is being captured, stop the audio sink if it's running
if (mAudioCaptured) {
StopAudioThread();
}
return RunStateMachine();
}
nsresult MediaDecoderStateMachine::TimeoutExpired(void* aClosure)
{
MediaDecoderStateMachine* p = static_cast<MediaDecoderStateMachine*>(aClosure);
return p->CallRunStateMachine();
}
void MediaDecoderStateMachine::ScheduleStateMachineWithLockAndWakeDecoder() {
ReentrantMonitorAutoEnter mon(mDecoder->GetReentrantMonitor());
DispatchAudioDecodeTaskIfNeeded();
DispatchVideoDecodeTaskIfNeeded();
}
nsresult MediaDecoderStateMachine::ScheduleStateMachine(int64_t aUsecs) {
return mScheduler->Schedule(aUsecs);
void
MediaDecoderStateMachine::ScheduleStateMachine() {
AssertCurrentThreadInMonitor();
if (mState == DECODER_STATE_SHUTDOWN) {
NS_WARNING("Refusing to schedule shutdown state machine");
return;
}
if (mDispatchedStateMachine) {
return;
}
mDispatchedStateMachine = true;
RefPtr<nsIRunnable> task =
NS_NewRunnableMethod(this, &MediaDecoderStateMachine::RunStateMachine);
nsresult rv = TaskQueue()->Dispatch(task);
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
(void) rv;
}
void
MediaDecoderStateMachine::ScheduleStateMachineIn(int64_t aMicroseconds)
{
AssertCurrentThreadInMonitor();
MOZ_ASSERT(OnStateMachineThread()); // mDelayedScheduler.Ensure() may Disconnect()
// the promise, which must happen on the state
// machine thread.
MOZ_ASSERT(aMicroseconds > 0);
if (mState == DECODER_STATE_SHUTDOWN) {
NS_WARNING("Refusing to schedule shutdown state machine");
return;
}
if (mDispatchedStateMachine) {
return;
}
// Real-time weirdness.
if (IsRealTime()) {
aMicroseconds = std::min(aMicroseconds, int64_t(40000));
}
TimeStamp now = TimeStamp::Now();
TimeStamp target = now + TimeDuration::FromMicroseconds(aMicroseconds);
SAMPLE_LOG("Scheduling state machine for %lf ms from now", (target - now).ToMilliseconds());
mDelayedScheduler.Ensure(target);
}
bool MediaDecoderStateMachine::OnDecodeThread() const
@@ -3407,17 +3450,12 @@ bool MediaDecoderStateMachine::OnDecodeThread() const
bool MediaDecoderStateMachine::OnStateMachineThread() const
{
return mScheduler->OnStateMachineThread();
}
nsIEventTarget* MediaDecoderStateMachine::GetStateMachineThread() const
{
return mScheduler->GetStateMachineThread();
return TaskQueue()->IsCurrentThreadIn();
}
bool MediaDecoderStateMachine::IsStateMachineScheduled() const
{
return mScheduler->IsScheduled();
return mDispatchedStateMachine || mDelayedScheduler.IsScheduled();
}
void MediaDecoderStateMachine::SetPlaybackRate(double aPlaybackRate)