Backed out changeset 0bf18526200b (bug 1046109)
This commit is contained in:
@@ -122,34 +122,5 @@ SocketConsumerBase::SetConnectionStatus(
|
|||||||
mConnectionStatus = aConnectionStatus;
|
mConnectionStatus = aConnectionStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// SocketIOBase
|
|
||||||
//
|
|
||||||
|
|
||||||
SocketIOBase::~SocketIOBase()
|
|
||||||
{ }
|
|
||||||
|
|
||||||
void
|
|
||||||
SocketIOBase::EnqueueData(UnixSocketRawData* aData)
|
|
||||||
{
|
|
||||||
if (!aData->mSize) {
|
|
||||||
delete aData; // delete empty data immediately
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
mOutgoingQ.AppendElement(aData);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
|
||||||
SocketIOBase::HasPendingData() const
|
|
||||||
{
|
|
||||||
return !mOutgoingQ.IsEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
SocketIOBase::SocketIOBase(size_t aMaxReadSize)
|
|
||||||
: mMaxReadSize(aMaxReadSize)
|
|
||||||
{
|
|
||||||
MOZ_ASSERT(mMaxReadSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,16 +9,9 @@
|
|||||||
#ifndef mozilla_ipc_SocketBase_h
|
#ifndef mozilla_ipc_SocketBase_h
|
||||||
#define mozilla_ipc_SocketBase_h
|
#define mozilla_ipc_SocketBase_h
|
||||||
|
|
||||||
#include "base/message_loop.h"
|
|
||||||
#include "nsAutoPtr.h"
|
#include "nsAutoPtr.h"
|
||||||
#include "nsTArray.h"
|
|
||||||
#include "nsThreadUtils.h"
|
#include "nsThreadUtils.h"
|
||||||
|
|
||||||
#ifdef MOZ_TASK_TRACER
|
|
||||||
#include "GeckoTaskTracer.h"
|
|
||||||
using namespace mozilla::tasktracer;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
namespace mozilla {
|
namespace mozilla {
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|
||||||
@@ -304,119 +297,6 @@ private:
|
|||||||
nsAutoPtr<T> mInstance;
|
nsAutoPtr<T> mInstance;
|
||||||
};
|
};
|
||||||
|
|
||||||
//
|
|
||||||
// SocketIOBase
|
|
||||||
//
|
|
||||||
|
|
||||||
/* |SocketIOBase| is a base class for Socket I/O classes that
|
|
||||||
* perform operations on the I/O thread. It provides methds
|
|
||||||
* for the most common read and write scenarios.
|
|
||||||
*/
|
|
||||||
class SocketIOBase
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
virtual ~SocketIOBase();
|
|
||||||
|
|
||||||
void EnqueueData(UnixSocketRawData* aData);
|
|
||||||
bool HasPendingData() const;
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
nsresult ReceiveData(int aFd, T* aIO)
|
|
||||||
{
|
|
||||||
MOZ_ASSERT(aFd >= 0);
|
|
||||||
MOZ_ASSERT(aIO);
|
|
||||||
|
|
||||||
do {
|
|
||||||
nsAutoPtr<UnixSocketRawData> incoming(
|
|
||||||
new UnixSocketRawData(mMaxReadSize));
|
|
||||||
|
|
||||||
ssize_t res =
|
|
||||||
TEMP_FAILURE_RETRY(read(aFd, incoming->mData, incoming->mSize));
|
|
||||||
|
|
||||||
if (res < 0) {
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
||||||
return NS_OK; /* no more data available */
|
|
||||||
}
|
|
||||||
/* an error occored */
|
|
||||||
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
|
|
||||||
NS_DispatchToMainThread(r);
|
|
||||||
return NS_ERROR_FAILURE;
|
|
||||||
} else if (!res) {
|
|
||||||
/* EOF or peer shut down sending */
|
|
||||||
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
|
|
||||||
NS_DispatchToMainThread(r);
|
|
||||||
return NS_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef MOZ_TASK_TRACER
|
|
||||||
// Make unix socket creation events to be the source events of TaskTracer,
|
|
||||||
// and originate the rest correlation tasks from here.
|
|
||||||
AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
incoming->mSize = res;
|
|
||||||
nsRefPtr<nsRunnable> r =
|
|
||||||
new SocketIOReceiveRunnable<T>(aIO, incoming.forget());
|
|
||||||
NS_DispatchToMainThread(r);
|
|
||||||
} while (true);
|
|
||||||
|
|
||||||
return NS_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
nsresult SendPendingData(int aFd, T* aIO)
|
|
||||||
{
|
|
||||||
MOZ_ASSERT(aFd >= 0);
|
|
||||||
MOZ_ASSERT(aIO);
|
|
||||||
|
|
||||||
do {
|
|
||||||
if (!HasPendingData()) {
|
|
||||||
return NS_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
UnixSocketRawData* outgoing = mOutgoingQ.ElementAt(0);
|
|
||||||
MOZ_ASSERT(outgoing->mSize);
|
|
||||||
|
|
||||||
const uint8_t* data = outgoing->mData + outgoing->mCurrentWriteOffset;
|
|
||||||
size_t size = outgoing->mSize - outgoing->mCurrentWriteOffset;
|
|
||||||
|
|
||||||
ssize_t res = TEMP_FAILURE_RETRY(write(aFd, data, size));
|
|
||||||
|
|
||||||
if (res < 0) {
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
||||||
return NS_OK; /* no more data available */
|
|
||||||
}
|
|
||||||
/* an error occored */
|
|
||||||
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
|
|
||||||
NS_DispatchToMainThread(r);
|
|
||||||
return NS_ERROR_FAILURE;
|
|
||||||
} else if (!res) {
|
|
||||||
return NS_OK; /* nothing written */
|
|
||||||
}
|
|
||||||
|
|
||||||
outgoing->mCurrentWriteOffset += res;
|
|
||||||
|
|
||||||
if (outgoing->mCurrentWriteOffset == outgoing->mSize) {
|
|
||||||
mOutgoingQ.RemoveElementAt(0);
|
|
||||||
delete data;
|
|
||||||
}
|
|
||||||
} while (true);
|
|
||||||
|
|
||||||
return NS_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected:
|
|
||||||
SocketIOBase(size_t aMaxReadSize);
|
|
||||||
|
|
||||||
private:
|
|
||||||
const size_t mMaxReadSize;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Raw data queue. Must be pushed/popped from I/O thread only.
|
|
||||||
*/
|
|
||||||
nsTArray<UnixSocketRawData*> mOutgoingQ;
|
|
||||||
};
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,11 @@
|
|||||||
#include "nsXULAppAPI.h"
|
#include "nsXULAppAPI.h"
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
|
||||||
|
#ifdef MOZ_TASK_TRACER
|
||||||
|
#include "GeckoTaskTracer.h"
|
||||||
|
using namespace mozilla::tasktracer;
|
||||||
|
#endif
|
||||||
|
|
||||||
static const size_t MAX_READ_SIZE = 1 << 16;
|
static const size_t MAX_READ_SIZE = 1 << 16;
|
||||||
|
|
||||||
namespace mozilla {
|
namespace mozilla {
|
||||||
@@ -19,14 +24,12 @@ namespace ipc {
|
|||||||
//
|
//
|
||||||
|
|
||||||
class UnixSocketImpl : public UnixSocketWatcher
|
class UnixSocketImpl : public UnixSocketWatcher
|
||||||
, protected SocketIOBase
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
UnixSocketImpl(MessageLoop* mIOLoop,
|
UnixSocketImpl(MessageLoop* mIOLoop,
|
||||||
UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
|
UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
|
||||||
const nsACString& aAddress)
|
const nsACString& aAddress)
|
||||||
: UnixSocketWatcher(mIOLoop)
|
: UnixSocketWatcher(mIOLoop)
|
||||||
, SocketIOBase(MAX_READ_SIZE)
|
|
||||||
, mConsumer(aConsumer)
|
, mConsumer(aConsumer)
|
||||||
, mConnector(aConnector)
|
, mConnector(aConnector)
|
||||||
, mShuttingDownOnIOThread(false)
|
, mShuttingDownOnIOThread(false)
|
||||||
@@ -41,9 +44,9 @@ public:
|
|||||||
MOZ_ASSERT(IsShutdownOnMainThread());
|
MOZ_ASSERT(IsShutdownOnMainThread());
|
||||||
}
|
}
|
||||||
|
|
||||||
void Send(UnixSocketRawData* aData)
|
void QueueWriteData(UnixSocketRawData* aData)
|
||||||
{
|
{
|
||||||
EnqueueData(aData);
|
mOutgoingQ.AppendElement(aData);
|
||||||
AddWatchers(WRITE_WATCHER, false);
|
AddWatchers(WRITE_WATCHER, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,6 +145,12 @@ private:
|
|||||||
|
|
||||||
void FireSocketError();
|
void FireSocketError();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Raw data queue. Must be pushed/popped from IO thread only.
|
||||||
|
*/
|
||||||
|
typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
|
||||||
|
UnixSocketRawDataQueue mOutgoingQ;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connector object used to create the connection we are currently using.
|
* Connector object used to create the connection we are currently using.
|
||||||
*/
|
*/
|
||||||
@@ -219,7 +228,7 @@ public:
|
|||||||
UnixSocketImpl* impl = GetImpl();
|
UnixSocketImpl* impl = GetImpl();
|
||||||
MOZ_ASSERT(!impl->IsShutdownOnIOThread());
|
MOZ_ASSERT(!impl->IsShutdownOnIOThread());
|
||||||
|
|
||||||
impl->Send(mData);
|
impl->QueueWriteData(mData);
|
||||||
}
|
}
|
||||||
private:
|
private:
|
||||||
nsRefPtr<UnixSocketConsumer> mConsumer;
|
nsRefPtr<UnixSocketConsumer> mConsumer;
|
||||||
@@ -454,7 +463,7 @@ UnixSocketImpl::OnAccepted(int aFd,
|
|||||||
NS_DispatchToMainThread(r);
|
NS_DispatchToMainThread(r);
|
||||||
|
|
||||||
AddWatchers(READ_WATCHER, true);
|
AddWatchers(READ_WATCHER, true);
|
||||||
if (HasPendingData()) {
|
if (!mOutgoingQ.IsEmpty()) {
|
||||||
AddWatchers(WRITE_WATCHER, false);
|
AddWatchers(WRITE_WATCHER, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -483,7 +492,7 @@ UnixSocketImpl::OnConnected()
|
|||||||
NS_DispatchToMainThread(r);
|
NS_DispatchToMainThread(r);
|
||||||
|
|
||||||
AddWatchers(READ_WATCHER, true);
|
AddWatchers(READ_WATCHER, true);
|
||||||
if (HasPendingData()) {
|
if (!mOutgoingQ.IsEmpty()) {
|
||||||
AddWatchers(WRITE_WATCHER, false);
|
AddWatchers(WRITE_WATCHER, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -518,10 +527,51 @@ UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
|
|||||||
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
||||||
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
|
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
|
||||||
|
|
||||||
nsresult rv = ReceiveData(GetFd(), this);
|
// Read all of the incoming data.
|
||||||
if (NS_FAILED(rv)) {
|
while (true) {
|
||||||
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
|
nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
|
||||||
return;
|
|
||||||
|
ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize);
|
||||||
|
if (ret <= 0) {
|
||||||
|
if (ret == -1) {
|
||||||
|
if (errno == EINTR) {
|
||||||
|
continue; // retry system call when interrupted
|
||||||
|
}
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
|
return; // no data available: return and re-poll
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
NS_WARNING("Cannot read from network");
|
||||||
|
#endif
|
||||||
|
// else fall through to error handling on other errno's
|
||||||
|
}
|
||||||
|
|
||||||
|
// We're done with our descriptors. Ensure that spurious events don't
|
||||||
|
// cause us to end up back here.
|
||||||
|
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
|
||||||
|
nsRefPtr<nsRunnable> r =
|
||||||
|
new SocketIORequestClosingRunnable<UnixSocketImpl>(this);
|
||||||
|
NS_DispatchToMainThread(r);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef MOZ_TASK_TRACER
|
||||||
|
// Make unix socket creation events to be the source events of TaskTracer,
|
||||||
|
// and originate the rest correlation tasks from here.
|
||||||
|
AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
incoming->mSize = ret;
|
||||||
|
nsRefPtr<nsRunnable> r =
|
||||||
|
new SocketIOReceiveRunnable<UnixSocketImpl>(this, incoming.forget());
|
||||||
|
NS_DispatchToMainThread(r);
|
||||||
|
|
||||||
|
// If ret is less than MAX_READ_SIZE, there's no
|
||||||
|
// more data in the socket for us to read now.
|
||||||
|
if (ret < ssize_t(MAX_READ_SIZE)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -531,13 +581,40 @@ UnixSocketImpl::OnSocketCanSendWithoutBlocking()
|
|||||||
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
||||||
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
|
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
|
||||||
|
|
||||||
nsresult rv = SendPendingData(GetFd(), this);
|
// Try to write the bytes of mCurrentRilRawData. If all were written, continue.
|
||||||
if (NS_FAILED(rv)) {
|
//
|
||||||
return;
|
// Otherwise, save the byte position of the next byte to write
|
||||||
}
|
// within mCurrentWriteOffset, and request another write when the
|
||||||
|
// system won't block.
|
||||||
|
//
|
||||||
|
while (true) {
|
||||||
|
UnixSocketRawData* data;
|
||||||
|
if (mOutgoingQ.IsEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
data = mOutgoingQ.ElementAt(0);
|
||||||
|
const uint8_t *toWrite;
|
||||||
|
toWrite = data->mData;
|
||||||
|
|
||||||
if (HasPendingData()) {
|
while (data->mCurrentWriteOffset < data->mSize) {
|
||||||
AddWatchers(WRITE_WATCHER, false);
|
ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
|
||||||
|
ssize_t written;
|
||||||
|
written = write (GetFd(), toWrite + data->mCurrentWriteOffset,
|
||||||
|
write_amount);
|
||||||
|
if (written > 0) {
|
||||||
|
data->mCurrentWriteOffset += written;
|
||||||
|
}
|
||||||
|
if (written != write_amount) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data->mCurrentWriteOffset != data->mSize) {
|
||||||
|
AddWatchers(WRITE_WATCHER, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
mOutgoingQ.RemoveElementAt(0);
|
||||||
|
delete data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user