bug 738914 - orange websocket pipeline::isdone from main thread r=jduell

This commit is contained in:
Patrick McManus
2012-04-25 22:02:12 -04:00
parent 606fd8b9fa
commit 4a1bdd64df
5 changed files with 115 additions and 18 deletions

View File

@@ -4498,18 +4498,8 @@ nsHttpChannel::OnStopRequest(nsIRequest *request, nsISupports *ctxt, nsresult st
if (mUpgradeProtocolCallback && stickyConn &&
mResponseHead && mResponseHead->Status() == 101) {
nsCOMPtr<nsISocketTransport> socketTransport;
nsCOMPtr<nsIAsyncInputStream> socketIn;
nsCOMPtr<nsIAsyncOutputStream> socketOut;
nsresult rv;
rv = stickyConn->TakeTransport(getter_AddRefs(socketTransport),
getter_AddRefs(socketIn),
getter_AddRefs(socketOut));
if (NS_SUCCEEDED(rv))
mUpgradeProtocolCallback->OnTransportAvailable(socketTransport,
socketIn,
socketOut);
gHttpHandler->ConnMgr()->CompleteUpgrade(stickyConn,
mUpgradeProtocolCallback);
}
}

View File

@@ -40,6 +40,7 @@
#include "nsHttpConnection.h"
#include "nsHttpPipeline.h"
#include "nsHttpHandler.h"
#include "nsIHttpChannelInternal.h"
#include "nsNetCID.h"
#include "nsCOMPtr.h"
#include "nsNetUtil.h"
@@ -398,6 +399,32 @@ nsHttpConnectionMgr::ReclaimConnection(nsHttpConnection *conn)
return rv;
}
// A structure used to marshall 2 pointers across the various necessary
// threads to complete an HTTP upgrade.
class nsCompleteUpgradeData
{
public:
nsCompleteUpgradeData(nsAHttpConnection *aConn,
nsIHttpUpgradeListener *aListener)
: mConn(aConn), mUpgradeListener(aListener) {}
nsRefPtr<nsAHttpConnection> mConn;
nsCOMPtr<nsIHttpUpgradeListener> mUpgradeListener;
};
nsresult
nsHttpConnectionMgr::CompleteUpgrade(nsAHttpConnection *aConn,
nsIHttpUpgradeListener *aUpgradeListener)
{
nsCompleteUpgradeData *data =
new nsCompleteUpgradeData(aConn, aUpgradeListener);
nsresult rv;
rv = PostEvent(&nsHttpConnectionMgr::OnMsgCompleteUpgrade, 0, data);
if (NS_FAILED(rv))
delete data;
return rv;
}
nsresult
nsHttpConnectionMgr::UpdateParam(nsParamName name, PRUint16 value)
{
@@ -1996,6 +2023,31 @@ nsHttpConnectionMgr::OnMsgReclaimConnection(PRInt32, void *param)
NS_RELEASE(conn);
}
void
nsHttpConnectionMgr::OnMsgCompleteUpgrade(PRInt32, void *param)
{
NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "wrong thread");
nsCompleteUpgradeData *data = (nsCompleteUpgradeData *) param;
LOG(("nsHttpConnectionMgr::OnMsgCompleteUpgrade "
"this=%p conn=%p listener=%p\n", this, data->mConn.get(),
data->mUpgradeListener.get()));
nsCOMPtr<nsISocketTransport> socketTransport;
nsCOMPtr<nsIAsyncInputStream> socketIn;
nsCOMPtr<nsIAsyncOutputStream> socketOut;
nsresult rv;
rv = data->mConn->TakeTransport(getter_AddRefs(socketTransport),
getter_AddRefs(socketIn),
getter_AddRefs(socketOut));
if (NS_SUCCEEDED(rv))
data->mUpgradeListener->OnTransportAvailable(socketTransport,
socketIn,
socketOut);
delete data;
}
void
nsHttpConnectionMgr::OnMsgUpdateParam(PRInt32, void *param)
{

View File

@@ -58,6 +58,8 @@
class nsHttpPipeline;
class nsIHttpUpgradeListener;
//-----------------------------------------------------------------------------
class nsHttpConnectionMgr : public nsIObserver
@@ -147,6 +149,13 @@ public:
// it will be closed.
nsresult ReclaimConnection(nsHttpConnection *conn);
// called by the main thread to execute the taketransport() logic on the
// socket thread after a 101 response has been received and the socket
// needs to be transferred to an expectant upgrade listener such as
// websockets.
nsresult CompleteUpgrade(nsAHttpConnection *aConn,
nsIHttpUpgradeListener *aUpgradeListener);
// called to update a parameter after the connection manager has already
// been initialized.
nsresult UpdateParam(nsParamName name, PRUint16 value);
@@ -578,6 +587,7 @@ private:
void OnMsgPruneDeadConnections (PRInt32, void *);
void OnMsgSpeculativeConnect (PRInt32, void *);
void OnMsgReclaimConnection (PRInt32, void *);
void OnMsgCompleteUpgrade (PRInt32, void *);
void OnMsgUpdateParam (PRInt32, void *);
void OnMsgClosePersistentConnections (PRInt32, void *);
void OnMsgProcessFeedback (PRInt32, void *);

View File

@@ -233,6 +233,40 @@ private:
};
NS_IMPL_THREADSAFE_ISUPPORTS1(CallAcknowledge, nsIRunnable)
//-----------------------------------------------------------------------------
// CallOnTransportAvailable
//-----------------------------------------------------------------------------
class CallOnTransportAvailable : public nsIRunnable
{
public:
NS_DECL_ISUPPORTS
CallOnTransportAvailable(WebSocketChannel *aChannel,
nsISocketTransport *aTransport,
nsIAsyncInputStream *aSocketIn,
nsIAsyncOutputStream *aSocketOut)
: mChannel(aChannel),
mTransport(aTransport),
mSocketIn(aSocketIn),
mSocketOut(aSocketOut) {}
NS_IMETHOD Run()
{
LOG(("WebSocketChannel::CallOnTransportAvailable %p\n", this));
return mChannel->OnTransportAvailable(mTransport, mSocketIn, mSocketOut);
}
private:
~CallOnTransportAvailable() {}
nsRefPtr<WebSocketChannel> mChannel;
nsCOMPtr<nsISocketTransport> mTransport;
nsCOMPtr<nsIAsyncInputStream> mSocketIn;
nsCOMPtr<nsIAsyncOutputStream> mSocketOut;
};
NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnTransportAvailable, nsIRunnable)
//-----------------------------------------------------------------------------
// OutboundMessage
//-----------------------------------------------------------------------------
@@ -693,6 +727,7 @@ WebSocketChannel::WebSocketChannel() :
mOpenBlocked(0),
mOpenRunning(0),
mChannelWasOpened(0),
mDataStarted(0),
mMaxMessageSize(PR_INT32_MAX),
mStopOnClose(NS_OK),
mServerCloseCode(CLOSE_ABNORMAL),
@@ -1880,6 +1915,14 @@ nsresult
WebSocketChannel::StartWebsocketData()
{
LOG(("WebSocketChannel::StartWebsocketData() %p", this));
NS_ABORT_IF_FALSE(!mDataStarted, "StartWebsocketData twice");
mDataStarted = 1;
LOG(("WebSocketChannel::StartWebsocketData Notifying Listener %p\n",
mListener.get()));
if (mListener)
mListener->OnStart(mContext);
return mSocketIn->AsyncWait(this, 0, 0, mSocketThread);
}
@@ -2384,6 +2427,13 @@ WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport,
nsIAsyncInputStream *aSocketIn,
nsIAsyncOutputStream *aSocketOut)
{
if (!NS_IsMainThread()) {
return NS_DispatchToMainThread(new CallOnTransportAvailable(this,
aTransport,
aSocketIn,
aSocketOut));
}
LOG(("WebSocketChannel::OnTransportAvailable %p [%p %p %p] rcvdonstart=%d\n",
this, aTransport, aSocketIn, aSocketOut, mRecvdHttpOnStartRequest));
@@ -2559,12 +2609,6 @@ WebSocketChannel::OnStartRequest(nsIRequest *aRequest,
if (NS_FAILED(rv))
return rv;
LOG(("WebSocketChannel::OnStartRequest: Notifying Listener %p\n",
mListener.get()));
if (mListener)
mListener->OnStart(mContext);
mRecvdHttpOnStartRequest = 1;
if (mRecvdHttpUpgradeTransport)
return StartWebsocketData();

View File

@@ -218,6 +218,7 @@ private:
PRUint32 mOpenBlocked : 1;
PRUint32 mOpenRunning : 1;
PRUint32 mChannelWasOpened : 1;
PRUint32 mDataStarted : 1;
PRInt32 mMaxMessageSize;
nsresult mStopOnClose;