Bug 1170466: Implement accept and connect in |ConnectionOrientedSocketIO|, r=kmachulis

|ConnectionOrientedSocketIO| now handles accepting and connecting
sockets. All sub-classes have been changed accordingly.
This commit is contained in:
Thomas Zimmermann
2015-06-03 11:53:50 +02:00
parent 5c7d8ad16b
commit e2b97ef62c
4 changed files with 104 additions and 157 deletions

View File

@@ -5,6 +5,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "ConnectionOrientedSocket.h"
#include "UnixSocketConnector.h"
namespace mozilla {
namespace ipc {
@@ -17,21 +18,85 @@ ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
int aFd,
ConnectionStatus aConnectionStatus)
ConnectionStatus aConnectionStatus,
UnixSocketConnector* aConnector)
: DataSocketIO(aConsumerThread)
, UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
{ }
, mConnector(aConnector)
, mPeerAddressLength(0)
{
MOZ_ASSERT(mConnector);
}
ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
nsIThread* aConsumerThread,
MessageLoop* aIOLoop)
MessageLoop* aIOLoop,
UnixSocketConnector* aConnector)
: DataSocketIO(aConsumerThread)
, UnixSocketWatcher(aIOLoop)
{ }
, mConnector(aConnector)
, mPeerAddressLength(0)
{
MOZ_ASSERT(mConnector);
}
ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO()
{ }
nsresult
ConnectionOrientedSocketIO::Accept(int aFd,
const struct sockaddr* aPeerAddress,
socklen_t aPeerAddressLength)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTING);
SetSocket(aFd, SOCKET_IS_CONNECTED);
// Address setup
mPeerAddressLength = aPeerAddressLength;
memcpy(&mPeerAddress, aPeerAddress, mPeerAddressLength);
// Signal success and start data transfer
OnConnected();
return NS_OK;
}
nsresult
ConnectionOrientedSocketIO::Connect()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(!IsOpen());
struct sockaddr* peerAddress =
reinterpret_cast<struct sockaddr*>(&mPeerAddress);
mPeerAddressLength = sizeof(mPeerAddress);
int fd;
nsresult rv = mConnector->CreateStreamSocket(peerAddress,
&mPeerAddressLength,
fd);
if (NS_FAILED(rv)) {
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
return NS_ERROR_FAILURE;
}
SetFd(fd);
// calls OnConnected() on success, or OnError() otherwise
rv = UnixSocketWatcher::Connect(peerAddress, mPeerAddressLength);
if (NS_FAILED(rv)) {
return rv;
}
return NS_OK;
}
void
ConnectionOrientedSocketIO::Send(UnixSocketIOBuffer* aBuffer)
{

View File

@@ -31,9 +31,11 @@ class ConnectionOrientedSocketIO
public:
virtual ~ConnectionOrientedSocketIO();
virtual nsresult Accept(int aFd,
const struct sockaddr* aAddress,
socklen_t aAddressLength) = 0;
nsresult Accept(int aFd,
const struct sockaddr* aAddress,
socklen_t aAddressLength);
nsresult Connect();
void Send(UnixSocketIOBuffer* aBuffer);
@@ -55,19 +57,39 @@ protected:
* @param aIOLoop The socket's I/O loop.
* @param aFd The socket file descriptor.
* @param aConnectionStatus The connection status for |aFd|.
* @param aConnector Connector object for socket-type-specific methods.
*/
ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus);
int aFd, ConnectionStatus aConnectionStatus,
UnixSocketConnector* aConnector);
/**
* Constructs an instance of |ConnectionOrientedSocketIO|
*
* @param aConsumerThread The socket's consumer thread.
* @param aIOLoop The socket's I/O loop.
* @param aConnector Connector object for socket-type-specific methods.
*/
ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop);
MessageLoop* aIOLoop,
UnixSocketConnector* aConnector);
private:
/**
* Connector object used to create the connection we are currently using.
*/
nsAutoPtr<UnixSocketConnector> mConnector;
/**
* Number of valid bytes in |mPeerAddress|.
*/
socklen_t mPeerAddressLength;
/**
* Address of the socket's current peer.
*/
struct sockaddr_storage mPeerAddress;
};
class ConnectionOrientedSocket : public DataSocket

View File

@@ -48,21 +48,6 @@ public:
void ClearDelayedConnectTask();
void CancelDelayedConnectTask();
// Task callback methods
//
/**
* Connect to a socket
*/
void Connect();
// Methods for |ConnectionOrientedSocketIO|
//
nsresult Accept(int aFd,
const struct sockaddr* aAddress,
socklen_t aAddressLength) override;
// Methods for |DataSocket|
//
@@ -82,8 +67,6 @@ public:
void ShutdownOnIOThread() override;
private:
void FireSocketError();
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from consumer thread. All non-consumer-thread accesses should
@@ -91,26 +74,11 @@ private:
*/
RefPtr<StreamSocket> mStreamSocket;
/**
* Connector object used to create the connection we are currently using.
*/
nsAutoPtr<UnixSocketConnector> mConnector;
/**
* If true, do not requeue whatever task we're running
*/
bool mShuttingDownOnIOThread;
/**
* Number of valid bytes in |mAddress|
*/
socklen_t mAddressLength;
/**
* Address structure of the socket currently in use
*/
struct sockaddr_storage mAddress;
/**
* Task member for delayed connect task. Should only be access on consumer
* thread.
@@ -127,15 +95,12 @@ StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector)
: ConnectionOrientedSocketIO(aConsumerThread, aIOLoop)
: ConnectionOrientedSocketIO(aConsumerThread, aIOLoop, aConnector)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddressLength(0)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mStreamSocket);
MOZ_ASSERT(mConnector);
}
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
@@ -146,15 +111,13 @@ StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
: ConnectionOrientedSocketIO(aConsumerThread,
aIOLoop,
aFd,
aConnectionStatus)
aConnectionStatus,
aConnector)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddressLength(0)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mStreamSocket);
MOZ_ASSERT(mConnector);
}
StreamSocketIO::~StreamSocketIO()
@@ -204,73 +167,6 @@ StreamSocketIO::CancelDelayedConnectTask()
ClearDelayedConnectTask();
}
void
StreamSocketIO::Connect()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(mConnector);
MOZ_ASSERT(!IsOpen());
struct sockaddr* address = reinterpret_cast<struct sockaddr*>(&mAddress);
mAddressLength = sizeof(mAddress);
int fd;
nsresult rv = mConnector->CreateStreamSocket(address, &mAddressLength, fd);
if (NS_FAILED(rv)) {
FireSocketError();
return;
}
SetFd(fd);
// calls OnConnected() on success, or OnError() otherwise
rv = UnixSocketWatcher::Connect(address, mAddressLength);
NS_WARN_IF(NS_FAILED(rv));
}
void
StreamSocketIO::FireSocketError()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
// Clean up watchers, statuses, fds
Close();
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
// |ConnectionOrientedSocketIO|
nsresult
StreamSocketIO::Accept(int aFd,
const struct sockaddr* aAddress,
socklen_t aAddressLength)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTING);
SetSocket(aFd, SOCKET_IS_CONNECTED);
// Address setup
mAddressLength = aAddressLength;
memcpy(&mAddress, aAddress, mAddressLength);
// Signal success
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
return NS_OK;
}
// |DataSocketIO|
nsresult