Bug 974410: Inherit UnixSocketImpl from UnixFdWatcher, r=kyle

With this patch UnixSocketImpl inherits from UnixFdWatcher. The
new base class implements general file-descriptor handling and
I/O watchers.
This commit is contained in:
Thomas Zimmermann
2014-02-26 17:52:00 +01:00
parent b9f868744e
commit cdbb475ead

View File

@@ -16,6 +16,7 @@
#include "base/eintr_wrapper.h"
#include "base/message_loop.h"
#include "mozilla/ipc/UnixSocketWatcher.h"
#include "mozilla/Monitor.h"
#include "mozilla/FileUtils.h"
#include "nsString.h"
@@ -38,14 +39,15 @@ static const int SOCKET_RETRY_TIME_MS = 1000;
namespace mozilla {
namespace ipc {
class UnixSocketImpl : public MessageLoopForIO::Watcher
class UnixSocketImpl : public UnixFdWatcher
{
public:
UnixSocketImpl(UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
UnixSocketImpl(MessageLoop* mIOLoop,
UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
const nsACString& aAddress,
SocketConnectionStatus aConnectionStatus)
: mConsumer(aConsumer)
, mIOLoop(nullptr)
: UnixFdWatcher(mIOLoop)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
@@ -63,12 +65,7 @@ public:
void QueueWriteData(UnixSocketRawData* aData)
{
mOutgoingQ.AppendElement(aData);
OnFileCanWriteWithoutBlocking(mFd);
}
bool isFdValid()
{
return mFd > 0;
OnFileCanWriteWithoutBlocking(GetFd());
}
bool IsShutdownOnMainThread()
@@ -94,22 +91,15 @@ public:
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
mReadWatcher.StopWatchingFileDescriptor();
mWriteWatcher.StopWatchingFileDescriptor();
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
mShuttingDownOnIOThread = true;
}
void SetUpIO()
{
MOZ_ASSERT(!mIOLoop);
MOZ_ASSERT(mFd >= 0);
mIOLoop = MessageLoopForIO::current();
mIOLoop->WatchFileDescriptor(mFd,
true,
MessageLoopForIO::WATCH_READ,
&mReadWatcher,
this);
MOZ_ASSERT(IsOpen());
AddWatchers(READ_WATCHER, true);
}
void SetDelayedConnectTask(CancelableTask* aTask)
@@ -193,33 +183,12 @@ private:
*/
virtual void OnFileCanWriteWithoutBlocking(int aFd);
/**
* IO Loop pointer. Must be initalized and called from IO thread only.
*/
MessageLoopForIO* mIOLoop;
/**
* Raw data queue. Must be pushed/popped from IO thread only.
*/
typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
UnixSocketRawDataQueue mOutgoingQ;
/**
* Read watcher for libevent. Only to be accessed on IO Thread.
*/
MessageLoopForIO::FileDescriptorWatcher mReadWatcher;
/**
* Write watcher for libevent. Only to be accessed on IO Thread.
*/
MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
/**
* File descriptor to read from/write to. Connection happens on user provided
* thread. Read/write/close happens on IO thread.
*/
ScopedClose mFd;
/**
* Connector object used to create the connection we are currently using.
*/
@@ -495,10 +464,8 @@ UnixSocketImpl::FireSocketError()
MOZ_ASSERT(!NS_IsMainThread());
// Clean up watchers, statuses, fds
mReadWatcher.StopWatchingFileDescriptor();
mWriteWatcher.StopWatchingFileDescriptor();
Close();
mConnectionStatus = SOCKET_DISCONNECTED;
mFd.reset(-1);
// Tell the main thread we've errored
nsRefPtr<OnSocketEventTask> t =
@@ -520,13 +487,14 @@ UnixSocketImpl::Accept()
return;
}
if (mFd.get() < 0) {
mFd = mConnector->Create();
if (mFd.get() < 0) {
if (!IsOpen()) {
int fd = mConnector->Create();
if (fd < 0) {
NS_WARNING("Cannot create socket fd!");
FireSocketError();
return;
}
SetFd(fd);
if (!SetSocketFlags()) {
NS_WARNING("Cannot set socket flags!");
@@ -534,23 +502,23 @@ UnixSocketImpl::Accept()
return;
}
if (bind(mFd.get(), (struct sockaddr*)&mAddr, mAddrSize)) {
if (bind(GetFd(), (struct sockaddr*)&mAddr, mAddrSize)) {
#ifdef DEBUG
CHROMIUM_LOG("...bind(%d) gave errno %d", mFd.get(), errno);
CHROMIUM_LOG("...bind(%d) gave errno %d", GetFd(), errno);
#endif
FireSocketError();
return;
}
if (listen(mFd.get(), 1)) {
if (listen(GetFd(), 1)) {
#ifdef DEBUG
CHROMIUM_LOG("...listen(%d) gave errno %d", mFd.get(), errno);
CHROMIUM_LOG("...listen(%d) gave errno %d", GetFd(), errno);
#endif
FireSocketError();
return;
}
if (!mConnector->SetUpListenSocket(mFd)) {
if (!mConnector->SetUpListenSocket(GetFd())) {
NS_WARNING("Could not set up listen socket!");
FireSocketError();
return;
@@ -567,13 +535,14 @@ UnixSocketImpl::Connect()
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(mConnector);
if (mFd.get() < 0) {
mFd = mConnector->Create();
if (mFd.get() < 0) {
if (!IsOpen()) {
int fd = mConnector->Create();
if (fd < 0) {
NS_WARNING("Cannot create socket fd!");
FireSocketError();
return;
}
SetFd(fd);
}
int ret;
@@ -585,37 +554,31 @@ UnixSocketImpl::Connect()
}
// Select non-blocking IO.
if (-1 == fcntl(mFd.get(), F_SETFL, O_NONBLOCK)) {
if (-1 == fcntl(GetFd(), F_SETFL, O_NONBLOCK)) {
NS_WARNING("Cannot set nonblock!");
FireSocketError();
return;
}
ret = connect(mFd.get(), (struct sockaddr*)&mAddr, mAddrSize);
ret = connect(GetFd(), (struct sockaddr*)&mAddr, mAddrSize);
if (ret) {
if (errno == EINPROGRESS) {
// Select blocking IO again, since we've now at least queue'd the connect
// as nonblock.
int current_opts = fcntl(mFd.get(), F_GETFL, 0);
int current_opts = fcntl(GetFd(), F_GETFL, 0);
if (-1 == current_opts) {
NS_WARNING("Cannot get socket opts!");
FireSocketError();
return;
}
if (-1 == fcntl(mFd.get(), F_SETFL, current_opts & ~O_NONBLOCK)) {
if (-1 == fcntl(GetFd(), F_SETFL, current_opts & ~O_NONBLOCK)) {
NS_WARNING("Cannot set socket opts to blocking!");
FireSocketError();
return;
}
// Set up a write watch to make sure we receive the connect signal
MessageLoopForIO::current()->WatchFileDescriptor(
mFd.get(),
false,
MessageLoopForIO::WATCH_WRITE,
&mWriteWatcher,
this);
AddWatchers(WRITE_WATCHER, false);
#ifdef DEBUG
CHROMIUM_LOG("UnixSocket Connection delayed!");
@@ -635,7 +598,7 @@ UnixSocketImpl::Connect()
return;
}
if (!mConnector->SetUp(mFd)) {
if (!mConnector->SetUp(GetFd())) {
NS_WARNING("Could not set up socket!");
FireSocketError();
return;
@@ -654,16 +617,16 @@ UnixSocketImpl::SetSocketFlags()
{
// Set socket addr to be reused even if kernel is still waiting to close
int n = 1;
setsockopt(mFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
setsockopt(GetFd(), SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
// Set close-on-exec bit.
int flags = fcntl(mFd, F_GETFD);
int flags = fcntl(GetFd(), F_GETFD);
if (-1 == flags) {
return false;
}
flags |= FD_CLOEXEC;
if (-1 == fcntl(mFd, F_SETFD, flags)) {
if (-1 == fcntl(GetFd(), F_SETFD, flags)) {
return false;
}
@@ -766,8 +729,7 @@ UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
// We're done with our descriptors. Ensure that spurious events don't
// cause us to end up back here.
mReadWatcher.StopWatchingFileDescriptor();
mWriteWatcher.StopWatchingFileDescriptor();
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
nsRefPtr<RequestClosingSocketTask> t = new RequestClosingSocketTask(this);
NS_DispatchToMainThread(t);
return;
@@ -787,7 +749,7 @@ UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
MOZ_CRASH("We returned early");
} else if (mConnectionStatus == SOCKET_LISTENING) {
int client_fd = accept(mFd.get(), (struct sockaddr*)&mAddr, &mAddrSize);
int client_fd = accept(GetFd(), (struct sockaddr*)&mAddr, &mAddrSize);
if (client_fd < 0) {
return;
@@ -798,16 +760,13 @@ UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
return;
}
mReadWatcher.StopWatchingFileDescriptor();
mWriteWatcher.StopWatchingFileDescriptor();
mFd.reset(client_fd);
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
Close();
SetFd(client_fd);
if (!SetSocketFlags()) {
return;
}
mIOLoop = nullptr;
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
@@ -822,8 +781,8 @@ UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
MOZ_ASSERT(aFd >= 0);
if (mConnectionStatus == SOCKET_CONNECTED) {
// Try to write the bytes of mCurrentRilRawData. If all were written, continue.
//
@@ -854,12 +813,7 @@ UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
}
if (data->mCurrentWriteOffset != data->mSize) {
MessageLoopForIO::current()->WatchFileDescriptor(
aFd,
false,
MessageLoopForIO::WATCH_WRITE,
&mWriteWatcher,
this);
AddWatchers(WRITE_WATCHER, false);
return;
}
mOutgoingQ.RemoveElementAt(0);
@@ -868,7 +822,7 @@ UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
} else if (mConnectionStatus == SOCKET_CONNECTING) {
int error, ret;
socklen_t len = sizeof(error);
ret = getsockopt(mFd.get(), SOL_SOCKET, SO_ERROR, &error, &len);
ret = getsockopt(GetFd(), SOL_SOCKET, SO_ERROR, &error, &len);
if (ret || error) {
NS_WARNING("getsockopt failure on async socket connect!");
@@ -882,7 +836,7 @@ UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
return;
}
if (!mConnector->SetUp(mFd)) {
if (!mConnector->SetUp(GetFd())) {
NS_WARNING("Could not set up socket!");
FireSocketError();
return;
@@ -951,8 +905,8 @@ UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
}
nsCString addr(aAddress);
mImpl = new UnixSocketImpl(this, connector.forget(), addr, SOCKET_CONNECTING);
MessageLoop* ioLoop = XRE_GetIOMessageLoop();
mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr, SOCKET_CONNECTING);
mConnectionStatus = SOCKET_CONNECTING;
if (aDelayMs > 0) {
SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl);
@@ -977,8 +931,8 @@ UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
return false;
}
mImpl = new UnixSocketImpl(this, connector.forget(), EmptyCString(),
SOCKET_LISTENING);
mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(),
EmptyCString(), SOCKET_LISTENING);
mConnectionStatus = SOCKET_LISTENING;
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketAcceptTask(mImpl));