Bug 796176 - Patch 1: UnixSocket changes to get connect/listen running main thread, connect status to consumers; r=cjones

This commit is contained in:
Kyle Machulis
2012-10-10 22:48:40 -07:00
parent c79d1b9999
commit f28e7f6e44
2 changed files with 142 additions and 31 deletions

View File

@@ -54,6 +54,7 @@ public:
~UnixSocketImpl()
{
StopTask();
mReadWatcher.StopWatchingFileDescriptor();
mWriteWatcher.StopWatchingFileDescriptor();
}
@@ -116,12 +117,6 @@ public:
this);
}
void PrepareRemoval()
{
mTask = nullptr;
mCurrentTaskIsCanceled = true;
}
/**
* Connect to a socket
*/
@@ -140,7 +135,14 @@ public:
/**
* Stop whatever connect/accept task is running
*/
void Stop();
void StopTask()
{
if (mTask) {
mTask->Cancel();
mTask = nullptr;
}
mCurrentTaskIsCanceled = true;
}
/**
* Set up nonblocking flags on whatever our current file descriptor is.
@@ -234,6 +236,43 @@ DestroyImpl(UnixSocketImpl* impl)
delete impl;
}
class OnSocketEventTask : public nsRunnable
{
public:
enum SocketEvent {
CONNECT_SUCCESS,
CONNECT_ERROR,
DISCONNECT
};
OnSocketEventTask(UnixSocketImpl* aImpl, SocketEvent e) :
mImpl(aImpl),
mEvent(e)
{
MOZ_ASSERT(aImpl);
}
NS_IMETHOD Run()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mImpl->mConsumer) {
NS_WARNING("CloseSocket has already been called! (mConsumer is null)");
// Since we've already explicitly closed and the close happened before
// this, this isn't really an error. Since we've warned, return OK.
return NS_OK;
}
if (mEvent == CONNECT_SUCCESS) {
mImpl->mConsumer->NotifySuccess();
} else if (mEvent == CONNECT_ERROR) {
mImpl->mConsumer->NotifyError();
}
return NS_OK;
}
private:
UnixSocketImpl* mImpl;
SocketEvent mEvent;
};
class SocketReceiveTask : public nsRunnable
{
public:
@@ -275,7 +314,8 @@ public:
MOZ_ASSERT(aData);
}
void Run()
void
Run()
{
mImpl->QueueWriteData(mData);
}
@@ -294,7 +334,8 @@ public:
{
}
void Run()
void
Run()
{
mImpl->SetUpIO();
}
@@ -344,6 +385,11 @@ UnixSocketImpl::Accept()
socklen_t addr_sz;
struct sockaddr addr;
if (!mConnector) {
NS_WARNING("No connector object available!");
return;
}
// This will set things we don't particularly care about, but it will hand
// back the correct structure size which is what we do care about.
mConnector->CreateAddr(true, addr_sz, &addr, nullptr);
@@ -378,24 +424,23 @@ UnixSocketImpl::Accept()
int client_fd;
client_fd = accept(mFd.get(), &addr, &addr_sz);
if (client_fd < 0) {
#if DEBUG
LOG("Socket accept errno=%d\n", errno);
#endif
EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
return;
}
if(client_fd < 0)
{
EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
return;
}
if (!mConnector->Setup(client_fd)) {
if (!mConnector->SetUp(client_fd)) {
NS_WARNING("Could not set up socket!");
return;
}
mFd.reset(client_fd);
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
// Due to the fact that we've dispatched our OnConnectSuccess message before
// starting reading, we're guaranteed that any subsequent read tasks will
// happen after the object has been notified of a successful connect.
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new StartImplReadingTask(this));
}
@@ -424,14 +469,24 @@ UnixSocketImpl::Connect()
LOG("Socket connect errno=%d\n", errno);
#endif
mFd.reset(-1);
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
NS_DispatchToMainThread(t);
return;
}
if (!mConnector->Setup(mFd)) {
if (!mConnector->SetUp(mFd)) {
NS_WARNING("Could not set up socket!");
return;
}
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
// Due to the fact that we've dispatched our OnConnectSuccess message before
// starting reading, we're guaranteed that any subsequent read tasks will
// happen after the object has been notified of a successful connect.
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new StartImplReadingTask(this));
}
@@ -462,6 +517,11 @@ UnixSocketImpl::SetNonblockFlags()
return true;
}
UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
, mConnectionStatus(SOCKET_DISCONNECTED)
{
}
UnixSocketConsumer::~UnixSocketConsumer()
{
}
@@ -469,6 +529,7 @@ UnixSocketConsumer::~UnixSocketConsumer()
bool
UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
{
MOZ_ASSERT(NS_IsMainThread());
if (!mImpl) {
return false;
}
@@ -480,6 +541,7 @@ UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
bool
UnixSocketConsumer::SendSocketData(const nsACString& aStr)
{
MOZ_ASSERT(NS_IsMainThread());
if (!mImpl) {
return false;
}
@@ -501,8 +563,9 @@ UnixSocketConsumer::CloseSocket()
return;
}
UnixSocketImpl* impl = mImpl;
mImpl->mConsumer.forget();
mImpl = nullptr;
impl->mConsumer.forget();
impl->StopTask();
// To make sure the owner doesn't die on the IOThread, remove pointer here
// Line it up to be destructed on the IO Thread
// Kill our pointer to it
@@ -538,10 +601,7 @@ UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
// else fall through to error handling on other errno's
}
#ifdef DEBUG
nsAutoString str;
str.AssignLiteral("Cannot read from network, error ");
str += (int)ret;
NS_WARNING(NS_ConvertUTF16toUTF8(str).get());
NS_WARNING("Cannot read from network");
#endif
// At this point, assume that we can't actually access
// the socket anymore
@@ -608,12 +668,27 @@ UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
}
}
void
UnixSocketConsumer::NotifySuccess()
{
MOZ_ASSERT(NS_IsMainThread());
mConnectionStatus = SOCKET_CONNECTED;
OnConnectSuccess();
}
void
UnixSocketConsumer::NotifyError()
{
MOZ_ASSERT(NS_IsMainThread());
mConnectionStatus = SOCKET_DISCONNECTED;
OnConnectError();
}
bool
UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
const char* aAddress)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(aConnector);
MOZ_ASSERT(NS_IsMainThread());
if (mImpl) {
NS_WARNING("Socket already connecting/connected!");
return false;
@@ -623,14 +698,15 @@ UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
mImpl = new UnixSocketImpl(this, aConnector, addr);
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketConnectTask(mImpl));
mConnectionStatus = SOCKET_CONNECTING;
return true;
}
bool
UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(aConnector);
MOZ_ASSERT(NS_IsMainThread());
if (mImpl) {
NS_WARNING("Socket already connecting/connected!");
return false;
@@ -639,12 +715,14 @@ UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
mImpl = new UnixSocketImpl(this, aConnector, addr);
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketAcceptTask(mImpl));
mConnectionStatus = SOCKET_CONNECTING;
return true;
}
void
UnixSocketConsumer::CancelSocketTask()
{
mConnectionStatus = SOCKET_DISCONNECTED;
if(!mImpl) {
NS_WARNING("No socket implementation to cancel task on!");
return;