Bug 836523 - Wait for incoming connections in UnixSocketImpl. r=qdot, r=echou
The UnixSocketImpl currently polls the socket file descriptor while listening for incoming connections and schedules itself to run again if no connection requests have been received. This behavior interferes with closing the socket and deleting the socket structure in the main thread. It can happen that the I/O thread dispatches a SocketAcceptTask to poll the listening socket and the main thread dispatches a DeleteInstanceRunnable for the UnixSocketImpl, such that the delete operation gets dispatched before the poll operation. The latter then operates on the just deleted UnixSocketImpl. With this patch, the I/O thread watches the listing socket for incoming connection requests and only attempts to run accept when connection requests are pending. This allows to serialize polling and close operations within the I/O thread in a sound order. A side effect of this patch is that we don't constantly run code for polling the listing socket, which should result in less CPU overhead and save battery power.
This commit is contained in:
@@ -491,28 +491,7 @@ UnixSocketImpl::Accept()
|
||||
|
||||
}
|
||||
|
||||
int client_fd;
|
||||
client_fd = accept(mFd.get(), &mAddr, &mAddrSize);
|
||||
if (client_fd < 0) {
|
||||
EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!mConnector->SetUp(client_fd)) {
|
||||
NS_WARNING("Could not set up socket!");
|
||||
return;
|
||||
}
|
||||
mFd.reset(client_fd);
|
||||
|
||||
nsRefPtr<OnSocketEventTask> t =
|
||||
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
|
||||
NS_DispatchToMainThread(t);
|
||||
|
||||
// Due to the fact that we've dispatched our OnConnectSuccess message before
|
||||
// starting reading, we're guaranteed that any subsequent read tasks will
|
||||
// happen after the object has been notified of a successful connect.
|
||||
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
|
||||
new StartImplReadingTask(this));
|
||||
SetUpIO();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -577,11 +556,6 @@ UnixSocketImpl::SetNonblockFlags()
|
||||
return false;
|
||||
}
|
||||
|
||||
// Select non-blocking IO.
|
||||
if (-1 == fcntl(mFd, F_SETFL, O_NONBLOCK)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -656,50 +630,82 @@ UnixSocketConsumer::CloseSocket()
|
||||
void
|
||||
UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
|
||||
{
|
||||
// Keep reading data until either
|
||||
//
|
||||
// - mIncoming is completely read
|
||||
// If so, sConsumer->MessageReceived(mIncoming.forget())
|
||||
//
|
||||
// - mIncoming isn't completely read, but there's no more
|
||||
// data available on the socket
|
||||
// If so, break;
|
||||
while (true) {
|
||||
if (!mIncoming) {
|
||||
uint8_t data[MAX_READ_SIZE];
|
||||
ssize_t ret = read(aFd, data, MAX_READ_SIZE);
|
||||
if (ret < 0) {
|
||||
if (ret == -1) {
|
||||
if (errno == EINTR) {
|
||||
continue; // retry system call when interrupted
|
||||
enum SocketConnectionStatus status = mConsumer->GetConnectionStatus();
|
||||
|
||||
if (status == SOCKET_CONNECTED) {
|
||||
|
||||
// Keep reading data until either
|
||||
//
|
||||
// - mIncoming is completely read
|
||||
// If so, sConsumer->MessageReceived(mIncoming.forget())
|
||||
//
|
||||
// - mIncoming isn't completely read, but there's no more
|
||||
// data available on the socket
|
||||
// If so, break;
|
||||
while (true) {
|
||||
if (!mIncoming) {
|
||||
uint8_t data[MAX_READ_SIZE];
|
||||
ssize_t ret = read(aFd, data, MAX_READ_SIZE);
|
||||
if (ret < 0) {
|
||||
if (ret == -1) {
|
||||
if (errno == EINTR) {
|
||||
continue; // retry system call when interrupted
|
||||
}
|
||||
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
return; // no data available: return and re-poll
|
||||
}
|
||||
// else fall through to error handling on other errno's
|
||||
}
|
||||
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
return; // no data available: return and re-poll
|
||||
}
|
||||
// else fall through to error handling on other errno's
|
||||
}
|
||||
#ifdef DEBUG
|
||||
NS_WARNING("Cannot read from network");
|
||||
NS_WARNING("Cannot read from network");
|
||||
#endif
|
||||
// At this point, assume that we can't actually access
|
||||
// the socket anymore
|
||||
mReadWatcher.StopWatchingFileDescriptor();
|
||||
mWriteWatcher.StopWatchingFileDescriptor();
|
||||
nsRefPtr<SocketCloseTask> t = new SocketCloseTask(this);
|
||||
NS_DispatchToMainThread(t);
|
||||
return;
|
||||
}
|
||||
if (ret) {
|
||||
mIncoming = new UnixSocketRawData(ret);
|
||||
memcpy(mIncoming->mData, data, ret);
|
||||
nsRefPtr<SocketReceiveTask> t =
|
||||
new SocketReceiveTask(this, mIncoming.forget());
|
||||
NS_DispatchToMainThread(t);
|
||||
}
|
||||
if (ret < ssize_t(MAX_READ_SIZE)) {
|
||||
return;
|
||||
// At this point, assume that we can't actually access
|
||||
// the socket anymore
|
||||
mReadWatcher.StopWatchingFileDescriptor();
|
||||
mWriteWatcher.StopWatchingFileDescriptor();
|
||||
nsRefPtr<SocketCloseTask> t = new SocketCloseTask(this);
|
||||
NS_DispatchToMainThread(t);
|
||||
return;
|
||||
}
|
||||
if (ret) {
|
||||
mIncoming = new UnixSocketRawData(ret);
|
||||
memcpy(mIncoming->mData, data, ret);
|
||||
nsRefPtr<SocketReceiveTask> t =
|
||||
new SocketReceiveTask(this, mIncoming.forget());
|
||||
NS_DispatchToMainThread(t);
|
||||
}
|
||||
if (ret < ssize_t(MAX_READ_SIZE)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (status == SOCKET_LISTENING) {
|
||||
|
||||
int client_fd = accept(mFd.get(), &mAddr, &mAddrSize);
|
||||
|
||||
if (client_fd < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!mConnector->SetUp(client_fd)) {
|
||||
NS_WARNING("Could not set up socket!");
|
||||
return;
|
||||
}
|
||||
|
||||
mReadWatcher.StopWatchingFileDescriptor();
|
||||
mWriteWatcher.StopWatchingFileDescriptor();
|
||||
|
||||
mFd.reset(client_fd);
|
||||
|
||||
nsRefPtr<OnSocketEventTask> t =
|
||||
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
|
||||
NS_DispatchToMainThread(t);
|
||||
|
||||
// Due to the fact that we've dispatched our OnConnectSuccess message before
|
||||
// starting reading, we're guaranteed that any subsequent read tasks will
|
||||
// happen after the object has been notified of a successful connect.
|
||||
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
|
||||
new StartImplReadingTask(this));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -798,12 +804,12 @@ UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
|
||||
addr.Assign(aAddress);
|
||||
mImpl = new UnixSocketImpl(this, aConnector, addr);
|
||||
MessageLoop* ioLoop = XRE_GetIOMessageLoop();
|
||||
mConnectionStatus = SOCKET_CONNECTING;
|
||||
if (aDelayMs > 0) {
|
||||
ioLoop->PostDelayedTask(FROM_HERE, new SocketConnectTask(mImpl), aDelayMs);
|
||||
} else {
|
||||
ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl));
|
||||
}
|
||||
mConnectionStatus = SOCKET_CONNECTING;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -818,9 +824,9 @@ UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
|
||||
}
|
||||
nsCString addr;
|
||||
mImpl = new UnixSocketImpl(this, aConnector, addr);
|
||||
mConnectionStatus = SOCKET_LISTENING;
|
||||
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
|
||||
new SocketAcceptTask(mImpl));
|
||||
mConnectionStatus = SOCKET_LISTENING;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user