Files
tubestation/ipc/unixsocket/StreamSocket.cpp
Thomas Zimmermann 3297a98bf1 Bug 1168806: Configurable consumer thread for socket IPC classes, r=kmachulis
The consumer thread handles socket creation, destruction, and
data processing for socket IPC. Traditionally this has been
done on the main thread.

This patch extends the socket IPC classes to support arbitrary
consumer threads. The thread is configured when establishing a
connection, and performs all of the above operations until the
socket is closed.
2015-06-02 10:01:57 +02:00

641 lines
14 KiB
C++

/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "StreamSocket.h"
#include <fcntl.h>
#include "mozilla/RefPtr.h"
#include "nsXULAppAPI.h"
#include "StreamSocketConsumer.h"
#include "UnixSocketConnector.h"
static const size_t MAX_READ_SIZE = 1 << 16;
namespace mozilla {
namespace ipc {
//
// StreamSocketIO
//
class StreamSocketIO final
: public UnixSocketWatcher
, public ConnectionOrientedSocketIO
{
public:
class ConnectTask;
class DelayedConnectTask;
class ReceiveRunnable;
StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* mIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector);
StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* mIOLoop, int aFd,
ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector);
~StreamSocketIO();
StreamSocket* GetStreamSocket();
DataSocket* GetDataSocket();
// Delayed-task handling
//
void SetDelayedConnectTask(CancelableTask* aTask);
void ClearDelayedConnectTask();
void CancelDelayedConnectTask();
// Task callback methods
//
/**
* Connect to a socket
*/
void Connect();
void Send(UnixSocketIOBuffer* aBuffer);
// I/O callback methods
//
void OnConnected() override;
void OnError(const char* aFunction, int aErrno) override;
void OnListening() override;
void OnSocketCanReceiveWithoutBlocking() override;
void OnSocketCanSendWithoutBlocking() override;
// Methods for |ConnectionOrientedSocketIO|
//
nsresult Accept(int aFd,
const struct sockaddr* aAddress,
socklen_t aAddressLength) override;
// Methods for |DataSocket|
//
nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) override;
void ConsumeBuffer() override;
void DiscardBuffer() override;
// Methods for |SocketIOBase|
//
SocketBase* GetSocketBase() override;
bool IsShutdownOnMainThread() const override;
bool IsShutdownOnIOThread() const override;
void ShutdownOnMainThread() override;
void ShutdownOnIOThread() override;
private:
void FireSocketError();
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from main thread. All non-main-thread accesses should happen with
* mIO as container.
*/
RefPtr<StreamSocket> mStreamSocket;
/**
* Connector object used to create the connection we are currently using.
*/
nsAutoPtr<UnixSocketConnector> mConnector;
/**
* If true, do not requeue whatever task we're running
*/
bool mShuttingDownOnIOThread;
/**
* Number of valid bytes in |mAddress|
*/
socklen_t mAddressLength;
/**
* Address structure of the socket currently in use
*/
struct sockaddr_storage mAddress;
/**
* Task member for delayed connect task. Should only be access on main thread.
*/
CancelableTask* mDelayedConnectTask;
/**
* I/O buffer for received data
*/
nsAutoPtr<UnixSocketRawData> mBuffer;
};
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(aIOLoop)
, ConnectionOrientedSocketIO(aConsumerThread)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddressLength(0)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mStreamSocket);
MOZ_ASSERT(mConnector);
}
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
, ConnectionOrientedSocketIO(aConsumerThread)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddressLength(0)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mStreamSocket);
MOZ_ASSERT(mConnector);
}
StreamSocketIO::~StreamSocketIO()
{
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(IsShutdownOnMainThread());
}
StreamSocket*
StreamSocketIO::GetStreamSocket()
{
return mStreamSocket.get();
}
DataSocket*
StreamSocketIO::GetDataSocket()
{
return mStreamSocket.get();
}
void
StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
{
MOZ_ASSERT(IsConsumerThread());
mDelayedConnectTask = aTask;
}
void
StreamSocketIO::ClearDelayedConnectTask()
{
MOZ_ASSERT(IsConsumerThread());
mDelayedConnectTask = nullptr;
}
void
StreamSocketIO::CancelDelayedConnectTask()
{
MOZ_ASSERT(IsConsumerThread());
if (!mDelayedConnectTask) {
return;
}
mDelayedConnectTask->Cancel();
ClearDelayedConnectTask();
}
void
StreamSocketIO::Connect()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(mConnector);
MOZ_ASSERT(!IsOpen());
struct sockaddr* address = reinterpret_cast<struct sockaddr*>(&mAddress);
mAddressLength = sizeof(mAddress);
int fd;
nsresult rv = mConnector->CreateStreamSocket(address, &mAddressLength, fd);
if (NS_FAILED(rv)) {
FireSocketError();
return;
}
SetFd(fd);
// calls OnConnected() on success, or OnError() otherwise
rv = UnixSocketWatcher::Connect(address, mAddressLength);
NS_WARN_IF(NS_FAILED(rv));
}
void
StreamSocketIO::Send(UnixSocketIOBuffer* aData)
{
EnqueueData(aData);
AddWatchers(WRITE_WATCHER, false);
}
void
StreamSocketIO::OnConnected()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
StreamSocketIO::OnListening()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
NS_NOTREACHED("Invalid call to |StreamSocketIO::OnListening|");
}
void
StreamSocketIO::OnError(const char* aFunction, int aErrno)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
UnixFdWatcher::OnError(aFunction, aErrno);
FireSocketError();
}
void
StreamSocketIO::OnSocketCanReceiveWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
ssize_t res = ReceiveData(GetFd());
if (res < 0) {
/* I/O error */
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
} else if (!res) {
/* EOF or peer shutdown */
RemoveWatchers(READ_WATCHER);
}
}
void
StreamSocketIO::OnSocketCanSendWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
nsresult rv = SendPendingData(GetFd());
if (NS_FAILED(rv)) {
return;
}
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
StreamSocketIO::FireSocketError()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
// Clean up watchers, statuses, fds
Close();
// Tell the main thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
// |ConnectionOrientedSocketIO|
nsresult
StreamSocketIO::Accept(int aFd,
const struct sockaddr* aAddress,
socklen_t aAddressLength)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTING);
SetSocket(aFd, SOCKET_IS_CONNECTED);
// Address setup
mAddressLength = aAddressLength;
memcpy(&mAddress, aAddress, mAddressLength);
// Signal success
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
return NS_OK;
}
// |DataSocketIO|
nsresult
StreamSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer)
{
MOZ_ASSERT(aBuffer);
if (!mBuffer) {
mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
}
*aBuffer = mBuffer.get();
return NS_OK;
}
/**
* |ReceiveRunnable| transfers data received on the I/O thread
* to an instance of |StreamSocket| on the main thread.
*/
class StreamSocketIO::ReceiveRunnable final
: public SocketIORunnable<StreamSocketIO>
{
public:
ReceiveRunnable(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
: SocketIORunnable<StreamSocketIO>(aIO)
, mBuffer(aBuffer)
{ }
NS_IMETHOD Run() override
{
StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
return NS_OK;
}
StreamSocket* streamSocket = io->GetStreamSocket();
MOZ_ASSERT(streamSocket);
streamSocket->ReceiveSocketData(mBuffer);
return NS_OK;
}
private:
nsAutoPtr<UnixSocketBuffer> mBuffer;
};
void
StreamSocketIO::ConsumeBuffer()
{
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
NS_DISPATCH_NORMAL);
}
void
StreamSocketIO::DiscardBuffer()
{
// Nothing to do.
}
// |SocketIOBase|
SocketBase*
StreamSocketIO::GetSocketBase()
{
return GetDataSocket();
}
bool
StreamSocketIO::IsShutdownOnMainThread() const
{
MOZ_ASSERT(IsConsumerThread());
return mStreamSocket == nullptr;
}
bool
StreamSocketIO::IsShutdownOnIOThread() const
{
return mShuttingDownOnIOThread;
}
void
StreamSocketIO::ShutdownOnMainThread()
{
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mStreamSocket = nullptr;
}
void
StreamSocketIO::ShutdownOnIOThread()
{
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
Close(); // will also remove fd from I/O loop
mShuttingDownOnIOThread = true;
}
//
// Socket tasks
//
class StreamSocketIO::ConnectTask final
: public SocketIOTask<StreamSocketIO>
{
public:
ConnectTask(StreamSocketIO* aIO)
: SocketIOTask<StreamSocketIO>(aIO)
{ }
void Run() override
{
MOZ_ASSERT(!GetIO()->IsConsumerThread());
MOZ_ASSERT(!IsCanceled());
GetIO()->Connect();
}
};
class StreamSocketIO::DelayedConnectTask final
: public SocketIOTask<StreamSocketIO>
{
public:
DelayedConnectTask(StreamSocketIO* aIO)
: SocketIOTask<StreamSocketIO>(aIO)
{ }
void Run() override
{
MOZ_ASSERT(GetIO()->IsConsumerThread());
if (IsCanceled()) {
return;
}
StreamSocketIO* io = GetIO();
if (io->IsShutdownOnMainThread()) {
return;
}
io->ClearDelayedConnectTask();
io->GetIOLoop()->PostTask(FROM_HERE, new ConnectTask(io));
}
};
//
// StreamSocket
//
StreamSocket::StreamSocket(StreamSocketConsumer* aConsumer, int aIndex)
: mIO(nullptr)
, mConsumer(aConsumer)
, mIndex(aIndex)
{
MOZ_ASSERT(mConsumer);
}
StreamSocket::~StreamSocket()
{
MOZ_ASSERT(!mIO);
}
void
StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
mConsumer->ReceiveSocketData(mIndex, aBuffer);
}
nsresult
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
{
MOZ_ASSERT(!mIO);
mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
StreamSocketIO::DelayedConnectTask* connectTask =
new StreamSocketIO::DelayedConnectTask(mIO);
mIO->SetDelayedConnectTask(connectTask);
MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
} else {
aIOLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO));
}
return NS_OK;
}
nsresult
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
}
// |ConnectionOrientedSocket|
nsresult
StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO)
{
MOZ_ASSERT(!mIO);
MOZ_ASSERT(aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
mIO = new StreamSocketIO(aConsumerThread, aIOLoop,
-1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
this, aConnector);
aIO = mIO;
return NS_OK;
}
// |DataSocket|
void
StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
{
MOZ_ASSERT(mIO);
MOZ_ASSERT(mIO->IsConsumerThread());
MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
mIO->GetIOLoop()->PostTask(
FROM_HERE,
new SocketIOSendTask<StreamSocketIO, UnixSocketIOBuffer>(mIO, aBuffer));
}
// |SocketBase|
void
StreamSocket::Close()
{
MOZ_ASSERT(mIO);
MOZ_ASSERT(mIO->IsConsumerThread());
mIO->CancelDelayedConnectTask();
// From this point on, we consider |mIO| as being deleted. We sever
// the relationship here so any future calls to |Connect| will create
// a new I/O object.
mIO->ShutdownOnMainThread();
mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
mIO = nullptr;
NotifyDisconnect();
}
void
StreamSocket::OnConnectSuccess()
{
mConsumer->OnConnectSuccess(mIndex);
}
void
StreamSocket::OnConnectError()
{
mConsumer->OnConnectError(mIndex);
}
void
StreamSocket::OnDisconnect()
{
mConsumer->OnDisconnect(mIndex);
}
} // namespace ipc
} // namespace mozilla