Backed out changeset 452a777db125 (bug 1046109)

This commit is contained in:
Carsten "Tomcat" Book
2014-07-31 10:14:18 +02:00
parent d5afb4f81c
commit dc3769a467
2 changed files with 287 additions and 347 deletions

View File

@@ -16,55 +16,113 @@ namespace mozilla {
namespace ipc {
//
// UnixSocketConsumerIO
// UnixSocketImpl
//
class UnixSocketConsumerIO MOZ_FINAL : public UnixSocketWatcher
, protected SocketIOBase
class UnixSocketImpl : public UnixSocketWatcher
, protected SocketIOBase
{
public:
UnixSocketConsumerIO(MessageLoop* mIOLoop,
UnixSocketConsumer* aConsumer,
UnixSocketConnector* aConnector,
const nsACString& aAddress);
~UnixSocketConsumerIO();
UnixSocketImpl(MessageLoop* mIOLoop,
UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
const nsACString& aAddress)
: UnixSocketWatcher(mIOLoop)
, SocketIOBase(MAX_READ_SIZE)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
, mDelayedConnectTask(nullptr)
{
}
void GetSocketAddr(nsAString& aAddrStr) const;
SocketConsumerBase* GetConsumer();
~UnixSocketImpl()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsShutdownOnMainThread());
}
// Shutdown state
//
void Send(UnixSocketRawData* aData)
{
EnqueueData(aData);
AddWatchers(WRITE_WATCHER, false);
}
bool IsShutdownOnMainThread() const;
void ShutdownOnMainThread();
bool IsShutdownOnMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
return mConsumer == nullptr;
}
bool IsShutdownOnIOThread() const;
void ShutdownOnIOThread();
void ShutdownOnMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mConsumer = nullptr;
}
// Delayed-task handling
//
bool IsShutdownOnIOThread()
{
return mShuttingDownOnIOThread;
}
void SetDelayedConnectTask(CancelableTask* aTask);
void ClearDelayedConnectTask();
void CancelDelayedConnectTask();
void ShutdownOnIOThread()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
// Task callback methods
//
Close(); // will also remove fd from I/O loop
mShuttingDownOnIOThread = true;
}
/**
* Run bind/listen to prepare for further runs of accept()
*/
void Listen();
void SetDelayedConnectTask(CancelableTask* aTask)
{
MOZ_ASSERT(NS_IsMainThread());
mDelayedConnectTask = aTask;
}
void ClearDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
mDelayedConnectTask = nullptr;
}
void CancelDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mDelayedConnectTask) {
return;
}
mDelayedConnectTask->Cancel();
ClearDelayedConnectTask();
}
/**
* Connect to a socket
*/
void Connect();
void Send(UnixSocketRawData* aData);
/**
* Run bind/listen to prepare for further runs of accept()
*/
void Listen();
// I/O callback methods
//
void GetSocketAddr(nsAString& aAddrStr)
{
if (!mConnector) {
NS_WARNING("No connector to get socket address from!");
aAddrStr.Truncate();
return;
}
mConnector->GetSocketAddr(mAddr, aAddrStr);
}
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from main thread. All non-main-thread accesses should happen with
* mImpl as container.
*/
RefPtr<UnixSocketConsumer> mConsumer;
void OnAccepted(int aFd, const sockaddr_any* aAddr,
socklen_t aAddrLen) MOZ_OVERRIDE;
@@ -74,18 +132,16 @@ public:
void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE;
void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE;
private:
void FireSocketError();
SocketConsumerBase* GetConsumer()
{
return mConsumer.get();
}
// Set up flags on file descriptor.
private:
// Set up flags on whatever our current file descriptor is.
static bool SetSocketFlags(int aFd);
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from main thread. All non-main-thread accesses should happen with
* mIO as container.
*/
RefPtr<UnixSocketConsumer> mConsumer;
void FireSocketError();
/**
* Connector object used to create the connection we are currently using.
@@ -118,109 +174,77 @@ private:
CancelableTask* mDelayedConnectTask;
};
UnixSocketConsumerIO::UnixSocketConsumerIO(MessageLoop* mIOLoop,
UnixSocketConsumer* aConsumer,
UnixSocketConnector* aConnector,
const nsACString& aAddress)
: UnixSocketWatcher(mIOLoop)
, SocketIOBase(MAX_READ_SIZE)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
, mDelayedConnectTask(nullptr)
class SocketListenTask : public SocketIOTask<UnixSocketImpl>
{
MOZ_ASSERT(mConsumer);
MOZ_ASSERT(mConnector);
}
public:
SocketListenTask(UnixSocketImpl* aImpl)
: SocketIOTask<UnixSocketImpl>(aImpl)
{ }
UnixSocketConsumerIO::~UnixSocketConsumerIO()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsShutdownOnMainThread());
}
void
UnixSocketConsumerIO::GetSocketAddr(nsAString& aAddrStr) const
{
if (!mConnector) {
NS_WARNING("No connector to get socket address from!");
aAddrStr.Truncate();
return;
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(!NS_IsMainThread());
if (!IsCanceled()) {
GetIO()->Listen();
}
}
mConnector->GetSocketAddr(mAddr, aAddrStr);
}
};
SocketConsumerBase*
UnixSocketConsumerIO::GetConsumer()
class SocketConnectTask : public SocketIOTask<UnixSocketImpl>
{
return mConsumer.get();
}
public:
SocketConnectTask(UnixSocketImpl* aImpl)
: SocketIOTask<UnixSocketImpl>(aImpl)
{ }
bool
UnixSocketConsumerIO::IsShutdownOnMainThread() const
{
MOZ_ASSERT(NS_IsMainThread());
return mConsumer == nullptr;
}
void
UnixSocketConsumerIO::ShutdownOnMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mConsumer = nullptr;
}
bool
UnixSocketConsumerIO::IsShutdownOnIOThread() const
{
return mShuttingDownOnIOThread;
}
void
UnixSocketConsumerIO::ShutdownOnIOThread()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
Close(); // will also remove fd from I/O loop
mShuttingDownOnIOThread = true;
}
void
UnixSocketConsumerIO::SetDelayedConnectTask(CancelableTask* aTask)
{
MOZ_ASSERT(NS_IsMainThread());
mDelayedConnectTask = aTask;
}
void
UnixSocketConsumerIO::ClearDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
mDelayedConnectTask = nullptr;
}
void
UnixSocketConsumerIO::CancelDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mDelayedConnectTask) {
return;
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsCanceled());
GetIO()->Connect();
}
};
mDelayedConnectTask->Cancel();
ClearDelayedConnectTask();
class SocketDelayedConnectTask : public SocketIOTask<UnixSocketImpl>
{
public:
SocketDelayedConnectTask(UnixSocketImpl* aImpl)
: SocketIOTask<UnixSocketImpl>(aImpl)
{ }
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(NS_IsMainThread());
if (IsCanceled()) {
return;
}
UnixSocketImpl* impl = GetIO();
if (impl->IsShutdownOnMainThread()) {
return;
}
impl->ClearDelayedConnectTask();
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(impl));
}
};
void
UnixSocketImpl::FireSocketError()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
// Clean up watchers, statuses, fds
Close();
// Tell the main thread we've errored
nsRefPtr<nsRunnable> r =
new SocketIOEventRunnable<UnixSocketImpl>(
this, SocketIOEventRunnable<UnixSocketImpl>::CONNECT_ERROR);
NS_DispatchToMainThread(r);
}
void
UnixSocketConsumerIO::Listen()
UnixSocketImpl::Listen()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(mConnector);
@@ -255,7 +279,7 @@ UnixSocketConsumerIO::Listen()
}
void
UnixSocketConsumerIO::Connect()
UnixSocketImpl::Connect()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(mConnector);
@@ -287,149 +311,8 @@ UnixSocketConsumerIO::Connect()
NS_WARN_IF(NS_FAILED(rv));
}
void
UnixSocketConsumerIO::Send(UnixSocketRawData* aData)
{
EnqueueData(aData);
AddWatchers(WRITE_WATCHER, false);
}
void
UnixSocketConsumerIO::OnAccepted(int aFd,
const sockaddr_any* aAddr,
socklen_t aAddrLen)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
MOZ_ASSERT(aAddr);
MOZ_ASSERT(aAddrLen <= sizeof(mAddr));
memcpy (&mAddr, aAddr, aAddrLen);
mAddrSize = aAddrLen;
if (!mConnector->SetUp(aFd)) {
NS_WARNING("Could not set up socket!");
return;
}
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
Close();
if (!SetSocketFlags(aFd)) {
return;
}
SetSocket(aFd, SOCKET_IS_CONNECTED);
nsRefPtr<nsRunnable> r =
new SocketIOEventRunnable<UnixSocketConsumerIO>(
this, SocketIOEventRunnable<UnixSocketConsumerIO>::CONNECT_SUCCESS);
NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
UnixSocketConsumerIO::OnConnected()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
if (!SetSocketFlags(GetFd())) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
if (!mConnector->SetUp(GetFd())) {
NS_WARNING("Could not set up socket!");
FireSocketError();
return;
}
nsRefPtr<nsRunnable> r =
new SocketIOEventRunnable<UnixSocketConsumerIO>(
this, SocketIOEventRunnable<UnixSocketConsumerIO>::CONNECT_SUCCESS);
NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
UnixSocketConsumerIO::OnListening()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
if (!mConnector->SetUpListenSocket(GetFd())) {
NS_WARNING("Could not set up listen socket!");
FireSocketError();
return;
}
AddWatchers(READ_WATCHER, true);
}
void
UnixSocketConsumerIO::OnError(const char* aFunction, int aErrno)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
UnixFdWatcher::OnError(aFunction, aErrno);
FireSocketError();
}
void
UnixSocketConsumerIO::OnSocketCanReceiveWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
nsresult rv = ReceiveData(GetFd(), this);
if (NS_FAILED(rv)) {
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
return;
}
}
void
UnixSocketConsumerIO::OnSocketCanSendWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
nsresult rv = SendPendingData(GetFd(), this);
if (NS_FAILED(rv)) {
return;
}
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
UnixSocketConsumerIO::FireSocketError()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
// Clean up watchers, statuses, fds
Close();
// Tell the main thread we've errored
nsRefPtr<nsRunnable> r =
new SocketIOEventRunnable<UnixSocketConsumerIO>(
this, SocketIOEventRunnable<UnixSocketConsumerIO>::CONNECT_ERROR);
NS_DispatchToMainThread(r);
}
bool
UnixSocketConsumerIO::SetSocketFlags(int aFd)
UnixSocketImpl::SetSocketFlags(int aFd)
{
// Set socket addr to be reused even if kernel is still waiting to close
int n = 1;
@@ -460,92 +343,148 @@ UnixSocketConsumerIO::SetSocketFlags(int aFd)
return true;
}
//
// Socket tasks
//
class ListenTask MOZ_FINAL : public SocketIOTask<UnixSocketConsumerIO>
void
UnixSocketImpl::OnAccepted(int aFd,
const sockaddr_any* aAddr,
socklen_t aAddrLen)
{
public:
ListenTask(UnixSocketConsumerIO* aIO)
: SocketIOTask<UnixSocketConsumerIO>(aIO)
{ }
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
MOZ_ASSERT(aAddr);
MOZ_ASSERT(aAddrLen <= sizeof(mAddr));
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(!NS_IsMainThread());
memcpy (&mAddr, aAddr, aAddrLen);
mAddrSize = aAddrLen;
if (!IsCanceled()) {
GetIO()->Listen();
}
if (!mConnector->SetUp(aFd)) {
NS_WARNING("Could not set up socket!");
return;
}
};
class ConnectTask MOZ_FINAL : public SocketIOTask<UnixSocketConsumerIO>
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
Close();
if (!SetSocketFlags(aFd)) {
return;
}
SetSocket(aFd, SOCKET_IS_CONNECTED);
nsRefPtr<nsRunnable> r =
new SocketIOEventRunnable<UnixSocketImpl>(
this, SocketIOEventRunnable<UnixSocketImpl>::CONNECT_SUCCESS);
NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
UnixSocketImpl::OnConnected()
{
public:
ConnectTask(UnixSocketConsumerIO* aIO)
: SocketIOTask<UnixSocketConsumerIO>(aIO)
{ }
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsCanceled());
GetIO()->Connect();
if (!SetSocketFlags(GetFd())) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
};
class DelayedConnectTask MOZ_FINAL : public SocketIOTask<UnixSocketConsumerIO>
if (!mConnector->SetUp(GetFd())) {
NS_WARNING("Could not set up socket!");
FireSocketError();
return;
}
nsRefPtr<nsRunnable> r =
new SocketIOEventRunnable<UnixSocketImpl>(
this, SocketIOEventRunnable<UnixSocketImpl>::CONNECT_SUCCESS);
NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
UnixSocketImpl::OnListening()
{
public:
DelayedConnectTask(UnixSocketConsumerIO* aIO)
: SocketIOTask<UnixSocketConsumerIO>(aIO)
{ }
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(NS_IsMainThread());
if (IsCanceled()) {
return;
}
UnixSocketConsumerIO* io = GetIO();
if (io->IsShutdownOnMainThread()) {
return;
}
io->ClearDelayedConnectTask();
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ConnectTask(io));
if (!mConnector->SetUpListenSocket(GetFd())) {
NS_WARNING("Could not set up listen socket!");
FireSocketError();
return;
}
};
AddWatchers(READ_WATCHER, true);
}
void
UnixSocketImpl::OnError(const char* aFunction, int aErrno)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
UnixFdWatcher::OnError(aFunction, aErrno);
FireSocketError();
}
void
UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
nsresult rv = ReceiveData(GetFd(), this);
if (NS_FAILED(rv)) {
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
return;
}
}
void
UnixSocketImpl::OnSocketCanSendWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
nsresult rv = SendPendingData(GetFd(), this);
if (NS_FAILED(rv)) {
return;
}
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
//
// UnixSocketConsumer
//
UnixSocketConsumer::UnixSocketConsumer()
: mIO(nullptr)
: mImpl(nullptr)
{ }
UnixSocketConsumer::~UnixSocketConsumer()
{
MOZ_ASSERT(!mIO);
MOZ_ASSERT(!mImpl);
}
bool
UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
{
MOZ_ASSERT(NS_IsMainThread());
if (!mIO) {
if (!mImpl) {
return false;
}
MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
MOZ_ASSERT(!mImpl->IsShutdownOnMainThread());
XRE_GetIOMessageLoop()->PostTask(
FROM_HERE, new SocketIOSendTask<UnixSocketConsumerIO>(mIO, aData));
FROM_HERE, new SocketIOSendTask<UnixSocketImpl>(mImpl, aData));
return true;
}
@@ -573,21 +512,21 @@ void
UnixSocketConsumer::CloseSocket()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mIO) {
if (!mImpl) {
return;
}
mIO->CancelDelayedConnectTask();
mImpl->CancelDelayedConnectTask();
// From this point on, we consider mIO as being deleted.
// From this point on, we consider mImpl as being deleted.
// We sever the relationship here so any future calls to listen or connect
// will create a new implementation.
mIO->ShutdownOnMainThread();
mImpl->ShutdownOnMainThread();
XRE_GetIOMessageLoop()->PostTask(
FROM_HERE, new SocketIOShutdownTask<UnixSocketConsumerIO>(mIO));
FROM_HERE, new SocketIOShutdownTask<UnixSocketImpl>(mImpl));
mIO = nullptr;
mImpl = nullptr;
NotifyDisconnect();
}
@@ -596,11 +535,11 @@ void
UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr)
{
aAddrStr.Truncate();
if (!mIO || GetConnectionStatus() != SOCKET_CONNECTED) {
if (!mImpl || GetConnectionStatus() != SOCKET_CONNECTED) {
NS_WARNING("No socket currently open!");
return;
}
mIO->GetSocketAddr(aAddrStr);
mImpl->GetSocketAddr(aAddrStr);
}
bool
@@ -613,21 +552,21 @@ UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
nsAutoPtr<UnixSocketConnector> connector(aConnector);
if (mIO) {
if (mImpl) {
NS_WARNING("Socket already connecting/connected!");
return false;
}
nsCString addr(aAddress);
MessageLoop* ioLoop = XRE_GetIOMessageLoop();
mIO = new UnixSocketConsumerIO(ioLoop, this, connector.forget(), addr);
mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr);
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
DelayedConnectTask* connectTask = new DelayedConnectTask(mIO);
mIO->SetDelayedConnectTask(connectTask);
SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl);
mImpl->SetDelayedConnectTask(connectTask);
MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
} else {
ioLoop->PostTask(FROM_HERE, new ConnectTask(mIO));
ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl));
}
return true;
}
@@ -640,15 +579,16 @@ UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
nsAutoPtr<UnixSocketConnector> connector(aConnector);
if (mIO) {
if (mImpl) {
NS_WARNING("Socket already connecting/connected!");
return false;
}
mIO = new UnixSocketConsumerIO(
XRE_GetIOMessageLoop(), this, connector.forget(), EmptyCString());
mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(),
EmptyCString());
SetConnectionStatus(SOCKET_LISTENING);
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ListenTask(mIO));
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketListenTask(mImpl));
return true;
}