Bug 925623 - Support delivery of WebSocket events off-main-thread in WebSocketChannel r=jduell

This commit is contained in:
Steve Workman
2014-03-27 13:58:19 -07:00
parent 59e69d471e
commit 03596e3798
8 changed files with 382 additions and 119 deletions

View File

@@ -34,6 +34,7 @@
#include "nsIProtocolHandler.h"
#include "nsIRandomGenerator.h"
#include "nsISocketTransport.h"
#include "nsThreadUtils.h"
#include "nsAutoPtr.h"
#include "nsNetCID.h"
@@ -45,6 +46,7 @@
#include "nsAlgorithm.h"
#include "nsProxyRelease.h"
#include "nsNetUtil.h"
#include "mozilla/StaticMutex.h"
#include "mozilla/Telemetry.h"
#include "mozilla/TimeStamp.h"
@@ -69,7 +71,7 @@ using namespace mozilla;
namespace mozilla {
namespace net {
NS_IMPL_ISUPPORTS12(WebSocketChannel,
NS_IMPL_ISUPPORTS13(WebSocketChannel,
nsIWebSocketChannel,
nsIHttpUpgradeListener,
nsIRequestObserver,
@@ -81,7 +83,8 @@ NS_IMPL_ISUPPORTS12(WebSocketChannel,
nsIDNSListener,
nsIProtocolProxyCallback,
nsIInterfaceRequestor,
nsIChannelEventSink)
nsIChannelEventSink,
nsIThreadRetargetableRequest)
// We implement RFC 6455, which uses Sec-WebSocket-Version: 13 on the wire.
#define SEC_WEBSOCKET_VERSION "13"
@@ -317,81 +320,88 @@ private:
class nsWSAdmissionManager
{
public:
nsWSAdmissionManager() : mSessionCount(0)
static void Init()
{
MOZ_COUNT_CTOR(nsWSAdmissionManager);
StaticMutexAutoLock lock(sLock);
if (!sManager) {
sManager = new nsWSAdmissionManager();
}
}
class nsOpenConn
static void Shutdown()
{
public:
nsOpenConn(nsCString &addr, WebSocketChannel *channel)
: mAddress(addr), mChannel(channel) { MOZ_COUNT_CTOR(nsOpenConn); }
~nsOpenConn() { MOZ_COUNT_DTOR(nsOpenConn); }
nsCString mAddress;
WebSocketChannel *mChannel;
};
~nsWSAdmissionManager()
{
MOZ_COUNT_DTOR(nsWSAdmissionManager);
for (uint32_t i = 0; i < mQueue.Length(); i++)
delete mQueue[i];
StaticMutexAutoLock lock(sLock);
delete sManager;
sManager = nullptr;
}
// Determine if we will open connection immediately (returns true), or
// delay/queue the connection (returns false)
void ConditionallyConnect(WebSocketChannel *ws)
static void ConditionallyConnect(WebSocketChannel *ws)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
NS_ABORT_IF_FALSE(ws->mConnecting == NOT_CONNECTING, "opening state");
StaticMutexAutoLock lock(sLock);
if (!sManager) {
return;
}
// If there is already another WS channel connecting to this IP address,
// defer BeginOpen and mark as waiting in queue.
bool found = (IndexOf(ws->mAddress) >= 0);
bool found = (sManager->IndexOf(ws->mAddress) >= 0);
// Always add ourselves to queue, even if we'll connect immediately
nsOpenConn *newdata = new nsOpenConn(ws->mAddress, ws);
mQueue.AppendElement(newdata);
sManager->mQueue.AppendElement(newdata);
if (found) {
ws->mConnecting = CONNECTING_QUEUED;
} else {
mFailures.DelayOrBegin(ws);
sManager->mFailures.DelayOrBegin(ws);
}
}
void OnConnected(WebSocketChannel *aChannel)
static void OnConnected(WebSocketChannel *aChannel)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
NS_ABORT_IF_FALSE(aChannel->mConnecting == CONNECTING_IN_PROGRESS,
"Channel completed connect, but not connecting?");
StaticMutexAutoLock lock(sLock);
if (!sManager) {
return;
}
aChannel->mConnecting = NOT_CONNECTING;
// Remove from queue
RemoveFromQueue(aChannel);
sManager->RemoveFromQueue(aChannel);
// Connection succeeded, so stop keeping track of any previous failures
mFailures.Remove(aChannel->mAddress, aChannel->mPort);
sManager->mFailures.Remove(aChannel->mAddress, aChannel->mPort);
// Check for queued connections to same host.
// Note: still need to check for failures, since next websocket with same
// host may have different port
ConnectNext(aChannel->mAddress);
sManager->ConnectNext(aChannel->mAddress);
}
// Called every time a websocket channel ends its session (including going away
// w/o ever successfully creating a connection)
void OnStopSession(WebSocketChannel *aChannel, nsresult aReason)
static void OnStopSession(WebSocketChannel *aChannel, nsresult aReason)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
StaticMutexAutoLock lock(sLock);
if (!sManager) {
return;
}
if (NS_FAILED(aReason)) {
// Have we seen this failure before?
FailDelay *knownFailure = mFailures.Lookup(aChannel->mAddress,
aChannel->mPort);
FailDelay *knownFailure = sManager->mFailures.Lookup(aChannel->mAddress,
aChannel->mPort);
if (knownFailure) {
if (aReason == NS_ERROR_NOT_CONNECTED) {
// Don't count close() before connection as a network error
@@ -406,7 +416,7 @@ public:
// new connection failure: record it.
LOG(("WebSocket: connection to %s, %d failed: [this=%p]",
aChannel->mAddress.get(), (int)aChannel->mPort, aChannel));
mFailures.Add(aChannel->mAddress, aChannel->mPort);
sManager->mFailures.Add(aChannel->mAddress, aChannel->mPort);
}
}
@@ -417,16 +427,67 @@ public:
aChannel->mScriptCloseCode == CLOSE_GOING_AWAY,
"websocket closed while connecting w/o failing?");
RemoveFromQueue(aChannel);
sManager->RemoveFromQueue(aChannel);
bool wasNotQueued = (aChannel->mConnecting != CONNECTING_QUEUED);
aChannel->mConnecting = NOT_CONNECTING;
if (wasNotQueued) {
ConnectNext(aChannel->mAddress);
sManager->ConnectNext(aChannel->mAddress);
}
}
}
static void IncrementSessionCount()
{
StaticMutexAutoLock lock(sLock);
if (!sManager) {
return;
}
sManager->mSessionCount++;
}
static void DecrementSessionCount()
{
StaticMutexAutoLock lock(sLock);
if (!sManager) {
return;
}
sManager->mSessionCount--;
}
static void GetSessionCount(int32_t &aSessionCount)
{
StaticMutexAutoLock lock(sLock);
if (!sManager) {
return;
}
aSessionCount = sManager->mSessionCount;
}
private:
nsWSAdmissionManager() : mSessionCount(0)
{
MOZ_COUNT_CTOR(nsWSAdmissionManager);
}
~nsWSAdmissionManager()
{
MOZ_COUNT_DTOR(nsWSAdmissionManager);
for (uint32_t i = 0; i < mQueue.Length(); i++)
delete mQueue[i];
}
class nsOpenConn
{
public:
nsOpenConn(nsCString &addr, WebSocketChannel *channel)
: mAddress(addr), mChannel(channel) { MOZ_COUNT_CTOR(nsOpenConn); }
~nsOpenConn() { MOZ_COUNT_DTOR(nsOpenConn); }
nsCString mAddress;
WebSocketChannel *mChannel;
};
void ConnectNext(nsCString &hostName)
{
int32_t index = IndexOf(hostName);
@@ -441,23 +502,6 @@ public:
}
}
void IncrementSessionCount()
{
mSessionCount++;
}
void DecrementSessionCount()
{
mSessionCount--;
}
int32_t SessionCount()
{
return mSessionCount;
}
private:
void RemoveFromQueue(WebSocketChannel *aChannel)
{
int32_t index = IndexOf(aChannel);
@@ -499,9 +543,13 @@ private:
nsTArray<nsOpenConn *> mQueue;
FailDelayManager mFailures;
static nsWSAdmissionManager *sManager;
static StaticMutex sLock;
};
static nsWSAdmissionManager *sWebSocketAdmissions = nullptr;
nsWSAdmissionManager *nsWSAdmissionManager::sManager;
StaticMutex nsWSAdmissionManager::sLock;
//-----------------------------------------------------------------------------
// CallOnMessageAvailable
@@ -521,6 +569,8 @@ public:
NS_IMETHOD Run()
{
MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread);
if (mLen < 0)
mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
else
@@ -553,9 +603,9 @@ public:
NS_IMETHOD Run()
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread);
sWebSocketAdmissions->OnStopSession(mChannel, mReason);
nsWSAdmissionManager::OnStopSession(mChannel, mReason);
if (mChannel->mListener) {
mChannel->mListener->OnStop(mChannel->mContext, mReason);
@@ -591,6 +641,8 @@ public:
NS_IMETHOD Run()
{
MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread);
mChannel->mListener->OnServerClose(mChannel->mContext, mCode, mReason);
return NS_OK;
}
@@ -620,6 +672,8 @@ public:
NS_IMETHOD Run()
{
MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread);
LOG(("WebSocketChannel::CallAcknowledge: Size %u\n", mSize));
mChannel->mListener->OnAcknowledge(mChannel->mContext, mSize);
return NS_OK;
@@ -981,8 +1035,7 @@ WebSocketChannel::WebSocketChannel() :
LOG(("WebSocketChannel::WebSocketChannel() %p\n", this));
if (!sWebSocketAdmissions)
sWebSocketAdmissions = new nsWSAdmissionManager();
nsWSAdmissionManager::Init();
mFramePtr = mBuffer = static_cast<uint8_t *>(moz_xmalloc(mBufferSize));
@@ -1053,8 +1106,7 @@ WebSocketChannel::~WebSocketChannel()
void
WebSocketChannel::Shutdown()
{
delete sWebSocketAdmissions;
sWebSocketAdmissions = nullptr;
nsWSAdmissionManager::Shutdown();
}
void
@@ -1359,15 +1411,11 @@ WebSocketChannel::ProcessInput(uint8_t *buffer, uint32_t count)
return NS_ERROR_CANNOT_CONVERT_DATA;
}
NS_DispatchToMainThread(new CallOnMessageAvailable(this, utf8Data, -1));
nsresult rv;
mTargetThread->Dispatch(new CallOnMessageAvailable(this, utf8Data, -1),
NS_DISPATCH_NORMAL);
if (mConnectionLogService && !mPrivateBrowsing) {
nsAutoCString host;
rv = mURI->GetHostPort(host);
if (NS_SUCCEEDED(rv)) {
mConnectionLogService->NewMsgReceived(host, mSerial, count);
LOG(("Added new msg received for %s",host.get()));
}
mConnectionLogService->NewMsgReceived(mHost, mSerial, count);
LOG(("Added new msg received for %s", mHost.get()));
}
}
} else if (opcode & kControlFrameMask) {
@@ -1411,8 +1459,9 @@ WebSocketChannel::ProcessInput(uint8_t *buffer, uint32_t count)
mCloseTimer = nullptr;
}
if (mListener) {
NS_DispatchToMainThread(new CallOnServerClose(this, mServerCloseCode,
mServerCloseReason));
mTargetThread->Dispatch(new CallOnServerClose(this, mServerCloseCode,
mServerCloseReason),
NS_DISPATCH_NORMAL);
}
if (mClientClosed)
@@ -1447,17 +1496,13 @@ WebSocketChannel::ProcessInput(uint8_t *buffer, uint32_t count)
LOG(("WebSocketChannel:: binary frame received\n"));
if (mListener) {
nsCString binaryData((const char *)payload, payloadLength);
NS_DispatchToMainThread(new CallOnMessageAvailable(this, binaryData,
payloadLength));
mTargetThread->Dispatch(new CallOnMessageAvailable(this, binaryData,
payloadLength),
NS_DISPATCH_NORMAL);
// To add the header to 'Networking Dashboard' log
nsresult rv;
if (mConnectionLogService && !mPrivateBrowsing) {
nsAutoCString host;
rv = mURI->GetHostPort(host);
if (NS_SUCCEEDED(rv)) {
mConnectionLogService->NewMsgReceived(host, mSerial, count);
LOG(("Added new received msg for %s",host.get()));
}
mConnectionLogService->NewMsgReceived(mHost, mSerial, count);
LOG(("Added new received msg for %s", mHost.get()));
}
}
} else if (opcode != kContinuation) {
@@ -1870,12 +1915,8 @@ WebSocketChannel::CleanupConnection()
mTransport = nullptr;
}
nsresult rv;
if (mConnectionLogService && !mPrivateBrowsing) {
nsAutoCString host;
rv = mURI->GetHostPort(host);
if (NS_SUCCEEDED(rv))
mConnectionLogService->RemoveHost(host, mSerial);
mConnectionLogService->RemoveHost(mHost, mSerial);
}
DecrementSessionCount();
@@ -1940,8 +1981,10 @@ WebSocketChannel::StopSession(nsresult reason)
} while (NS_SUCCEEDED(rv) && count > 0 && total < 32000);
}
if (!mTCPClosed && mTransport && sWebSocketAdmissions &&
sWebSocketAdmissions->SessionCount() < kLingeringCloseThreshold) {
int32_t sessionCount = kLingeringCloseThreshold;
nsWSAdmissionManager::GetSessionCount(sessionCount);
if (!mTCPClosed && mTransport && sessionCount < kLingeringCloseThreshold) {
// 7.1.1 says that the client SHOULD wait for the server to close the TCP
// connection. This is so we can reuse port numbers before 2 MSL expires,
@@ -1980,7 +2023,8 @@ WebSocketChannel::StopSession(nsresult reason)
if (!mCalledOnStop) {
mCalledOnStop = 1;
NS_DispatchToMainThread(new CallOnStop(this, reason));
mTargetThread->Dispatch(new CallOnStop(this, reason),
NS_DISPATCH_NORMAL);
}
return;
@@ -2039,7 +2083,7 @@ void
WebSocketChannel::IncrementSessionCount()
{
if (!mIncrementedSessionCount) {
sWebSocketAdmissions->IncrementSessionCount();
nsWSAdmissionManager::IncrementSessionCount();
mIncrementedSessionCount = 1;
}
}
@@ -2052,7 +2096,7 @@ WebSocketChannel::DecrementSessionCount()
// atomic, and mIncrementedSessionCount/mDecrementedSessionCount are set at
// times when they'll never be a race condition for checking/setting them.
if (mIncrementedSessionCount && !mDecrementedSessionCount) {
sWebSocketAdmissions->DecrementSessionCount();
nsWSAdmissionManager::DecrementSessionCount();
mDecrementedSessionCount = 1;
}
}
@@ -2266,7 +2310,7 @@ WebSocketChannel::StartWebsocketData()
// We're now done CONNECTING, which means we can now open another,
// perhaps parallel, connection to the same host if one
// is pending
sWebSocketAdmissions->OnConnected(this);
nsWSAdmissionManager::OnConnected(this);
LOG(("WebSocketChannel::StartWebsocketData Notifying Listener %p\n",
mListener.get()));
@@ -2354,7 +2398,7 @@ WebSocketChannel::OnLookupComplete(nsICancelable *aRequest,
}
LOG(("WebSocket OnLookupComplete: Proceeding to ConditionallyConnect\n"));
sWebSocketAdmissions->ConditionallyConnect(this);
nsWSAdmissionManager::ConditionallyConnect(this);
return NS_OK;
}
@@ -2511,7 +2555,7 @@ WebSocketChannel::AsyncOnChannelRedirect(
// Mark old channel as successfully connected so we'll clear any FailDelay
// associated with the old URI. Note: no need to also call OnStopSession:
// it's a no-op for successful, already-connected channels.
sWebSocketAdmissions->OnConnected(this);
nsWSAdmissionManager::OnConnected(this);
// ApplyForAdmission as if we were starting from fresh...
mAddress.Truncate();
@@ -2629,6 +2673,11 @@ WebSocketChannel::AsyncOpen(nsIURI *aURI,
nsresult rv;
// Ensure target thread is set.
if (!mTargetThread) {
mTargetThread = do_GetMainThread();
}
mSocketThread = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
if (NS_FAILED(rv)) {
NS_WARNING("unable to continue without socket transport service");
@@ -2688,16 +2737,17 @@ WebSocketChannel::AsyncOpen(nsIURI *aURI,
}
}
if (sWebSocketAdmissions)
int32_t sessionCount = -1;
nsWSAdmissionManager::GetSessionCount(sessionCount);
if (sessionCount >= 0) {
LOG(("WebSocketChannel::AsyncOpen %p sessionCount=%d max=%d\n", this,
sWebSocketAdmissions->SessionCount(), mMaxConcurrentConnections));
sessionCount, mMaxConcurrentConnections));
}
if (sWebSocketAdmissions &&
sWebSocketAdmissions->SessionCount() >= mMaxConcurrentConnections)
{
if (sessionCount >= mMaxConcurrentConnections) {
LOG(("WebSocketChannel: max concurrency %d exceeded (%d)",
mMaxConcurrentConnections,
sWebSocketAdmissions->SessionCount()));
sessionCount));
// WebSocket connections are expected to be long lived, so return
// an error here instead of queueing
@@ -2706,6 +2756,7 @@ WebSocketChannel::AsyncOpen(nsIURI *aURI,
mOriginalURI = aURI;
mURI = mOriginalURI;
mURI->GetHostPort(mHost);
mOrigin = aOrigin;
nsCOMPtr<nsIURI> localURI;
@@ -2756,11 +2807,8 @@ WebSocketChannel::AsyncOpen(nsIURI *aURI,
mPrivateBrowsing = NS_UsePrivateBrowsing(localChannel);
if (mConnectionLogService && !mPrivateBrowsing) {
nsAutoCString host;
rv = mURI->GetHostPort(host);
if (NS_SUCCEEDED(rv)) {
mConnectionLogService->AddHost(host, mSerial, BaseWebSocketChannel::mEncrypted);
}
mConnectionLogService->AddHost(mHost, mSerial,
BaseWebSocketChannel::mEncrypted);
}
rv = ApplyForAdmission();
@@ -2844,7 +2892,7 @@ nsresult
WebSocketChannel::SendMsgCommon(const nsACString *aMsg, bool aIsBinary,
uint32_t aLength, nsIInputStream *aStream)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
NS_ABORT_IF_FALSE(NS_GetCurrentThread() == mTargetThread, "not target thread");
if (mRequestedClose) {
LOG(("WebSocketChannel:: Error: send when closed\n"));
@@ -2862,14 +2910,9 @@ WebSocketChannel::SendMsgCommon(const nsACString *aMsg, bool aIsBinary,
return NS_ERROR_FILE_TOO_BIG;
}
nsresult rv;
if (mConnectionLogService && !mPrivateBrowsing) {
nsAutoCString host;
rv = mURI->GetHostPort(host);
if (NS_SUCCEEDED(rv)) {
mConnectionLogService->NewMsgSent(host, mSerial, aLength);
LOG(("Added new msg sent for %s",host.get()));
}
mConnectionLogService->NewMsgSent(mHost, mSerial, aLength);
LOG(("Added new msg sent for %s", mHost.get()));
}
return mSocketThread->Dispatch(
@@ -3196,7 +3239,7 @@ WebSocketChannel::OnOutputStreamReady(nsIAsyncOutputStream *aStream)
CountSentBytes(amtSent);
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
mSocketOut->AsyncWait(this, 0, 0, nullptr);
mSocketOut->AsyncWait(this, 0, 0, mSocketThread);
return NS_OK;
}
@@ -3217,8 +3260,9 @@ WebSocketChannel::OnOutputStreamReady(nsIAsyncOutputStream *aStream)
} else {
if (amtSent == toSend) {
if (!mStopped) {
NS_DispatchToMainThread(new CallAcknowledge(this,
mCurrentOut->Length()));
mTargetThread->Dispatch(new CallAcknowledge(this,
mCurrentOut->Length()),
NS_DISPATCH_NORMAL);
}
DeleteCurrentOutGoingMessage();
PrimeNewOutgoingMessage();