Bug 1046109: Add |SocketIOBase|, r=kyle
|SocketIOBase| is a base class for Socket I/O classes. It's not a requirement, but provides a number of helpful methods for common I/O operations on the I/O thread.
This commit is contained in:
@@ -9,11 +9,6 @@
|
||||
#include "nsXULAppAPI.h"
|
||||
#include <fcntl.h>
|
||||
|
||||
#ifdef MOZ_TASK_TRACER
|
||||
#include "GeckoTaskTracer.h"
|
||||
using namespace mozilla::tasktracer;
|
||||
#endif
|
||||
|
||||
static const size_t MAX_READ_SIZE = 1 << 16;
|
||||
|
||||
namespace mozilla {
|
||||
@@ -24,12 +19,14 @@ namespace ipc {
|
||||
//
|
||||
|
||||
class UnixSocketImpl : public UnixSocketWatcher
|
||||
, protected SocketIOBase
|
||||
{
|
||||
public:
|
||||
UnixSocketImpl(MessageLoop* mIOLoop,
|
||||
UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
|
||||
const nsACString& aAddress)
|
||||
: UnixSocketWatcher(mIOLoop)
|
||||
, SocketIOBase(MAX_READ_SIZE)
|
||||
, mConsumer(aConsumer)
|
||||
, mConnector(aConnector)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
@@ -44,9 +41,9 @@ public:
|
||||
MOZ_ASSERT(IsShutdownOnMainThread());
|
||||
}
|
||||
|
||||
void QueueWriteData(UnixSocketRawData* aData)
|
||||
void Send(UnixSocketRawData* aData)
|
||||
{
|
||||
mOutgoingQ.AppendElement(aData);
|
||||
EnqueueData(aData);
|
||||
AddWatchers(WRITE_WATCHER, false);
|
||||
}
|
||||
|
||||
@@ -145,12 +142,6 @@ private:
|
||||
|
||||
void FireSocketError();
|
||||
|
||||
/**
|
||||
* Raw data queue. Must be pushed/popped from IO thread only.
|
||||
*/
|
||||
typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
|
||||
UnixSocketRawDataQueue mOutgoingQ;
|
||||
|
||||
/**
|
||||
* Connector object used to create the connection we are currently using.
|
||||
*/
|
||||
@@ -228,7 +219,7 @@ public:
|
||||
UnixSocketImpl* impl = GetImpl();
|
||||
MOZ_ASSERT(!impl->IsShutdownOnIOThread());
|
||||
|
||||
impl->QueueWriteData(mData);
|
||||
impl->Send(mData);
|
||||
}
|
||||
private:
|
||||
nsRefPtr<UnixSocketConsumer> mConsumer;
|
||||
@@ -463,7 +454,7 @@ UnixSocketImpl::OnAccepted(int aFd,
|
||||
NS_DispatchToMainThread(r);
|
||||
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
if (!mOutgoingQ.IsEmpty()) {
|
||||
if (HasPendingData()) {
|
||||
AddWatchers(WRITE_WATCHER, false);
|
||||
}
|
||||
}
|
||||
@@ -492,7 +483,7 @@ UnixSocketImpl::OnConnected()
|
||||
NS_DispatchToMainThread(r);
|
||||
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
if (!mOutgoingQ.IsEmpty()) {
|
||||
if (HasPendingData()) {
|
||||
AddWatchers(WRITE_WATCHER, false);
|
||||
}
|
||||
}
|
||||
@@ -527,51 +518,10 @@ UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
|
||||
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
||||
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
|
||||
|
||||
// Read all of the incoming data.
|
||||
while (true) {
|
||||
nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
|
||||
|
||||
ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize);
|
||||
if (ret <= 0) {
|
||||
if (ret == -1) {
|
||||
if (errno == EINTR) {
|
||||
continue; // retry system call when interrupted
|
||||
}
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
return; // no data available: return and re-poll
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
NS_WARNING("Cannot read from network");
|
||||
#endif
|
||||
// else fall through to error handling on other errno's
|
||||
}
|
||||
|
||||
// We're done with our descriptors. Ensure that spurious events don't
|
||||
// cause us to end up back here.
|
||||
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
|
||||
nsRefPtr<nsRunnable> r =
|
||||
new SocketIORequestClosingRunnable<UnixSocketImpl>(this);
|
||||
NS_DispatchToMainThread(r);
|
||||
return;
|
||||
}
|
||||
|
||||
#ifdef MOZ_TASK_TRACER
|
||||
// Make unix socket creation events to be the source events of TaskTracer,
|
||||
// and originate the rest correlation tasks from here.
|
||||
AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET);
|
||||
#endif
|
||||
|
||||
incoming->mSize = ret;
|
||||
nsRefPtr<nsRunnable> r =
|
||||
new SocketIOReceiveRunnable<UnixSocketImpl>(this, incoming.forget());
|
||||
NS_DispatchToMainThread(r);
|
||||
|
||||
// If ret is less than MAX_READ_SIZE, there's no
|
||||
// more data in the socket for us to read now.
|
||||
if (ret < ssize_t(MAX_READ_SIZE)) {
|
||||
return;
|
||||
}
|
||||
nsresult rv = ReceiveData(GetFd(), this);
|
||||
if (NS_FAILED(rv)) {
|
||||
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -581,40 +531,13 @@ UnixSocketImpl::OnSocketCanSendWithoutBlocking()
|
||||
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
||||
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
|
||||
|
||||
// Try to write the bytes of mCurrentRilRawData. If all were written, continue.
|
||||
//
|
||||
// Otherwise, save the byte position of the next byte to write
|
||||
// within mCurrentWriteOffset, and request another write when the
|
||||
// system won't block.
|
||||
//
|
||||
while (true) {
|
||||
UnixSocketRawData* data;
|
||||
if (mOutgoingQ.IsEmpty()) {
|
||||
return;
|
||||
}
|
||||
data = mOutgoingQ.ElementAt(0);
|
||||
const uint8_t *toWrite;
|
||||
toWrite = data->mData;
|
||||
nsresult rv = SendPendingData(GetFd(), this);
|
||||
if (NS_FAILED(rv)) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (data->mCurrentWriteOffset < data->mSize) {
|
||||
ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
|
||||
ssize_t written;
|
||||
written = write (GetFd(), toWrite + data->mCurrentWriteOffset,
|
||||
write_amount);
|
||||
if (written > 0) {
|
||||
data->mCurrentWriteOffset += written;
|
||||
}
|
||||
if (written != write_amount) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (data->mCurrentWriteOffset != data->mSize) {
|
||||
AddWatchers(WRITE_WATCHER, false);
|
||||
return;
|
||||
}
|
||||
mOutgoingQ.RemoveElementAt(0);
|
||||
delete data;
|
||||
if (HasPendingData()) {
|
||||
AddWatchers(WRITE_WATCHER, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user