Combine AsyncChannel, SyncChannel, and RPCChannel into one class (bug 901789, r=cjones,bent).

This commit is contained in:
David Anderson
2013-09-27 18:42:08 -07:00
parent b4cef087f5
commit d2c237dd72
44 changed files with 2721 additions and 2117 deletions

View File

@@ -18,4 +18,4 @@
# Modifying this file will now automatically clobber the buildbot machines \o/
#
Bug 913260 needed a clobber to not break tons of tests
Bug 901789 needs a clobber.

View File

@@ -468,7 +468,7 @@ inline
already_AddRefed<nsIDOMBlob>
GetBlobFromParams(const SlicedBlobConstructorParams& aParams)
{
static_assert(ActorFlavor == mozilla::dom::ipc::Parent,
static_assert(ActorFlavor == Parent,
"No other flavor is supported here!");
BlobParent* actor =

View File

@@ -169,7 +169,7 @@ public:
typedef typename BlobTraits<ActorFlavor>::BaseType BaseType;
typedef RemoteBlob<ActorFlavor> RemoteBlobType;
typedef mozilla::ipc::IProtocolManager<
mozilla::ipc::RPCChannel::RPCListener>::ActorDestroyReason
mozilla::ipc::MessageListener>::ActorDestroyReason
ActorDestroyReason;
protected:

View File

@@ -262,7 +262,7 @@ ContentParentMemoryReporter::CollectReports(nsIMemoryReporterCallback* cb,
for (uint32_t i = 0; i < cps.Length(); i++) {
ContentParent* cp = cps[i];
AsyncChannel* channel = cp->GetIPCChannel();
MessageChannel* channel = cp->GetIPCChannel();
nsString friendlyName;
cp->FriendlyName(friendlyName);
@@ -879,7 +879,7 @@ ContentParent::ShutDownProcess(bool aCloseWithError)
}
if (aCloseWithError && !mCalledCloseWithError) {
AsyncChannel* channel = GetIPCChannel();
MessageChannel* channel = GetIPCChannel();
if (channel) {
mCalledCloseWithError = true;
channel->CloseWithError();

View File

@@ -28,7 +28,7 @@ using mozilla::gfx::SharedDIBSurface;
#include "gfxAlphaRecovery.h"
#include "mozilla/Util.h"
#include "mozilla/ipc/SyncChannel.h"
#include "mozilla/ipc/MessageChannel.h"
#include "mozilla/AutoRestore.h"
using namespace mozilla;
@@ -1405,7 +1405,7 @@ PluginInstanceChild::PluginWindowProcInternal(HWND hWnd,
WPARAM wParam,
LPARAM lParam)
{
NS_ASSERTION(!mozilla::ipc::SyncChannel::IsPumpingMessages(),
NS_ASSERTION(!mozilla::ipc::MessageChannel::IsPumpingMessages(),
"Failed to prevent a nonqueued message from running!");
PluginInstanceChild* self = reinterpret_cast<PluginInstanceChild*>(
GetProp(hWnd, kPluginInstanceChildProperty));

View File

@@ -15,7 +15,7 @@
using std::string;
using mozilla::ipc::RPCChannel;
using mozilla::ipc::MessageChannel;
namespace {
@@ -67,9 +67,9 @@ NPRemoteWindow::NPRemoteWindow() :
clipRect.right = 0;
}
RPCChannel::RacyRPCPolicy
MediateRace(const RPCChannel::Message& parent,
const RPCChannel::Message& child)
ipc::RacyRPCPolicy
MediateRace(const MessageChannel::Message& parent,
const MessageChannel::Message& child)
{
switch (parent.type()) {
case PPluginInstance::Msg_Paint__ID:
@@ -78,10 +78,10 @@ MediateRace(const RPCChannel::Message& parent,
case PPluginInstance::Msg_NPP_HandleEvent_IOSurface__ID:
// our code relies on the frame list not changing during paints and
// reflows
return RPCChannel::RRPParentWins;
return ipc::RRPParentWins;
default:
return RPCChannel::RRPChildWins;
return ipc::RRPChildWins;
}
}

View File

@@ -10,7 +10,7 @@
#include "ipc/IPCMessageUtils.h"
#include "base/message_loop.h"
#include "mozilla/ipc/RPCChannel.h"
#include "mozilla/ipc/MessageChannel.h"
#include "mozilla/ipc/CrossProcessMutex.h"
#include "gfxipc/ShadowLayerUtils.h"
@@ -43,9 +43,9 @@ enum ScriptableObjectType
Proxy
};
mozilla::ipc::RPCChannel::RacyRPCPolicy
MediateRace(const mozilla::ipc::RPCChannel::Message& parent,
const mozilla::ipc::RPCChannel::Message& child);
mozilla::ipc::RacyRPCPolicy
MediateRace(const mozilla::ipc::MessageChannel::Message& parent,
const mozilla::ipc::MessageChannel::Message& child);
std::string
MungePluginDsoPath(const std::string& path);

View File

@@ -16,7 +16,7 @@
/* This must occur *after* plugins/PluginModuleChild.h to avoid typedefs conflicts. */
#include "mozilla/Util.h"
#include "mozilla/ipc/SyncChannel.h"
#include "mozilla/ipc/MessageChannel.h"
#ifdef MOZ_WIDGET_GTK
#include <gtk/gtk.h>

View File

@@ -74,7 +74,7 @@ class PluginModuleChild : public PPluginModuleChild
{
typedef mozilla::dom::PCrashReporterChild PCrashReporterChild;
protected:
virtual mozilla::ipc::RPCChannel::RacyRPCPolicy
virtual mozilla::ipc::RacyRPCPolicy
MediateRPCRace(const Message& parent, const Message& child) MOZ_OVERRIDE
{
return MediateRace(parent, child);

View File

@@ -16,7 +16,7 @@
#include "base/process_util.h"
#include "mozilla/Attributes.h"
#include "mozilla/dom/PCrashReporterParent.h"
#include "mozilla/ipc/SyncChannel.h"
#include "mozilla/ipc/MessageChannel.h"
#include "mozilla/plugins/BrowserStreamParent.h"
#include "mozilla/plugins/PluginInstanceParent.h"
#include "mozilla/Preferences.h"
@@ -51,7 +51,7 @@
using base::KillProcess;
using mozilla::PluginLibrary;
using mozilla::ipc::SyncChannel;
using mozilla::ipc::MessageChannel;
using mozilla::dom::PCrashReporterParent;
using mozilla::dom::CrashReporterParent;
@@ -248,7 +248,7 @@ void
PluginModuleParent::SetChildTimeout(const int32_t aChildTimeout)
{
int32_t timeoutMs = (aChildTimeout > 0) ? (1000 * aChildTimeout) :
SyncChannel::kNoTimeout;
MessageChannel::kNoTimeout;
SetReplyTimeoutMs(timeoutMs);
}

View File

@@ -134,7 +134,7 @@ public:
#endif // XP_WIN
protected:
virtual mozilla::ipc::RPCChannel::RacyRPCPolicy
virtual mozilla::ipc::RacyRPCPolicy
MediateRPCRace(const Message& parent, const Message& child) MOZ_OVERRIDE
{
return MediateRace(parent, child);

View File

@@ -64,8 +64,7 @@ CompositorChild::Create(Transport* aTransport, ProcessId aOtherProcess)
NS_RUNTIMEABORT("Couldn't OpenProcessHandle() to parent process.");
return false;
}
if (!child->Open(aTransport, handle, XRE_GetIOMessageLoop(),
AsyncChannel::Child)) {
if (!child->Open(aTransport, handle, XRE_GetIOMessageLoop(), ipc::ChildSide)) {
NS_RUNTIMEABORT("Couldn't Open() Compositor channel.");
return false;
}

View File

@@ -20,7 +20,7 @@
#include "mozilla/Assertions.h" // for MOZ_ASSERT, etc
#include "mozilla/Monitor.h" // for Monitor, MonitorAutoLock
#include "mozilla/ReentrantMonitor.h" // for ReentrantMonitor, etc
#include "mozilla/ipc/AsyncChannel.h" // for AsyncChannel, etc
#include "mozilla/ipc/MessageChannel.h" // for MessageChannel, etc
#include "mozilla/ipc/Transport.h" // for Transport
#include "mozilla/layers/CompositableClient.h" // for CompositableChild, etc
#include "mozilla/layers/ISurfaceAllocator.h" // for ISurfaceAllocator
@@ -289,8 +289,8 @@ static void DeallocSurfaceDescriptorGrallocSync(const SurfaceDescriptor& aBuffer
static void ConnectImageBridge(ImageBridgeChild * child, ImageBridgeParent * parent)
{
MessageLoop *parentMsgLoop = parent->GetMessageLoop();
ipc::AsyncChannel *parentChannel = parent->GetIPCChannel();
child->Open(parentChannel, parentMsgLoop, mozilla::ipc::AsyncChannel::Child);
ipc::MessageChannel *parentChannel = parent->GetIPCChannel();
child->Open(parentChannel, parentMsgLoop, mozilla::ipc::ChildSide);
}
ImageBridgeChild::ImageBridgeChild()
@@ -358,7 +358,7 @@ ConnectImageBridgeInChildProcess(Transport* aTransport,
// Bind the IPC channel to the image bridge thread.
sImageBridgeChildSingleton->Open(aTransport, aOtherProcess,
XRE_GetIOMessageLoop(),
AsyncChannel::Child);
ipc::ChildSide);
}
static void ReleaseImageClientNow(ImageClient* aClient)

View File

@@ -12,7 +12,7 @@
#include "base/task.h" // for CancelableTask, DeleteTask, etc
#include "base/tracked.h" // for FROM_HERE
#include "gfxPoint.h" // for gfxIntSize
#include "mozilla/ipc/AsyncChannel.h" // for AsyncChannel, etc
#include "mozilla/ipc/MessageChannel.h" // for MessageChannel, etc
#include "mozilla/ipc/ProtocolUtils.h"
#include "mozilla/ipc/Transport.h" // for Transport
#include "mozilla/layers/CompositableTransactionParent.h"
@@ -99,8 +99,7 @@ ConnectImageBridgeInParentProcess(ImageBridgeParent* aBridge,
Transport* aTransport,
ProcessHandle aOtherProcess)
{
aBridge->Open(aTransport, aOtherProcess,
XRE_GetIOMessageLoop(), AsyncChannel::Parent);
aBridge->Open(aTransport, aOtherProcess, XRE_GetIOMessageLoop(), ipc::ParentSide);
}
/*static*/ bool

View File

@@ -91,6 +91,11 @@ class Message : public Pickle {
return (header()->flags & RPC_BIT) != 0;
}
// True if this is an urgent message.
bool is_urgent() const {
return (header()->flags & URGENT_BIT) != 0;
}
// True if compression is enabled for this message.
bool compress() const {
return (header()->flags & COMPRESS_BIT) != 0;
@@ -270,6 +275,10 @@ class Message : public Pickle {
header()->flags |= RPC_BIT;
}
void set_urgent() {
header()->flags |= URGENT_BIT;
}
#if !defined(OS_MACOSX)
protected:
#endif
@@ -284,7 +293,8 @@ class Message : public Pickle {
PUMPING_MSGS_BIT= 0x0040,
HAS_SENT_TIME_BIT = 0x0080,
RPC_BIT = 0x0100,
COMPRESS_BIT = 0x0200
COMPRESS_BIT = 0x0200,
URGENT_BIT = 0x0400
};
struct Header : Pickle::Header {

View File

@@ -1,298 +0,0 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef ipc_glue_AsyncChannel_h
#define ipc_glue_AsyncChannel_h 1
#include "base/basictypes.h"
#include "base/message_loop.h"
#include "mozilla/WeakPtr.h"
#include "mozilla/Monitor.h"
#include "mozilla/ipc/Transport.h"
#include "nsAutoPtr.h"
//-----------------------------------------------------------------------------
namespace mozilla {
namespace ipc {
struct HasResultCodes
{
enum Result {
MsgProcessed,
MsgDropped,
MsgNotKnown,
MsgNotAllowed,
MsgPayloadError,
MsgProcessingError,
MsgRouteError,
MsgValueError
};
};
class RefCountedMonitor : public Monitor
{
public:
RefCountedMonitor()
: Monitor("mozilla.ipc.AsyncChannel.mMonitor")
{}
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedMonitor)
};
class AsyncChannel : protected HasResultCodes
{
protected:
typedef mozilla::Monitor Monitor;
enum ChannelState {
ChannelClosed,
ChannelOpening,
ChannelConnected,
ChannelTimeout,
ChannelClosing,
ChannelError
};
public:
typedef IPC::Message Message;
typedef mozilla::ipc::Transport Transport;
class /*NS_INTERFACE_CLASS*/ AsyncListener
: protected HasResultCodes
, public mozilla::SupportsWeakPtr<AsyncListener>
{
public:
virtual ~AsyncListener() { }
virtual void OnChannelClose() = 0;
virtual void OnChannelError() = 0;
virtual Result OnMessageReceived(const Message& aMessage) = 0;
virtual void OnProcessingError(Result aError) = 0;
// FIXME/bug 792652: this doesn't really belong here, but a
// large refactoring is needed to put it where it belongs.
virtual int32_t GetProtocolTypeId() = 0;
virtual void OnChannelConnected(int32_t peer_pid) {}
};
enum Side { Parent, Child, Unknown };
public:
//
// These methods are called on the "worker" thread
//
AsyncChannel(AsyncListener* aListener);
virtual ~AsyncChannel();
// "Open" from the perspective of the transport layer; the underlying
// socketpair/pipe should already be created.
//
// Returns true iff the transport layer was successfully connected,
// i.e., mChannelState == ChannelConnected.
bool Open(Transport* aTransport, MessageLoop* aIOLoop=0, Side aSide=Unknown);
// "Open" a connection to another thread in the same process.
//
// Returns true iff the transport layer was successfully connected,
// i.e., mChannelState == ChannelConnected.
//
// For more details on the process of opening a channel between
// threads, see the extended comment on this function
// in AsyncChannel.cpp.
bool Open(AsyncChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide);
// Close the underlying transport channel.
void Close();
// Force the channel to behave as if a channel error occurred. Valid
// for process links only, not thread links.
void CloseWithError();
// Asynchronously send a message to the other side of the channel
virtual bool Send(Message* msg);
// Asynchronously deliver a message back to this side of the
// channel
virtual bool Echo(Message* msg);
// Send OnChannelConnected notification to listeners.
void DispatchOnChannelConnected(int32_t peer_pid);
// Unsound_IsClosed and Unsound_NumQueuedMessages are safe to call from any
// thread, but they make no guarantees about whether you'll get an
// up-to-date value; the values are written on one thread and read without
// locking, on potentially different threads. Thus you should only use
// them when you don't particularly care about getting a recent value (e.g.
// in a memory report).
bool Unsound_IsClosed() const;
uint32_t Unsound_NumQueuedMessages() const;
//
// Each AsyncChannel is associated with either a ProcessLink or a
// ThreadLink via the field mLink. The type of link is determined
// by whether this AsyncChannel is communicating with another
// process or another thread. In the former case, file
// descriptors or a socket are used via the I/O queue. In the
// latter case, messages are enqueued directly onto the target
// thread's work queue.
//
class Link {
protected:
AsyncChannel *mChan;
public:
Link(AsyncChannel *aChan);
virtual ~Link();
// n.b.: These methods all require that the channel monitor is
// held when they are invoked.
virtual void EchoMessage(Message *msg) = 0;
virtual void SendMessage(Message *msg) = 0;
virtual void SendClose() = 0;
virtual bool Unsound_IsClosed() const = 0;
virtual uint32_t Unsound_NumQueuedMessages() const = 0;
};
class ProcessLink : public Link, public Transport::Listener {
protected:
Transport* mTransport;
MessageLoop* mIOLoop; // thread where IO happens
Transport::Listener* mExistingListener; // channel's previous listener
void OnCloseChannel();
void OnChannelOpened();
void OnTakeConnectedChannel();
void OnEchoMessage(Message* msg);
void AssertIOThread() const
{
NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
"not on I/O thread!");
}
public:
ProcessLink(AsyncChannel *chan);
virtual ~ProcessLink();
void Open(Transport* aTransport, MessageLoop *aIOLoop, Side aSide);
// Run on the I/O thread, only when using inter-process link.
// These methods acquire the monitor and forward to the
// similarly named methods in AsyncChannel below
// (OnMessageReceivedFromLink(), etc)
virtual void OnMessageReceived(const Message& msg) MOZ_OVERRIDE;
virtual void OnChannelConnected(int32_t peer_pid) MOZ_OVERRIDE;
virtual void OnChannelError() MOZ_OVERRIDE;
virtual void EchoMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendClose() MOZ_OVERRIDE;
virtual bool Unsound_IsClosed() const MOZ_OVERRIDE;
virtual uint32_t Unsound_NumQueuedMessages() const MOZ_OVERRIDE;
};
class ThreadLink : public Link {
protected:
AsyncChannel* mTargetChan;
public:
ThreadLink(AsyncChannel *aChan, AsyncChannel *aTargetChan);
virtual ~ThreadLink();
virtual void EchoMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendClose() MOZ_OVERRIDE;
virtual bool Unsound_IsClosed() const MOZ_OVERRIDE;
virtual uint32_t Unsound_NumQueuedMessages() const MOZ_OVERRIDE;
};
protected:
// The "link" thread is either the I/O thread (ProcessLink) or the
// other actor's work thread (ThreadLink). In either case, it is
// NOT our worker thread.
void AssertLinkThread() const
{
NS_ABORT_IF_FALSE(mWorkerLoopID != MessageLoop::current()->id(),
"on worker thread but should not be!");
}
// Can be run on either thread
void AssertWorkerThread() const
{
NS_ABORT_IF_FALSE(mWorkerLoopID == MessageLoop::current()->id(),
"not on worker thread!");
}
bool Connected() const {
mMonitor->AssertCurrentThreadOwns();
// The transport layer allows us to send messages before
// receiving the "connected" ack from the remote side.
return (ChannelOpening == mChannelState ||
ChannelConnected == mChannelState);
}
// Return true if |msg| is a special message targeted at the IO
// thread, in which case it shouldn't be delivered to the worker.
virtual bool MaybeInterceptSpecialIOMessage(const Message& msg);
void ProcessGoodbyeMessage();
// Runs on the link thread. Invoked either from the I/O thread methods above
// or directly from the other actor if using a thread-based link.
//
// n.b.: mMonitor is always held when these methods are invoked.
// In the case of a ProcessLink, it is acquired by the ProcessLink.
// In the case of a ThreadLink, it is acquired by the other actor,
// which then invokes these methods directly.
virtual void OnMessageReceivedFromLink(const Message& msg);
virtual void OnChannelErrorFromLink();
void PostErrorNotifyTask();
// Run on the worker thread
void OnDispatchMessage(const Message& aMsg);
virtual bool OnSpecialMessage(uint16_t id, const Message& msg);
void SendSpecialMessage(Message* msg) const;
// Tell the IO thread to close the channel and wait for it to ACK.
void SynchronouslyClose();
bool MaybeHandleError(Result code, const char* channelName);
void ReportConnectionError(const char* channelName) const;
// Run on the worker thread
void OnNotifyMaybeChannelError();
virtual bool ShouldDeferNotifyMaybeError() const {
return false;
}
void NotifyChannelClosed();
void NotifyMaybeChannelError();
void OnOpenAsSlave(AsyncChannel *aTargetChan, Side aSide);
void CommonThreadOpenInit(AsyncChannel *aTargetChan, Side aSide);
virtual void Clear();
mozilla::WeakPtr<AsyncListener> mListener;
ChannelState mChannelState;
nsRefPtr<RefCountedMonitor> mMonitor;
MessageLoop* mWorkerLoop; // thread where work is done
bool mChild; // am I the child or parent?
CancelableTask* mChannelErrorTask; // NotifyMaybeChannelError runnable
Link *mLink; // link to other thread/process
// id() of mWorkerLoop. This persists even after mWorkerLoop is cleared
// during channel shutdown.
int mWorkerLoopID;
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_AsyncChannel_h

1380
ipc/glue/MessageChannel.cpp Normal file

File diff suppressed because it is too large Load Diff

612
ipc/glue/MessageChannel.h Normal file
View File

@@ -0,0 +1,612 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef ipc_glue_MessageChannel_h
#define ipc_glue_MessageChannel_h 1
#include "base/basictypes.h"
#include "base/message_loop.h"
#include "mozilla/WeakPtr.h"
#include "mozilla/Monitor.h"
#include "mozilla/ipc/Transport.h"
#include "MessageLink.h"
#include "nsAutoPtr.h"
#include "mozilla/DebugOnly.h"
#include <deque>
#include <stack>
#include <vector>
#include <math.h>
namespace mozilla {
namespace ipc {
class MessageChannel;
class RefCountedMonitor : public Monitor
{
public:
RefCountedMonitor()
: Monitor("mozilla.ipc.MessageChannel.mMonitor")
{}
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedMonitor)
};
class MessageChannel : HasResultCodes
{
friend class ProcessLink;
friend class ThreadLink;
typedef mozilla::Monitor Monitor;
public:
static const int32_t kNoTimeout;
typedef IPC::Message Message;
typedef mozilla::ipc::Transport Transport;
MessageChannel(MessageListener *aListener);
~MessageChannel();
// "Open" from the perspective of the transport layer; the underlying
// socketpair/pipe should already be created.
//
// Returns true iff the transport layer was successfully connected,
// i.e., mChannelState == ChannelConnected.
bool Open(Transport* aTransport, MessageLoop* aIOLoop=0, Side aSide=UnknownSide);
// "Open" a connection to another thread in the same process.
//
// Returns true iff the transport layer was successfully connected,
// i.e., mChannelState == ChannelConnected.
//
// For more details on the process of opening a channel between
// threads, see the extended comment on this function
// in MessageChannel.cpp.
bool Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide);
// Close the underlying transport channel.
void Close();
// Force the channel to behave as if a channel error occurred. Valid
// for process links only, not thread links.
void CloseWithError();
// Asynchronously send a message to the other side of the channel
bool Send(Message* aMsg);
// Asynchronously deliver a message back to this side of the
// channel
bool Echo(Message* aMsg);
// Synchronously send |msg| (i.e., wait for |reply|)
bool Send(Message* aMsg, Message* aReply);
// Make an RPC to the other side of the channel
bool Call(Message* aMsg, Message* aReply);
void SetReplyTimeoutMs(int32_t aTimeoutMs);
bool IsOnCxxStack() const {
return !mCxxStackFrames.empty();
}
void FlushPendingRPCQueue();
// Unsound_IsClosed and Unsound_NumQueuedMessages are safe to call from any
// thread, but they make no guarantees about whether you'll get an
// up-to-date value; the values are written on one thread and read without
// locking, on potentially different threads. Thus you should only use
// them when you don't particularly care about getting a recent value (e.g.
// in a memory report).
bool Unsound_IsClosed() const {
return mLink ? mLink->Unsound_IsClosed() : true;
}
uint32_t Unsound_NumQueuedMessages() const {
return mLink ? mLink->Unsound_NumQueuedMessages() : 0;
}
static bool IsPumpingMessages() {
return sIsPumpingMessages;
}
static void SetIsPumpingMessages(bool aIsPumping) {
sIsPumpingMessages = aIsPumping;
}
#ifdef OS_WIN
struct MOZ_STACK_CLASS SyncStackFrame
{
SyncStackFrame(MessageChannel* channel, bool rpc);
~SyncStackFrame();
bool mRPC;
bool mSpinNestedEvents;
bool mListenerNotified;
MessageChannel* mChannel;
// The previous stack frame for this channel.
SyncStackFrame* mPrev;
// The previous stack frame on any channel.
SyncStackFrame* mStaticPrev;
};
friend struct MessageChannel::SyncStackFrame;
static bool IsSpinLoopActive() {
for (SyncStackFrame* frame = sStaticTopFrame; frame; frame = frame->mPrev) {
if (frame->mSpinNestedEvents)
return true;
}
return false;
}
protected:
// The deepest sync stack frame for this channel.
SyncStackFrame* mTopFrame;
// The deepest sync stack frame on any channel.
static SyncStackFrame* sStaticTopFrame;
public:
void ProcessNativeEventsInRPCCall();
static void NotifyGeckoEventDispatch();
private:
void SpinInternalEventLoop();
#endif
private:
void CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide);
void OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide);
void PostErrorNotifyTask();
void OnNotifyMaybeChannelError();
void ReportConnectionError(const char* aChannelName) const;
bool MaybeHandleError(Result code, const char* channelName);
void Clear();
// Send OnChannelConnected notification to listeners.
void DispatchOnChannelConnected(int32_t peer_pid);
// Any protocol that requires blocking until a reply arrives, will send its
// outgoing message through this function. Currently, two protocols do this:
//
// sync, which can only initiate messages from child to parent.
// urgent, which can only initiate messages from parent to child.
//
// SendAndWait() expects that the worker thread owns the monitor, and that
// the message has been prepared to be sent over the link. It returns as
// soon as a reply has been received, or an error has occurred.
//
// Note that while the child is blocked waiting for a sync reply, it can wake
// up to process urgent calls from the parent.
bool SendAndWait(Message* aMsg, Message* aReply);
bool RPCCall(Message* aMsg, Message* aReply);
bool UrgentCall(Message* aMsg, Message* aReply);
bool RPCEventOccurred();
void MaybeUndeferIncall();
void EnqueuePendingMessages();
// Executed on the worker thread. Dequeues one pending message.
bool OnMaybeDequeueOne();
// Dispatches an incoming message to its appropriate handler.
void DispatchMessage(const Message &aMsg);
// DispatchMessage will route to one of these functions depending on the
// protocol type of the message.
void DispatchSyncMessage(const Message &aMsg);
void DispatchUrgentMessage(const Message &aMsg);
void DispatchAsyncMessage(const Message &aMsg);
void DispatchRPCMessage(const Message &aMsg, size_t aStackDepth);
// Return true if the wait ended because a notification was received.
//
// Return false if the time elapsed from when we started the process of
// waiting until afterwards exceeded the currently allotted timeout.
// That *DOES NOT* mean false => "no event" (== timeout); there are many
// circumstances that could cause the measured elapsed time to exceed the
// timeout EVEN WHEN we were notified.
//
// So in sum: true is a meaningful return value; false isn't,
// necessarily.
bool WaitForSyncNotify();
bool WaitForRPCNotify();
bool WaitResponse(bool aWaitTimedOut);
bool ShouldContinueFromTimeout();
// The "remote view of stack depth" can be different than the
// actual stack depth when there are out-of-turn replies. When we
// receive one, our actual RPC stack depth doesn't decrease, but
// the other side (that sent the reply) thinks it has. So, the
// "view" returned here is |stackDepth| minus the number of
// out-of-turn replies.
//
// Only called from the worker thread.
size_t RemoteViewOfStackDepth(size_t stackDepth) const {
AssertWorkerThread();
return stackDepth - mOutOfTurnReplies.size();
}
int32_t NextSeqno() {
AssertWorkerThread();
return (mSide == ChildSide) ? --mNextSeqno : ++mNextSeqno;
}
// This helper class manages mCxxStackDepth on behalf of MessageChannel.
// When the stack depth is incremented from zero to non-zero, it invokes
// an RPCChannel callback, and similarly for when the depth goes from
// non-zero to zero.
void EnteredCxxStack() {
mListener->OnEnteredCxxStack();
}
void ExitedCxxStack();
void EnteredCall() {
mListener->OnEnteredCall();
}
void ExitedCall() {
mListener->OnExitedCall();
}
MessageListener *Listener() const {
return mListener.get();
}
enum Direction { IN_MESSAGE, OUT_MESSAGE };
struct RPCFrame {
RPCFrame(Direction direction, const Message* msg)
: mDirection(direction), mMsg(msg)
{ }
bool IsRPCIncall() const {
return mMsg->is_rpc() && IN_MESSAGE == mDirection;
}
bool IsRPCOutcall() const {
return mMsg->is_rpc() && OUT_MESSAGE == mDirection;
}
void Describe(int32_t* id, const char** dir, const char** sems,
const char** name) const
{
*id = mMsg->routing_id();
*dir = (IN_MESSAGE == mDirection) ? "in" : "out";
*sems = mMsg->is_rpc() ? "rpc" : mMsg->is_sync() ? "sync" : "async";
*name = mMsg->name();
}
Direction mDirection;
const Message* mMsg;
};
class MOZ_STACK_CLASS CxxStackFrame
{
public:
CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg)
: mThat(that)
{
mThat.AssertWorkerThread();
if (mThat.mCxxStackFrames.empty())
mThat.EnteredCxxStack();
mThat.mCxxStackFrames.push_back(RPCFrame(direction, msg));
const RPCFrame& frame = mThat.mCxxStackFrames.back();
if (frame.IsRPCIncall())
mThat.EnteredCall();
mThat.mSawRPCOutMsg |= frame.IsRPCOutcall();
}
~CxxStackFrame() {
bool exitingCall = mThat.mCxxStackFrames.back().IsRPCIncall();
mThat.mCxxStackFrames.pop_back();
bool exitingStack = mThat.mCxxStackFrames.empty();
// mListener could have gone away if Close() was called while
// MessageChannel code was still on the stack
if (!mThat.mListener)
return;
mThat.AssertWorkerThread();
if (exitingCall)
mThat.ExitedCall();
if (exitingStack)
mThat.ExitedCxxStack();
}
private:
MessageChannel& mThat;
// disable harmful methods
CxxStackFrame();
CxxStackFrame(const CxxStackFrame&);
CxxStackFrame& operator=(const CxxStackFrame&);
};
void DebugAbort(const char* file, int line, const char* cond,
const char* why,
bool reply=false) const;
// This method is only safe to call on the worker thread, or in a
// debugger with all threads paused.
void DumpRPCStack(const char* const pfx="") const;
private:
// Called from both threads
size_t RPCStackDepth() const {
mMonitor->AssertCurrentThreadOwns();
return mRPCStack.size();
}
// Returns true if we're blocking waiting for a reply.
bool AwaitingSyncReply() const {
mMonitor->AssertCurrentThreadOwns();
return mPendingSyncReplies > 0;
}
bool AwaitingUrgentReply() const {
mMonitor->AssertCurrentThreadOwns();
return mPendingUrgentReplies > 0;
}
bool AwaitingRPCReply() const {
mMonitor->AssertCurrentThreadOwns();
return !mRPCStack.empty();
}
// Returns true if we're dispatching a sync message's callback.
bool DispatchingSyncMessage() const {
return mDispatchingSyncMessage;
}
bool Connected() const;
private:
// Executed on the IO thread.
void NotifyWorkerThread();
// Return true if |aMsg| is a special message targeted at the IO
// thread, in which case it shouldn't be delivered to the worker.
bool MaybeInterceptSpecialIOMessage(const Message& aMsg);
void OnChannelConnected(int32_t peer_id);
// Tell the IO thread to close the channel and wait for it to ACK.
void SynchronouslyClose();
void OnMessageReceivedFromLink(const Message& aMsg);
void OnChannelErrorFromLink();
private:
// Run on the not current thread.
void NotifyChannelClosed();
void NotifyMaybeChannelError();
private:
// Can be run on either thread
void AssertWorkerThread() const
{
NS_ABORT_IF_FALSE(mWorkerLoopID == MessageLoop::current()->id(),
"not on worker thread!");
}
// The "link" thread is either the I/O thread (ProcessLink) or the
// other actor's work thread (ThreadLink). In either case, it is
// NOT our worker thread.
void AssertLinkThread() const
{
NS_ABORT_IF_FALSE(mWorkerLoopID != MessageLoop::current()->id(),
"on worker thread but should not be!");
}
private:
typedef IPC::Message::msgid_t msgid_t;
typedef std::deque<Message> MessageQueue;
typedef std::map<size_t, Message> MessageMap;
// All dequeuing tasks require a single point of cancellation,
// which is handled via a reference-counted task.
class RefCountedTask
{
public:
RefCountedTask(CancelableTask* aTask)
: mTask(aTask)
{ }
~RefCountedTask() { delete mTask; }
void Run() { mTask->Run(); }
void Cancel() { mTask->Cancel(); }
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)
private:
CancelableTask* mTask;
};
// Wrap an existing task which can be cancelled at any time
// without the wrapper's knowledge.
class DequeueTask : public Task
{
public:
DequeueTask(RefCountedTask* aTask)
: mTask(aTask)
{ }
void Run() { mTask->Run(); }
private:
nsRefPtr<RefCountedTask> mTask;
};
private:
mozilla::WeakPtr<MessageListener> mListener;
ChannelState mChannelState;
nsRefPtr<RefCountedMonitor> mMonitor;
Side mSide;
MessageLink* mLink;
MessageLoop* mWorkerLoop; // thread where work is done
CancelableTask* mChannelErrorTask; // NotifyMaybeChannelError runnable
// id() of mWorkerLoop. This persists even after mWorkerLoop is cleared
// during channel shutdown.
int mWorkerLoopID;
// A task encapsulating dequeuing one pending message.
nsRefPtr<RefCountedTask> mDequeueOneTask;
// Timeout periods are broken up in two to prevent system suspension from
// triggering an abort. This method (called by WaitForEvent with a 'did
// timeout' flag) decides if we should wait again for half of mTimeoutMs
// or give up.
int32_t mTimeoutMs;
bool mInTimeoutSecondHalf;
// Worker-thread only; sequence numbers for messages that require
// synchronous replies.
int32_t mNextSeqno;
static bool sIsPumpingMessages;
class AutoEnterPendingReply {
public:
AutoEnterPendingReply(size_t &replyVar)
: mReplyVar(replyVar)
{
mReplyVar++;
}
~AutoEnterPendingReply() {
mReplyVar--;
}
private:
size_t& mReplyVar;
};
// Worker-thread only; type we're expecting for the reply to a sync
// out-message. This will never be greater than 1.
size_t mPendingSyncReplies;
// Worker-thread only; type we're expecting for the reply to an
// urgent out-message. This will never be greater than 1.
size_t mPendingUrgentReplies;
// If waiting for the reply to a sync out-message, it will be saved here
// on the I/O thread and then read and cleared by the worker thread.
nsAutoPtr<Message> mRecvd;
// Set while we are dispatching a synchronous message.
bool mDispatchingSyncMessage;
// Queue of all incoming messages, except for replies to sync and urgent
// messages, which are delivered directly to mRecvd, and any pending urgent
// incall, which is stored in mPendingUrgentRequest.
//
// If both this side and the other side are functioning correctly, the queue
// can only be in certain configurations. Let
//
// |A<| be an async in-message,
// |S<| be a sync in-message,
// |C<| be an RPC in-call,
// |R<| be an RPC reply.
//
// The queue can only match this configuration
//
// A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
//
// The other side can send as many async messages |A<*| as it wants before
// sending us a blocking message.
//
// The first case is |S<|, a sync in-msg. The other side must be blocked,
// and thus can't send us any more messages until we process the sync
// in-msg.
//
// The second case is |C<|, an RPC in-call; the other side must be blocked.
// (There's a subtlety here: this in-call might have raced with an
// out-call, but we detect that with the mechanism below,
// |mRemoteStackDepth|, and races don't matter to the queue.)
//
// Final case, the other side replied to our most recent out-call |R<|.
// If that was the *only* out-call on our stack, |?{mStack.size() == 1}|,
// then other side "finished with us," and went back to its own business.
// That business might have included sending any number of async message
// |A<*| until sending a blocking message |(S< | C<)|. If we had more than
// one RPC call on our stack, the other side *better* not have sent us
// another blocking message, because it's blocked on a reply from us.
//
MessageQueue mPending;
nsAutoPtr<Message> mPendingUrgentRequest;
// Stack of all the out-calls on which this channel is awaiting responses.
// Each stack refers to a different protocol and the stacks are mutually
// exclusive: multiple outcalls of the same kind cannot be initiated while
// another is active.
std::stack<Message> mRPCStack;
// This is what we think the RPC stack depth is on the "other side" of this
// RPC channel. We maintain this variable so that we can detect racy RPC
// calls. With each RPC out-call sent, we send along what *we* think the
// stack depth of the remote side is *before* it will receive the RPC call.
//
// After sending the out-call, our stack depth is "incremented" by pushing
// that pending message onto mPending.
//
// Then when processing an in-call |c|, it must be true that
//
// mStack.size() == c.remoteDepth
//
// I.e., my depth is actually the same as what the other side thought it
// was when it sent in-call |c|. If this fails to hold, we have detected
// racy RPC calls.
//
// We then increment mRemoteStackDepth *just before* processing the
// in-call, since we know the other side is waiting on it, and decrement
// it *just after* finishing processing that in-call, since our response
// will pop the top of the other side's |mPending|.
//
// One nice aspect of this race detection is that it is symmetric; if one
// side detects a race, then the other side must also detect the same race.
size_t mRemoteStackDepthGuess;
// Approximation of code frames on the C++ stack. It can only be
// interpreted as the implication:
//
// !mCxxStackFrames.empty() => MessageChannel code on C++ stack
//
// This member is only accessed on the worker thread, and so is not
// protected by mMonitor. It is managed exclusively by the helper
// |class CxxStackFrame|.
std::vector<RPCFrame> mCxxStackFrames;
// Did we process an RPC out-call during this stack? Only meaningful in
// ExitedCxxStack(), from which this variable is reset.
bool mSawRPCOutMsg;
// Map of replies received "out of turn", because of RPC
// in-calls racing with replies to outstanding in-calls. See
// https://bugzilla.mozilla.org/show_bug.cgi?id=521929.
MessageMap mOutOfTurnReplies;
// Stack of RPC in-calls that were deferred because of race
// conditions.
std::stack<Message> mDeferred;
#ifdef OS_WIN
HANDLE mEvent;
#endif
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_MessageChannel_h

367
ipc/glue/MessageLink.cpp Normal file
View File

@@ -0,0 +1,367 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/ipc/MessageLink.h"
#include "mozilla/ipc/MessageChannel.h"
#include "mozilla/ipc/BrowserProcessSubThread.h"
#include "mozilla/ipc/ProtocolUtils.h"
#include "nsDebug.h"
#include "nsTraceRefcnt.h"
#include "nsXULAppAPI.h"
using namespace mozilla;
using namespace std;
template<>
struct RunnableMethodTraits<mozilla::ipc::ProcessLink>
{
static void RetainCallee(mozilla::ipc::ProcessLink* obj) { }
static void ReleaseCallee(mozilla::ipc::ProcessLink* obj) { }
};
// We rely on invariants about the lifetime of the transport:
//
// - outlives this MessageChannel
// - deleted on the IO thread
//
// These invariants allow us to send messages directly through the
// transport without having to worry about orphaned Send() tasks on
// the IO thread touching MessageChannel memory after it's been deleted
// on the worker thread. We also don't need to refcount the
// Transport, because whatever task triggers its deletion only runs on
// the IO thread, and only runs after this MessageChannel is done with
// the Transport.
template<>
struct RunnableMethodTraits<mozilla::ipc::MessageChannel::Transport>
{
static void RetainCallee(mozilla::ipc::MessageChannel::Transport* obj) { }
static void ReleaseCallee(mozilla::ipc::MessageChannel::Transport* obj) { }
};
namespace mozilla {
namespace ipc {
MessageLink::MessageLink(MessageChannel *aChan)
: mChan(aChan)
{
}
MessageLink::~MessageLink()
{
mChan = nullptr;
}
ProcessLink::ProcessLink(MessageChannel *aChan)
: MessageLink(aChan),
mExistingListener(NULL)
{
}
ProcessLink::~ProcessLink()
{
mIOLoop = 0;
if (mTransport) {
mTransport->set_listener(0);
// we only hold a weak ref to the transport, which is "owned"
// by GeckoChildProcess/GeckoThread
mTransport = 0;
}
}
void
ProcessLink::Open(mozilla::ipc::Transport* aTransport, MessageLoop *aIOLoop, Side aSide)
{
NS_PRECONDITION(aTransport, "need transport layer");
// FIXME need to check for valid channel
mTransport = aTransport;
// FIXME figure out whether we're in parent or child, grab IO loop
// appropriately
bool needOpen = true;
if(aIOLoop) {
// We're a child or using the new arguments. Either way, we
// need an open.
needOpen = true;
mChan->mSide = (aSide == UnknownSide) ? ChildSide : aSide;
} else {
NS_PRECONDITION(aSide == UnknownSide, "expected default side arg");
// parent
mChan->mSide = ParentSide;
needOpen = false;
aIOLoop = XRE_GetIOMessageLoop();
}
mIOLoop = aIOLoop;
NS_ASSERTION(mIOLoop, "need an IO loop");
NS_ASSERTION(mChan->mWorkerLoop, "need a worker loop");
{
MonitorAutoLock lock(*mChan->mMonitor);
if (needOpen) {
// Transport::Connect() has not been called. Call it so
// we start polling our pipe and processing outgoing
// messages.
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &ProcessLink::OnChannelOpened));
} else {
// Transport::Connect() has already been called. Take
// over the channel from the previous listener and process
// any queued messages.
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &ProcessLink::OnTakeConnectedChannel));
}
// Should not wait here if something goes wrong with the channel.
while (!mChan->Connected() && mChan->mChannelState != ChannelError) {
mChan->mMonitor->Wait();
}
}
}
void
ProcessLink::EchoMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &ProcessLink::OnEchoMessage, msg));
// OnEchoMessage takes ownership of |msg|
}
void
ProcessLink::SendMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(mTransport, &Transport::Send, msg));
}
void
ProcessLink::SendClose()
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mIOLoop->PostTask(
FROM_HERE, NewRunnableMethod(this, &ProcessLink::OnCloseChannel));
}
ThreadLink::ThreadLink(MessageChannel *aChan, MessageChannel *aTargetChan)
: MessageLink(aChan),
mTargetChan(aTargetChan)
{
}
ThreadLink::~ThreadLink()
{
// :TODO: MonitorAutoLock lock(*mChan->mMonitor);
// Bug 848949: We need to prevent the other side
// from sending us any more messages to avoid Use-After-Free.
// The setup here is as shown:
//
// (Us) (Them)
// MessageChannel MessageChannel
// | ^ \ / ^ |
// | | X | |
// v | / \ | v
// ThreadLink ThreadLink
//
// We want to null out the diagonal link from their ThreadLink
// to our MessageChannel. Note that we must hold the monitor so
// that we do this atomically with respect to them trying to send
// us a message.
if (mTargetChan) {
static_cast<ThreadLink*>(mTargetChan->mLink)->mTargetChan = 0;
}
mTargetChan = 0;
}
void
ThreadLink::EchoMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->OnMessageReceivedFromLink(*msg);
delete msg;
}
void
ThreadLink::SendMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
if (mTargetChan)
mTargetChan->OnMessageReceivedFromLink(*msg);
delete msg;
}
void
ThreadLink::SendClose()
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->mChannelState = ChannelClosed;
// In a ProcessLink, we would close our half the channel. This
// would show up on the other side as an error on the I/O thread.
// The I/O thread would then invoke OnChannelErrorFromLink().
// As usual, we skip that process and just invoke the
// OnChannelErrorFromLink() method directly.
if (mTargetChan)
mTargetChan->OnChannelErrorFromLink();
}
bool
ThreadLink::Unsound_IsClosed() const
{
MonitorAutoLock lock(*mChan->mMonitor);
return mChan->mChannelState == ChannelClosed;
}
uint32_t
ThreadLink::Unsound_NumQueuedMessages() const
{
// ThreadLinks don't have a message queue.
return 0;
}
//
// The methods below run in the context of the IO thread
//
void
ProcessLink::OnMessageReceived(const Message& msg)
{
AssertIOThread();
NS_ASSERTION(mChan->mChannelState != ChannelError, "Shouldn't get here!");
MonitorAutoLock lock(*mChan->mMonitor);
mChan->OnMessageReceivedFromLink(msg);
}
void
ProcessLink::OnEchoMessage(Message* msg)
{
AssertIOThread();
OnMessageReceived(*msg);
delete msg;
}
void
ProcessLink::OnChannelOpened()
{
mChan->AssertLinkThread();
{
MonitorAutoLock lock(*mChan->mMonitor);
mExistingListener = mTransport->set_listener(this);
#ifdef DEBUG
if (mExistingListener) {
queue<Message> pending;
mExistingListener->GetQueuedMessages(pending);
MOZ_ASSERT(pending.empty());
}
#endif // DEBUG
mChan->mChannelState = ChannelOpening;
lock.Notify();
}
/*assert*/mTransport->Connect();
}
void
ProcessLink::OnTakeConnectedChannel()
{
AssertIOThread();
queue<Message> pending;
{
MonitorAutoLock lock(*mChan->mMonitor);
mChan->mChannelState = ChannelConnected;
mExistingListener = mTransport->set_listener(this);
if (mExistingListener) {
mExistingListener->GetQueuedMessages(pending);
}
lock.Notify();
}
// Dispatch whatever messages the previous listener had queued up.
while (!pending.empty()) {
OnMessageReceived(pending.front());
pending.pop();
}
}
void
ProcessLink::OnChannelConnected(int32_t peer_pid)
{
AssertIOThread();
{
MonitorAutoLock lock(*mChan->mMonitor);
mChan->mChannelState = ChannelConnected;
mChan->mMonitor->Notify();
}
if (mExistingListener)
mExistingListener->OnChannelConnected(peer_pid);
mChan->OnChannelConnected(peer_pid);
}
void
ProcessLink::OnChannelError()
{
AssertIOThread();
MonitorAutoLock lock(*mChan->mMonitor);
mChan->OnChannelErrorFromLink();
}
void
ProcessLink::OnCloseChannel()
{
AssertIOThread();
mTransport->Close();
MonitorAutoLock lock(*mChan->mMonitor);
mChan->mChannelState = ChannelClosed;
mChan->mMonitor->Notify();
}
bool
ProcessLink::Unsound_IsClosed() const
{
return mTransport->Unsound_IsClosed();
}
uint32_t
ProcessLink::Unsound_NumQueuedMessages() const
{
return mTransport->Unsound_NumQueuedMessages();
}
} // namespace ipc
} // namespace mozilla

187
ipc/glue/MessageLink.h Normal file
View File

@@ -0,0 +1,187 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef ipc_glue_MessageLink_h
#define ipc_glue_MessageLink_h 1
#include "base/basictypes.h"
#include "base/message_loop.h"
#include "mozilla/WeakPtr.h"
#include "mozilla/ipc/Transport.h"
namespace mozilla {
namespace ipc {
class MessageChannel;
struct HasResultCodes
{
enum Result {
MsgProcessed,
MsgDropped,
MsgNotKnown,
MsgNotAllowed,
MsgPayloadError,
MsgProcessingError,
MsgRouteError,
MsgValueError
};
};
enum Side {
ParentSide,
ChildSide,
UnknownSide
};
enum ChannelState {
ChannelClosed,
ChannelOpening,
ChannelConnected,
ChannelTimeout,
ChannelClosing,
ChannelError
};
// What happens if RPC calls race?
enum RacyRPCPolicy {
RRPError,
RRPChildWins,
RRPParentWins
};
class MessageListener
: protected HasResultCodes,
public mozilla::SupportsWeakPtr<MessageListener>
{
public:
typedef IPC::Message Message;
virtual ~MessageListener() { }
virtual void OnChannelClose() = 0;
virtual void OnChannelError() = 0;
virtual Result OnMessageReceived(const Message& aMessage) = 0;
virtual Result OnMessageReceived(const Message& aMessage, Message *& aReply) = 0;
virtual Result OnCallReceived(const Message& aMessage, Message *& aReply) = 0;
virtual void OnProcessingError(Result aError) = 0;
virtual void OnChannelConnected(int32_t peer_pid) {}
virtual bool OnReplyTimeout() {
return false;
}
virtual void OnEnteredCxxStack() {
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual void OnExitedCxxStack() {
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual void OnEnteredCall() {
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual void OnExitedCall() {
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual RacyRPCPolicy MediateRPCRace(const Message& parent,
const Message& child)
{
return RRPChildWins;
}
virtual void ProcessRemoteNativeEventsInRPCCall() {
}
// FIXME/bug 792652: this doesn't really belong here, but a
// large refactoring is needed to put it where it belongs.
virtual int32_t GetProtocolTypeId() = 0;
};
class MessageLink
{
public:
typedef IPC::Message Message;
MessageLink(MessageChannel *aChan);
virtual ~MessageLink();
// n.b.: These methods all require that the channel monitor is
// held when they are invoked.
virtual void EchoMessage(Message *msg) = 0;
virtual void SendMessage(Message *msg) = 0;
virtual void SendClose() = 0;
virtual bool Unsound_IsClosed() const = 0;
virtual uint32_t Unsound_NumQueuedMessages() const = 0;
protected:
MessageChannel *mChan;
};
class ProcessLink
: public MessageLink,
public Transport::Listener
{
void OnCloseChannel();
void OnChannelOpened();
void OnTakeConnectedChannel();
void OnEchoMessage(Message* msg);
void AssertIOThread() const
{
NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
"not on I/O thread!");
}
public:
ProcessLink(MessageChannel *chan);
virtual ~ProcessLink();
void Open(Transport* aTransport, MessageLoop *aIOLoop, Side aSide);
// Run on the I/O thread, only when using inter-process link.
// These methods acquire the monitor and forward to the
// similarly named methods in AsyncChannel below
// (OnMessageReceivedFromLink(), etc)
virtual void OnMessageReceived(const Message& msg) MOZ_OVERRIDE;
virtual void OnChannelConnected(int32_t peer_pid) MOZ_OVERRIDE;
virtual void OnChannelError() MOZ_OVERRIDE;
virtual void EchoMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendClose() MOZ_OVERRIDE;
virtual bool Unsound_IsClosed() const MOZ_OVERRIDE;
virtual uint32_t Unsound_NumQueuedMessages() const MOZ_OVERRIDE;
protected:
Transport* mTransport;
MessageLoop* mIOLoop; // thread where IO happens
Transport::Listener* mExistingListener; // channel's previous listener
};
class ThreadLink : public MessageLink
{
public:
ThreadLink(MessageChannel *aChan, MessageChannel *aTargetChan);
virtual ~ThreadLink();
virtual void EchoMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendClose() MOZ_OVERRIDE;
virtual bool Unsound_IsClosed() const MOZ_OVERRIDE;
virtual uint32_t Unsound_NumQueuedMessages() const MOZ_OVERRIDE;
protected:
MessageChannel* mTargetChan;
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_MessageLink_h

View File

@@ -7,7 +7,7 @@
#include "base/process_util.h"
#include "mozilla/ipc/AsyncChannel.h"
#include "mozilla/ipc/MessageChannel.h"
#include "mozilla/ipc/ProtocolUtils.h"
#include "mozilla/ipc/Transport.h"
@@ -50,8 +50,8 @@ public:
bool
Bridge(const PrivateIPDLInterface&,
AsyncChannel* aParentChannel, ProcessHandle aParentProcess,
AsyncChannel* aChildChannel, ProcessHandle aChildProcess,
MessageChannel* aParentChannel, ProcessHandle aParentProcess,
MessageChannel* aChildChannel, ProcessHandle aChildProcess,
ProtocolId aProtocol, ProtocolId aChildProtocol)
{
ProcessId parentId = GetProcId(aParentProcess);
@@ -81,7 +81,7 @@ Bridge(const PrivateIPDLInterface&,
bool
Open(const PrivateIPDLInterface&,
AsyncChannel* aOpenerChannel, ProcessHandle aOtherProcess,
MessageChannel* aOpenerChannel, ProcessHandle aOtherProcess,
Transport::Mode aOpenerMode,
ProtocolId aProtocol, ProtocolId aChildProtocol)
{

View File

@@ -42,7 +42,7 @@ enum {
namespace mozilla {
namespace ipc {
class AsyncChannel;
class MessageChannel;
// Used to pass references to protocol actors across the wire.
// Actors created on the parent-side have a positive ID, and actors
@@ -99,7 +99,7 @@ public:
// XXX odd ducks, acknowledged
virtual ProcessHandle OtherProcess() const = 0;
virtual AsyncChannel* GetIPCChannel() = 0;
virtual MessageChannel* GetIPCChannel() = 0;
};
@@ -126,12 +126,12 @@ struct PrivateIPDLInterface {};
bool
Bridge(const PrivateIPDLInterface&,
AsyncChannel*, base::ProcessHandle, AsyncChannel*, base::ProcessHandle,
MessageChannel*, base::ProcessHandle, MessageChannel*, base::ProcessHandle,
ProtocolId, ProtocolId);
bool
Open(const PrivateIPDLInterface&,
AsyncChannel*, base::ProcessHandle, Transport::Mode,
MessageChannel*, base::ProcessHandle, Transport::Mode,
ProtocolId, ProtocolId);
bool

View File

@@ -1,645 +0,0 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/ipc/RPCChannel.h"
#include "mozilla/ipc/ProtocolUtils.h"
#include "nsDebug.h"
#include "nsTraceRefcnt.h"
#define RPC_ASSERT(_cond, ...) \
do { \
if (!(_cond)) \
DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__); \
} while (0)
using mozilla::MonitorAutoLock;
using mozilla::MonitorAutoUnlock;
template<>
struct RunnableMethodTraits<mozilla::ipc::RPCChannel>
{
static void RetainCallee(mozilla::ipc::RPCChannel* obj) { }
static void ReleaseCallee(mozilla::ipc::RPCChannel* obj) { }
};
namespace mozilla {
namespace ipc {
RPCChannel::RPCChannel(RPCListener* aListener)
: SyncChannel(aListener),
mPending(),
mStack(),
mOutOfTurnReplies(),
mDeferred(),
mRemoteStackDepthGuess(0),
mSawRPCOutMsg(false)
{
MOZ_COUNT_CTOR(RPCChannel);
mDequeueOneTask = new RefCountedTask(NewRunnableMethod(
this,
&RPCChannel::OnMaybeDequeueOne));
}
RPCChannel::~RPCChannel()
{
MOZ_COUNT_DTOR(RPCChannel);
RPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");
Clear();
}
void
RPCChannel::Clear()
{
mDequeueOneTask->Cancel();
AsyncChannel::Clear();
}
bool
RPCChannel::EventOccurred() const
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
RPC_ASSERT(StackDepth() > 0, "not in wait loop");
return (!Connected() ||
!mPending.empty() ||
!mUrgent.empty() ||
(!mOutOfTurnReplies.empty() &&
mOutOfTurnReplies.find(mStack.top().seqno())
!= mOutOfTurnReplies.end()));
}
bool
RPCChannel::Send(Message* msg)
{
Message copy = *msg;
CxxStackFrame f(*this, OUT_MESSAGE, &copy);
return AsyncChannel::Send(msg);
}
bool
RPCChannel::Send(Message* msg, Message* reply)
{
Message copy = *msg;
CxxStackFrame f(*this, OUT_MESSAGE, &copy);
return SyncChannel::Send(msg, reply);
}
bool
RPCChannel::Call(Message* _msg, Message* reply)
{
RPC_ASSERT(!mPendingReply, "should not be waiting for a reply");
nsAutoPtr<Message> msg(_msg);
AssertWorkerThread();
mMonitor->AssertNotCurrentThreadOwns();
RPC_ASSERT(!ProcessingSyncMessage() || msg->priority() == IPC::Message::PRIORITY_HIGH,
"violation of sync handler invariant");
RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
#ifdef OS_WIN
SyncStackFrame frame(this, true);
#endif
Message copy = *msg;
CxxStackFrame f(*this, OUT_MESSAGE, &copy);
MonitorAutoLock lock(*mMonitor);
if (!Connected()) {
ReportConnectionError("RPCChannel");
return false;
}
bool urgent = (copy.priority() == IPC::Message::PRIORITY_HIGH);
msg->set_seqno(NextSeqno());
msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
msg->set_rpc_local_stack_depth(1 + StackDepth());
mStack.push(*msg);
mLink->SendMessage(msg.forget());
while (1) {
// if a handler invoked by *Dispatch*() spun a nested event
// loop, and the connection was broken during that loop, we
// might have already processed the OnError event. if so,
// trying another loop iteration will be futile because
// channel state will have been cleared
if (!Connected()) {
ReportConnectionError("RPCChannel");
return false;
}
// now might be the time to process a message deferred because
// of race resolution
MaybeUndeferIncall();
// here we're waiting for something to happen. see long
// comment about the queue in RPCChannel.h
while (!EventOccurred()) {
bool maybeTimedOut = !RPCChannel::WaitForNotify();
if (EventOccurred() ||
// we might have received a "subtly deferred" message
// in a nested loop that it's now time to process
(!maybeTimedOut &&
(!mDeferred.empty() || !mOutOfTurnReplies.empty())))
break;
if (maybeTimedOut && !ShouldContinueFromTimeout())
return false;
}
if (!Connected()) {
ReportConnectionError("RPCChannel");
return false;
}
Message recvd;
MessageMap::iterator it;
if (!mOutOfTurnReplies.empty() &&
((it = mOutOfTurnReplies.find(mStack.top().seqno())) !=
mOutOfTurnReplies.end())) {
recvd = it->second;
mOutOfTurnReplies.erase(it);
}
else if (!mUrgent.empty()) {
recvd = mUrgent.front();
mUrgent.pop_front();
}
else if (!mPending.empty()) {
recvd = mPending.front();
mPending.pop_front();
}
else {
// because of subtleties with nested event loops, it's
// possible that we got here and nothing happened. or, we
// might have a deferred in-call that needs to be
// processed. either way, we won't break the inner while
// loop again until something new happens.
continue;
}
if (!recvd.is_rpc()) {
if (urgent && recvd.priority() != IPC::Message::PRIORITY_HIGH) {
// If we're waiting for an urgent reply, don't process any
// messages yet.
mNonUrgentDeferred.push_back(recvd);
} else if (recvd.is_sync()) {
RPC_ASSERT(mPending.empty(),
"other side should have been blocked");
MonitorAutoUnlock unlock(*mMonitor);
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
SyncChannel::OnDispatchMessage(recvd);
} else {
MonitorAutoUnlock unlock(*mMonitor);
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
AsyncChannel::OnDispatchMessage(recvd);
}
continue;
}
RPC_ASSERT(recvd.is_rpc(), "wtf???");
if (recvd.is_reply()) {
RPC_ASSERT(0 < mStack.size(), "invalid RPC stack");
const Message& outcall = mStack.top();
// in the parent, seqno's increase from 0, and in the
// child, they decrease from 0
if ((!mChild && recvd.seqno() < outcall.seqno()) ||
(mChild && recvd.seqno() > outcall.seqno())) {
mOutOfTurnReplies[recvd.seqno()] = recvd;
continue;
}
// FIXME/cjones: handle error
RPC_ASSERT(
recvd.is_reply_error() ||
(recvd.type() == (outcall.type()+1) &&
recvd.seqno() == outcall.seqno()),
"somebody's misbehavin'", "rpc", true);
// we received a reply to our most recent outstanding
// call. pop this frame and return the reply
mStack.pop();
bool isError = recvd.is_reply_error();
if (!isError) {
*reply = recvd;
}
if (0 == StackDepth()) {
RPC_ASSERT(
mOutOfTurnReplies.empty(),
"still have pending replies with no pending out-calls",
"rpc", true);
}
// finished with this RPC stack frame
return !isError;
}
// in-call. process in a new stack frame.
// "snapshot" the current stack depth while we own the Monitor
size_t stackDepth = StackDepth();
{
MonitorAutoUnlock unlock(*mMonitor);
// someone called in to us from the other side. handle the call
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
Incall(recvd, stackDepth);
// FIXME/cjones: error handling
}
}
return true;
}
void
RPCChannel::MaybeUndeferIncall()
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
if (mDeferred.empty())
return;
size_t stackDepth = StackDepth();
// the other side can only *under*-estimate our actual stack depth
RPC_ASSERT(mDeferred.top().rpc_remote_stack_depth_guess() <= stackDepth,
"fatal logic error");
if (mDeferred.top().rpc_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth))
return;
// maybe time to process this message
Message call = mDeferred.top();
mDeferred.pop();
// fix up fudge factor we added to account for race
RPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
--mRemoteStackDepthGuess;
mPending.push_back(call);
}
void
RPCChannel::EnqueuePendingMessages()
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
MaybeUndeferIncall();
for (size_t i = 0; i < mDeferred.size(); ++i) {
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
}
// XXX performance tuning knob: could process all or k pending
// messages here, rather than enqueuing for later processing
size_t total = mPending.size() + mUrgent.size() + mNonUrgentDeferred.size();
for (size_t i = 0; i < total; ++i) {
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
}
}
void
RPCChannel::FlushPendingRPCQueue()
{
AssertWorkerThread();
mMonitor->AssertNotCurrentThreadOwns();
{
MonitorAutoLock lock(*mMonitor);
if (mDeferred.empty()) {
if (mPending.empty())
return;
const Message& last = mPending.back();
if (!last.is_rpc() || last.is_reply())
return;
}
}
while (OnMaybeDequeueOne());
}
bool
RPCChannel::OnMaybeDequeueOne()
{
// XXX performance tuning knob: could process all or k pending
// messages here
AssertWorkerThread();
mMonitor->AssertNotCurrentThreadOwns();
Message recvd;
{
MonitorAutoLock lock(*mMonitor);
if (!Connected()) {
ReportConnectionError("RPCChannel");
return false;
}
if (!mDeferred.empty())
MaybeUndeferIncall();
MessageQueue *queue = mUrgent.empty()
? mNonUrgentDeferred.empty()
? &mPending
: &mNonUrgentDeferred
: &mUrgent;
if (queue->empty())
return false;
recvd = queue->front();
queue->pop_front();
}
if (IsOnCxxStack() && recvd.is_rpc() && recvd.is_reply()) {
// We probably just received a reply in a nested loop for an
// RPC call sent before entering that loop.
mOutOfTurnReplies[recvd.seqno()] = recvd;
return false;
}
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
if (recvd.is_rpc())
Incall(recvd, 0);
else if (recvd.is_sync())
SyncChannel::OnDispatchMessage(recvd);
else
AsyncChannel::OnDispatchMessage(recvd);
return true;
}
size_t
RPCChannel::RemoteViewOfStackDepth(size_t stackDepth) const
{
AssertWorkerThread();
return stackDepth - mOutOfTurnReplies.size();
}
void
RPCChannel::Incall(const Message& call, size_t stackDepth)
{
AssertWorkerThread();
mMonitor->AssertNotCurrentThreadOwns();
RPC_ASSERT(call.is_rpc() && !call.is_reply(), "wrong message type");
// Race detection: see the long comment near
// mRemoteStackDepthGuess in RPCChannel.h. "Remote" stack depth
// means our side, and "local" means other side.
if (call.rpc_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
// RPC in-calls have raced.
// the "winner", if there is one, gets to defer processing of
// the other side's in-call
bool defer;
const char* winner;
switch (Listener()->MediateRPCRace(mChild ? call : mStack.top(),
mChild ? mStack.top() : call)) {
case RRPChildWins:
winner = "child";
defer = mChild;
break;
case RRPParentWins:
winner = "parent";
defer = !mChild;
break;
case RRPError:
NS_RUNTIMEABORT("NYI: 'Error' RPC race policy");
return;
default:
NS_RUNTIMEABORT("not reached");
return;
}
if (LoggingEnabled()) {
printf_stderr(" (%s: %s won, so we're%sdeferring)\n",
mChild ? "child" : "parent", winner,
defer ? " " : " not ");
}
if (defer) {
// we now know the other side's stack has one more frame
// than we thought
++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
mDeferred.push(call);
return;
}
// we "lost" and need to process the other side's in-call.
// don't need to fix up the mRemoteStackDepthGuess here,
// because we're just about to increment it in DispatchCall(),
// which will make it correct again
}
#ifdef OS_WIN
SyncStackFrame frame(this, true);
#endif
DispatchIncall(call);
}
void
RPCChannel::DispatchIncall(const Message& call)
{
AssertWorkerThread();
mMonitor->AssertNotCurrentThreadOwns();
RPC_ASSERT(call.is_rpc() && !call.is_reply(),
"wrong message type");
Message* reply = nullptr;
++mRemoteStackDepthGuess;
Result rv = Listener()->OnCallReceived(call, reply);
--mRemoteStackDepthGuess;
if (!MaybeHandleError(rv, "RPCChannel")) {
delete reply;
reply = new Message();
reply->set_rpc();
reply->set_reply();
reply->set_reply_error();
}
reply->set_seqno(call.seqno());
{
MonitorAutoLock lock(*mMonitor);
if (ChannelConnected == mChannelState)
mLink->SendMessage(reply);
}
}
void
RPCChannel::ExitedCxxStack()
{
Listener()->OnExitedCxxStack();
if (mSawRPCOutMsg) {
MonitorAutoLock lock(*mMonitor);
// see long comment in OnMaybeDequeueOne()
EnqueuePendingMessages();
mSawRPCOutMsg = false;
}
}
void
RPCChannel::DebugAbort(const char* file, int line, const char* cond,
const char* why,
const char* type, bool reply) const
{
printf_stderr("###!!! [RPCChannel][%s][%s:%d] "
"Assertion (%s) failed. %s (triggered by %s%s)\n",
mChild ? "Child" : "Parent",
file, line, cond,
why,
type, reply ? "reply" : "");
// technically we need the mutex for this, but we're dying anyway
DumpRPCStack(" ");
printf_stderr(" remote RPC stack guess: %lu\n",
mRemoteStackDepthGuess);
printf_stderr(" deferred stack size: %lu\n",
mDeferred.size());
printf_stderr(" out-of-turn RPC replies stack size: %lu\n",
mOutOfTurnReplies.size());
printf_stderr(" Pending queue size: %lu, front to back:\n",
mPending.size());
MessageQueue pending = mPending;
while (!pending.empty()) {
printf_stderr(" [ %s%s ]\n",
pending.front().is_rpc() ? "rpc" :
(pending.front().is_sync() ? "sync" : "async"),
pending.front().is_reply() ? "reply" : "");
pending.pop_front();
}
NS_RUNTIMEABORT(why);
}
void
RPCChannel::DumpRPCStack(const char* const pfx) const
{
NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop,
"The worker thread had better be paused in a debugger!");
printf_stderr("%sRPCChannel 'backtrace':\n", pfx);
// print a python-style backtrace, first frame to last
for (uint32_t i = 0; i < mCxxStackFrames.size(); ++i) {
int32_t id;
const char* dir, *sems, *name;
mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
i, dir, sems, name, id);
}
}
//
// The methods below run in the context of the link thread, and can proxy
// back to the methods above
//
void
RPCChannel::OnMessageReceivedFromLink(const Message& msg)
{
AssertLinkThread();
mMonitor->AssertCurrentThreadOwns();
if (MaybeInterceptSpecialIOMessage(msg))
return;
// regardless of the RPC stack, if we're awaiting a sync reply, we
// know that it needs to be immediately handled to unblock us.
if (AwaitingSyncReply() && msg.is_sync()) {
// wake up worker thread waiting at SyncChannel::Send
mRecvd = msg;
NotifyWorkerThread();
return;
}
MessageQueue *queue = (msg.priority() == IPC::Message::PRIORITY_HIGH)
? &mUrgent
: &mPending;
bool compressMessage = (msg.compress() && !queue->empty() &&
queue->back().type() == msg.type() &&
queue->back().routing_id() == msg.routing_id());
if (compressMessage) {
// This message type has compression enabled, and the back of
// the queue was the same message type and routed to the same
// destination. Replace it with the newer message.
MOZ_ASSERT(queue->back().compress());
queue->pop_back();
}
queue->push_back(msg);
// There are three cases we're concerned about, relating to the state of
// the main thread:
//
// (1) We are waiting on a sync reply - main thread is blocked on the IPC monitor.
// - If the message is high priority, we wake up the main thread to
// deliver the message. Otherwise, we leave it in the mPending queue,
// posting a task to the main event loop, where it will be processed
// once the synchronous reply has been received.
//
// (2) We are waiting on an RPC reply - main thread is blocked on the IPC monitor.
// - Always wake up the main thread to deliver the message.
//
// (3) We are not waiting on a reply.
// - We post a task to the main event loop.
//
bool waiting_rpc = (0 != StackDepth());
bool urgent = (msg.priority() == IPC::Message::PRIORITY_HIGH);
if (waiting_rpc || (AwaitingSyncReply() && urgent)) {
// Always wake up our RPC waiter, and wake up sync waiters for urgent
// messages.
NotifyWorkerThread();
} else {
// Worker thread is either not blocked on a reply, or this is an
// incoming RPC that raced with outgoing sync and needs to be deferred
// to a later event-loop iteration.
if (!compressMessage) {
// If we compressed away the previous message, we'll reuse
// its pending task.
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
}
}
}
void
RPCChannel::OnChannelErrorFromLink()
{
AssertLinkThread();
mMonitor->AssertCurrentThreadOwns();
if (0 < StackDepth())
NotifyWorkerThread();
SyncChannel::OnChannelErrorFromLink();
}
} // namespace ipc
} // namespace mozilla

View File

@@ -1,427 +0,0 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef ipc_glue_RPCChannel_h
#define ipc_glue_RPCChannel_h 1
#include <stdio.h>
#include <deque>
#include <stack>
#include <vector>
#include "base/basictypes.h"
#include "nsISupportsImpl.h"
#include "mozilla/ipc/SyncChannel.h"
#include "nsAutoPtr.h"
namespace mozilla {
namespace ipc {
//-----------------------------------------------------------------------------
class RPCChannel : public SyncChannel
{
friend class CxxStackFrame;
public:
// What happens if RPC calls race?
enum RacyRPCPolicy {
RRPError,
RRPChildWins,
RRPParentWins
};
class /*NS_INTERFACE_CLASS*/ RPCListener :
public SyncChannel::SyncListener
{
public:
virtual ~RPCListener() { }
virtual void OnChannelClose() = 0;
virtual void OnChannelError() = 0;
virtual Result OnMessageReceived(const Message& aMessage) = 0;
virtual void OnProcessingError(Result aError) = 0;
virtual int32_t GetProtocolTypeId() = 0;
virtual bool OnReplyTimeout() = 0;
virtual Result OnMessageReceived(const Message& aMessage,
Message*& aReply) = 0;
virtual Result OnCallReceived(const Message& aMessage,
Message*& aReply) = 0;
virtual void OnChannelConnected(int32_t peer_pid) {}
virtual void OnEnteredCxxStack()
{
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual void OnExitedCxxStack()
{
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual void OnEnteredCall()
{
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual void OnExitedCall()
{
NS_RUNTIMEABORT("default impl shouldn't be invoked");
}
virtual RacyRPCPolicy MediateRPCRace(const Message& parent,
const Message& child)
{
return RRPChildWins;
}
virtual void ProcessRemoteNativeEventsInRPCCall() {};
};
RPCChannel(RPCListener* aListener);
virtual ~RPCChannel();
void Clear() MOZ_OVERRIDE;
// Make an RPC to the other side of the channel
bool Call(Message* msg, Message* reply);
// RPCChannel overrides these so that the async and sync messages
// can be counted against mStackFrames
virtual bool Send(Message* msg) MOZ_OVERRIDE;
virtual bool Send(Message* msg, Message* reply) MOZ_OVERRIDE;
// Return true iff this has code on the C++ stack.
bool IsOnCxxStack() const {
return !mCxxStackFrames.empty();
}
/**
* If there is a pending RPC message, process all pending messages.
*
* @note This method is used on Windows when we detect that an outbound
* OLE RPC call is being made to unblock the parent.
*/
void FlushPendingRPCQueue();
#ifdef OS_WIN
void ProcessNativeEventsInRPCCall();
static void NotifyGeckoEventDispatch();
protected:
bool WaitForNotify();
void SpinInternalEventLoop();
#endif
protected:
virtual void OnMessageReceivedFromLink(const Message& msg) MOZ_OVERRIDE;
virtual void OnChannelErrorFromLink() MOZ_OVERRIDE;
private:
// Called on worker thread only
RPCListener* Listener() const {
return static_cast<RPCListener*>(mListener.get());
}
virtual bool ShouldDeferNotifyMaybeError() const MOZ_OVERRIDE {
return IsOnCxxStack();
}
bool EventOccurred() const;
void MaybeUndeferIncall();
void EnqueuePendingMessages();
/**
* Process one deferred or pending message.
* @return true if a message was processed
*/
bool OnMaybeDequeueOne();
/**
* The "remote view of stack depth" can be different than the
* actual stack depth when there are out-of-turn replies. When we
* receive one, our actual RPC stack depth doesn't decrease, but
* the other side (that sent the reply) thinks it has. So, the
* "view" returned here is |stackDepth| minus the number of
* out-of-turn replies.
*
* Only called from the worker thread.
*/
size_t RemoteViewOfStackDepth(size_t stackDepth) const;
void Incall(const Message& call, size_t stackDepth);
void DispatchIncall(const Message& call);
// This helper class managed RPCChannel.mCxxStackDepth on behalf
// of RPCChannel. When the stack depth is incremented from zero
// to non-zero, it invokes an RPCChannel callback, and similarly
// for when the depth goes from non-zero to zero;
void EnteredCxxStack()
{
Listener()->OnEnteredCxxStack();
}
void ExitedCxxStack();
void EnteredCall()
{
Listener()->OnEnteredCall();
}
void ExitedCall()
{
Listener()->OnExitedCall();
}
enum Direction { IN_MESSAGE, OUT_MESSAGE };
struct RPCFrame {
RPCFrame(Direction direction, const Message* msg) :
mDirection(direction), mMsg(msg)
{ }
bool IsRPCIncall() const
{
return mMsg->is_rpc() && IN_MESSAGE == mDirection;
}
bool IsRPCOutcall() const
{
return mMsg->is_rpc() && OUT_MESSAGE == mDirection;
}
void Describe(int32_t* id, const char** dir, const char** sems,
const char** name) const
{
*id = mMsg->routing_id();
*dir = (IN_MESSAGE == mDirection) ? "in" : "out";
*sems = mMsg->is_rpc() ? "rpc" : mMsg->is_sync() ? "sync" : "async";
*name = mMsg->name();
}
Direction mDirection;
const Message* mMsg;
};
class MOZ_STACK_CLASS CxxStackFrame
{
public:
CxxStackFrame(RPCChannel& that, Direction direction,
const Message* msg) : mThat(that) {
mThat.AssertWorkerThread();
if (mThat.mCxxStackFrames.empty())
mThat.EnteredCxxStack();
mThat.mCxxStackFrames.push_back(RPCFrame(direction, msg));
const RPCFrame& frame = mThat.mCxxStackFrames.back();
if (frame.IsRPCIncall())
mThat.EnteredCall();
mThat.mSawRPCOutMsg |= frame.IsRPCOutcall();
}
~CxxStackFrame() {
bool exitingCall = mThat.mCxxStackFrames.back().IsRPCIncall();
mThat.mCxxStackFrames.pop_back();
bool exitingStack = mThat.mCxxStackFrames.empty();
// mListener could have gone away if Close() was called while
// RPCChannel code was still on the stack
if (!mThat.mListener)
return;
mThat.AssertWorkerThread();
if (exitingCall)
mThat.ExitedCall();
if (exitingStack)
mThat.ExitedCxxStack();
}
private:
RPCChannel& mThat;
// disable harmful methods
CxxStackFrame();
CxxStackFrame(const CxxStackFrame&);
CxxStackFrame& operator=(const CxxStackFrame&);
};
// Called from both threads
size_t StackDepth() const {
mMonitor->AssertCurrentThreadOwns();
return mStack.size();
}
void DebugAbort(const char* file, int line, const char* cond,
const char* why,
const char* type="rpc", bool reply=false) const;
// This method is only safe to call on the worker thread, or in a
// debugger with all threads paused.
void DumpRPCStack(const char* const pfx="") const;
//
// Queue of all incoming messages, except for replies to sync
// messages, which are delivered directly to the SyncChannel
// through its mRecvd member.
//
// If both this side and the other side are functioning correctly,
// the queue can only be in certain configurations. Let
//
// |A<| be an async in-message,
// |S<| be a sync in-message,
// |C<| be an RPC in-call,
// |R<| be an RPC reply.
//
// The queue can only match this configuration
//
// A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
//
// The other side can send as many async messages |A<*| as it
// wants before sending us a blocking message.
//
// The first case is |S<|, a sync in-msg. The other side must be
// blocked, and thus can't send us any more messages until we
// process the sync in-msg.
//
// The second case is |C<|, an RPC in-call; the other side must be
// blocked. (There's a subtlety here: this in-call might have
// raced with an out-call, but we detect that with the mechanism
// below, |mRemoteStackDepth|, and races don't matter to the
// queue.)
//
// Final case, the other side replied to our most recent out-call
// |R<|. If that was the *only* out-call on our stack,
// |?{mStack.size() == 1}|, then other side "finished with us,"
// and went back to its own business. That business might have
// included sending any number of async message |A<*| until
// sending a blocking message |(S< | C<)|. If we had more than
// one RPC call on our stack, the other side *better* not have
// sent us another blocking message, because it's blocked on a
// reply from us.
//
typedef std::deque<Message> MessageQueue;
MessageQueue mPending;
// List of async and sync messages that have been received while waiting
// for an urgent reply, that need to be deferred until that reply has been
// received.
MessageQueue mNonUrgentDeferred;
//
// Stack of all the RPC out-calls on which this RPCChannel is
// awaiting a response.
//
std::stack<Message> mStack;
//
// Map of replies received "out of turn", because of RPC
// in-calls racing with replies to outstanding in-calls. See
// https://bugzilla.mozilla.org/show_bug.cgi?id=521929.
//
typedef std::map<size_t, Message> MessageMap;
MessageMap mOutOfTurnReplies;
//
// Stack of RPC in-calls that were deferred because of race
// conditions.
//
std::stack<Message> mDeferred;
//
// This is what we think the RPC stack depth is on the "other
// side" of this RPC channel. We maintain this variable so that
// we can detect racy RPC calls. With each RPC out-call sent, we
// send along what *we* think the stack depth of the remote side
// is *before* it will receive the RPC call.
//
// After sending the out-call, our stack depth is "incremented"
// by pushing that pending message onto mPending.
//
// Then when processing an in-call |c|, it must be true that
//
// mStack.size() == c.remoteDepth
//
// i.e., my depth is actually the same as what the other side
// thought it was when it sent in-call |c|. If this fails to
// hold, we have detected racy RPC calls.
//
// We then increment mRemoteStackDepth *just before* processing
// the in-call, since we know the other side is waiting on it, and
// decrement it *just after* finishing processing that in-call,
// since our response will pop the top of the other side's
// |mPending|.
//
// One nice aspect of this race detection is that it is symmetric;
// if one side detects a race, then the other side must also
// detect the same race.
//
size_t mRemoteStackDepthGuess;
// Approximation of Sync/RPCChannel-code frames on the C++ stack.
// It can only be interpreted as the implication
//
// !mCxxStackFrames.empty() => RPCChannel code on C++ stack
//
// This member is only accessed on the worker thread, and so is
// not protected by mMonitor. It is managed exclusively by the
// helper |class CxxStackFrame|.
std::vector<RPCFrame> mCxxStackFrames;
// Did we process an RPC out-call during this stack? Only
// meaningful in ExitedCxxStack(), from which this variable is
// reset.
bool mSawRPCOutMsg;
private:
//
// All dequeuing tasks require a single point of cancellation,
// which is handled via a reference-counted task.
//
class RefCountedTask
{
public:
RefCountedTask(CancelableTask* aTask)
: mTask(aTask) {}
~RefCountedTask() { delete mTask; }
void Run() { mTask->Run(); }
void Cancel() { mTask->Cancel(); }
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)
private:
CancelableTask* mTask;
};
//
// Wrap an existing task which can be cancelled at any time
// without the wrapper's knowledge.
//
class DequeueTask : public Task
{
public:
DequeueTask(RefCountedTask* aTask) : mTask(aTask) {}
void Run() { mTask->Run(); }
private:
nsRefPtr<RefCountedTask> mTask;
};
// A task encapsulating dequeuing one pending task
nsRefPtr<RefCountedTask> mDequeueOneTask;
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_RPCChannel_h

View File

@@ -1,402 +0,0 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/DebugOnly.h"
#include "mozilla/ipc/SyncChannel.h"
#include "mozilla/ipc/RPCChannel.h"
#include "nsDebug.h"
#include "nsTraceRefcnt.h"
using mozilla::MonitorAutoLock;
template<>
struct RunnableMethodTraits<mozilla::ipc::SyncChannel>
{
static void RetainCallee(mozilla::ipc::SyncChannel* obj) { }
static void ReleaseCallee(mozilla::ipc::SyncChannel* obj) { }
};
namespace mozilla {
namespace ipc {
const int32_t SyncChannel::kNoTimeout = INT32_MIN;
SyncChannel::SyncChannel(SyncListener* aListener)
: AsyncChannel(aListener)
#ifdef OS_WIN
, mTopFrame(nullptr)
#endif
, mPendingReply(0)
, mProcessingSyncMessage(false)
, mNextSeqno(0)
, mInTimeoutSecondHalf(false)
, mTimeoutMs(kNoTimeout)
{
MOZ_COUNT_CTOR(SyncChannel);
#ifdef OS_WIN
mEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr);
NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!");
#endif
}
SyncChannel::~SyncChannel()
{
MOZ_COUNT_DTOR(SyncChannel);
#ifdef OS_WIN
CloseHandle(mEvent);
#endif
}
// static
bool SyncChannel::sIsPumpingMessages = false;
bool
SyncChannel::EventOccurred()
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
NS_ABORT_IF_FALSE(AwaitingSyncReply(), "not in wait loop");
return !Connected() ||
mRecvd.type() != 0 ||
mRecvd.is_reply_error() ||
!mUrgent.empty();
}
bool
SyncChannel::ProcessUrgentMessages()
{
while (!mUrgent.empty()) {
Message msg = mUrgent.front();
mUrgent.pop_front();
MOZ_ASSERT(msg.priority() == IPC::Message::PRIORITY_HIGH);
{
MOZ_ASSERT(msg.is_sync() || msg.is_rpc());
MonitorAutoUnlock unlock(*mMonitor);
SyncChannel::OnDispatchMessage(msg);
}
// Check state that could have been changed during dispatch.
if (!Connected()) {
ReportConnectionError("SyncChannel");
return false;
}
// We should not have received another synchronous reply,
// because we cannot send synchronous messages in this state.
MOZ_ASSERT(mRecvd.type() == 0);
}
return true;
}
bool
SyncChannel::Send(Message* _msg, Message* reply)
{
if (mPendingReply) {
// This is a temporary hack in place, for e10s CPOWs, until bug 901789
// and the new followup RPC protocol land. Eventually this will become
// an assert again. See bug 900062 for details.
NS_ERROR("Nested sync messages are not supported");
return false;
}
nsAutoPtr<Message> msg(_msg);
AssertWorkerThread();
mMonitor->AssertNotCurrentThreadOwns();
NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
"violation of sync handler invariant");
NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
#ifdef OS_WIN
SyncStackFrame frame(this, false);
#endif
msg->set_seqno(NextSeqno());
MonitorAutoLock lock(*mMonitor);
if (!Connected()) {
ReportConnectionError("SyncChannel");
return false;
}
mPendingReply = msg->type() + 1;
DebugOnly<int32_t> msgSeqno = msg->seqno();
mLink->SendMessage(msg.forget());
while (1) {
// if a handler invoked by *Dispatch*() spun a nested event
// loop, and the connection was broken during that loop, we
// might have already processed the OnError event. if so,
// trying another loop iteration will be futile because
// channel state will have been cleared
if (!Connected()) {
ReportConnectionError("SyncChannel");
return false;
}
while (!EventOccurred()) {
bool maybeTimedOut = !SyncChannel::WaitForNotify();
if (EventOccurred())
break;
// we might have received a "subtly deferred" message
// in a nested loop that it's now time to process
if (!mUrgent.empty())
break;
if (maybeTimedOut && !ShouldContinueFromTimeout())
return false;
}
if (!Connected()) {
ReportConnectionError("SyncChannel");
return false;
}
// Process all urgent messages. We forbid nesting synchronous sends,
// so mPendingReply etc will still be valid.
if (!ProcessUrgentMessages())
return false;
if (mRecvd.is_reply_error() || mRecvd.type() != 0) {
// we just received a synchronous message from the other side.
// If it's not the reply we were awaiting, there's a serious
// error: either a mistimed/malformed message or a sync in-message
// that raced with our sync out-message.
// (NB: IPDL prevents the latter from occuring in actor code)
// FIXME/cjones: real error handling
NS_ABORT_IF_FALSE(mRecvd.is_sync() && mRecvd.is_reply() &&
(mRecvd.is_reply_error() ||
(mPendingReply == mRecvd.type() &&
msgSeqno == mRecvd.seqno())),
"unexpected sync message");
mPendingReply = 0;
if (mRecvd.is_reply_error())
return false;
*reply = TakeReply();
MOZ_ASSERT(mUrgent.empty());
return true;
}
}
return true;
}
void
SyncChannel::OnDispatchMessage(const Message& msg)
{
AssertWorkerThread();
NS_ABORT_IF_FALSE(msg.is_sync() || msg.is_rpc(), "only sync messages here");
NS_ABORT_IF_FALSE(!msg.is_reply(), "wasn't awaiting reply");
Message* reply = 0;
mProcessingSyncMessage = true;
Result rv;
if (msg.is_sync())
rv = static_cast<SyncListener*>(mListener.get())->OnMessageReceived(msg, reply);
else
rv = static_cast<RPCChannel::RPCListener*>(mListener.get())->OnCallReceived(msg, reply);
mProcessingSyncMessage = false;
if (!MaybeHandleError(rv, "SyncChannel")) {
// FIXME/cjones: error handling; OnError()?
delete reply;
reply = new Message();
if (msg.is_sync())
reply->set_sync();
else if (msg.is_rpc())
reply->set_rpc();
reply->set_reply();
reply->set_reply_error();
}
reply->set_seqno(msg.seqno());
{
MonitorAutoLock lock(*mMonitor);
if (ChannelConnected == mChannelState)
mLink->SendMessage(reply);
}
}
//
// The methods below run in the context of the link thread, and can proxy
// back to the methods above
//
void
SyncChannel::OnMessageReceivedFromLink(const Message& msg)
{
AssertLinkThread();
mMonitor->AssertCurrentThreadOwns();
if (MaybeInterceptSpecialIOMessage(msg))
return;
if (msg.priority() == IPC::Message::PRIORITY_HIGH) {
// If the message is high priority, we skip the worker entirely, and
// wake up the loop that's spinning for a reply.
if (!AwaitingSyncReply()) {
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
} else {
mUrgent.push_back(msg);
NotifyWorkerThread();
}
return;
}
if (!msg.is_sync()) {
AsyncChannel::OnMessageReceivedFromLink(msg);
return;
}
if (!AwaitingSyncReply()) {
// wake up the worker, there's work to do
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
}
else {
// let the worker know a new sync message has arrived
mRecvd = msg;
NotifyWorkerThread();
}
}
void
SyncChannel::OnChannelErrorFromLink()
{
AssertLinkThread();
mMonitor->AssertCurrentThreadOwns();
if (AwaitingSyncReply())
NotifyWorkerThread();
AsyncChannel::OnChannelErrorFromLink();
}
//
// Synchronization between worker and IO threads
//
namespace {
bool
IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
{
return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
(aTimeout <= (PR_IntervalNow() - aStart));
}
} // namespace <anon>
bool
SyncChannel::ShouldContinueFromTimeout()
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
bool cont;
{
MonitorAutoUnlock unlock(*mMonitor);
cont = static_cast<SyncListener*>(mListener.get())->OnReplyTimeout();
}
static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN;
if (sDebuggingChildren == UNKNOWN) {
sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING;
}
if (sDebuggingChildren == DEBUGGING) {
return true;
}
if (!cont) {
// NB: there's a sublety here. If parents were allowed to
// send sync messages to children, then it would be possible
// for this synchronous close-on-timeout to race with async
// |OnMessageReceived| tasks arriving from the child, posted
// to the worker thread's event loop. This would complicate
// cleanup of the *Channel. But since IPDL forbids this (and
// since it doesn't support children timing out on parents),
// the parent can only block on RPC messages to the child, and
// in that case arriving async messages are enqueued to the
// RPC channel's special queue. They're then ignored because
// the channel state changes to ChannelTimeout
// (i.e. !Connected).
SynchronouslyClose();
mChannelState = ChannelTimeout;
}
return cont;
}
bool
SyncChannel::WaitResponse(bool aWaitTimedOut)
{
if (aWaitTimedOut) {
if (mInTimeoutSecondHalf) {
// We've really timed out this time
return false;
}
// Try a second time
mInTimeoutSecondHalf = true;
} else {
mInTimeoutSecondHalf = false;
}
return true;
}
// Windows versions of the following two functions live in
// WindowsMessageLoop.cpp.
#ifndef OS_WIN
bool
SyncChannel::WaitForNotify()
{
PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
PR_INTERVAL_NO_TIMEOUT :
PR_MillisecondsToInterval(mTimeoutMs);
// XXX could optimize away this syscall for "no timeout" case if desired
PRIntervalTime waitStart = PR_IntervalNow();
mMonitor->Wait(timeout);
// if the timeout didn't expire, we know we received an event.
// The converse is not true.
return WaitResponse(IsTimeoutExpired(waitStart, timeout));
}
void
SyncChannel::NotifyWorkerThread()
{
mMonitor->Notify();
}
#endif // ifndef OS_WIN
} // namespace ipc
} // namespace mozilla

View File

@@ -1,190 +0,0 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef ipc_glue_SyncChannel_h
#define ipc_glue_SyncChannel_h 1
#include "mozilla/ipc/AsyncChannel.h"
#include <math.h>
namespace mozilla {
namespace ipc {
//-----------------------------------------------------------------------------
class SyncChannel : public AsyncChannel
{
protected:
typedef IPC::Message::msgid_t msgid_t;
public:
static const int32_t kNoTimeout;
class /*NS_INTERFACE_CLASS*/ SyncListener :
public AsyncChannel::AsyncListener
{
public:
virtual ~SyncListener() { }
virtual void OnChannelClose() = 0;
virtual void OnChannelError() = 0;
virtual Result OnMessageReceived(const Message& aMessage) = 0;
virtual void OnProcessingError(Result aError) = 0;
virtual int32_t GetProtocolTypeId() = 0;
virtual bool OnReplyTimeout() = 0;
virtual Result OnMessageReceived(const Message& aMessage,
Message*& aReply) = 0;
virtual void OnChannelConnected(int32_t peer_pid) {}
};
SyncChannel(SyncListener* aListener);
virtual ~SyncChannel();
virtual bool Send(Message* msg) MOZ_OVERRIDE {
return AsyncChannel::Send(msg);
}
// Synchronously send |msg| (i.e., wait for |reply|)
virtual bool Send(Message* msg, Message* reply);
// Set channel timeout value. Since this is broken up into
// two period, the minimum timeout value is 2ms.
void SetReplyTimeoutMs(int32_t aTimeoutMs) {
AssertWorkerThread();
mTimeoutMs = (aTimeoutMs <= 0) ? kNoTimeout :
// timeouts are broken up into two periods
(int32_t)ceil((double)aTimeoutMs/2.0);
}
static bool IsPumpingMessages() {
return sIsPumpingMessages;
}
static void SetIsPumpingMessages(bool aIsPumping) {
sIsPumpingMessages = aIsPumping;
}
#ifdef OS_WIN
public:
struct MOZ_STACK_CLASS SyncStackFrame
{
SyncStackFrame(SyncChannel* channel, bool rpc);
~SyncStackFrame();
bool mRPC;
bool mSpinNestedEvents;
bool mListenerNotified;
SyncChannel* mChannel;
/* the previous stack frame for this channel */
SyncStackFrame* mPrev;
/* the previous stack frame on any channel */
SyncStackFrame* mStaticPrev;
};
friend struct SyncChannel::SyncStackFrame;
static bool IsSpinLoopActive() {
for (SyncStackFrame* frame = sStaticTopFrame;
frame;
frame = frame->mPrev) {
if (frame->mSpinNestedEvents)
return true;
}
return false;
}
protected:
/* the deepest sync stack frame for this channel */
SyncStackFrame* mTopFrame;
/* the deepest sync stack frame on any channel */
static SyncStackFrame* sStaticTopFrame;
#endif // OS_WIN
protected:
// Executed on the link thread
// Override the AsyncChannel handler so we can dispatch sync messages
virtual void OnMessageReceivedFromLink(const Message& msg) MOZ_OVERRIDE;
virtual void OnChannelErrorFromLink() MOZ_OVERRIDE;
// Executed on the worker thread
bool ProcessingSyncMessage() const {
return mProcessingSyncMessage;
}
void OnDispatchMessage(const Message& aMsg);
//
// Return true if the wait ended because a notification was
// received. That is, true => event received.
//
// Return false if the time elapsed from when we started the
// process of waiting until afterwards exceeded the currently
// allotted timeout. That *DOES NOT* mean false => "no event" (==
// timeout); there are many circumstances that could cause the
// measured elapsed time to exceed the timeout EVEN WHEN we were
// notified.
//
// So in sum: true is a meaningful return value; false isn't,
// necessarily.
//
bool WaitForNotify();
bool ShouldContinueFromTimeout();
// Executed on the IO thread.
void NotifyWorkerThread();
// On both
bool AwaitingSyncReply() const {
mMonitor->AssertCurrentThreadOwns();
return mPendingReply != 0;
}
Message TakeReply() {
Message reply = mRecvd;
mRecvd = Message();
return reply;
}
int32_t NextSeqno() {
AssertWorkerThread();
return mChild ? --mNextSeqno : ++mNextSeqno;
}
msgid_t mPendingReply;
bool mProcessingSyncMessage;
Message mRecvd;
// This is only accessed from the worker thread; seqno's are
// completely opaque to the IO thread.
int32_t mNextSeqno;
static bool sIsPumpingMessages;
// Timeout periods are broken up in two to prevent system suspension from
// triggering an abort. This method (called by WaitForNotify with a 'did
// timeout' flag) decides if we should wait again for half of mTimeoutMs
// or give up.
bool WaitResponse(bool aWaitTimedOut);
bool mInTimeoutSecondHalf;
int32_t mTimeoutMs;
std::deque<Message> mUrgent;
#ifdef OS_WIN
HANDLE mEvent;
#endif
private:
bool EventOccurred();
bool ProcessUrgentMessages();
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_SyncChannel_h

View File

@@ -8,7 +8,7 @@
#include "mozilla/DebugOnly.h"
#include "WindowsMessageLoop.h"
#include "RPCChannel.h"
#include "MessageChannel.h"
#include "nsAutoPtr.h"
#include "nsServiceManagerUtils.h"
@@ -115,10 +115,10 @@ DeferredMessageHook(int nCode,
// Only run deferred messages if all of these conditions are met:
// 1. The |nCode| indicates that this hook should do something.
// 2. We have deferred messages to run.
// 3. We're not being called from the PeekMessage within the WaitForNotify
// function (indicated with SyncChannel::IsPumpingMessages). We really
// 3. We're not being called from the PeekMessage within the WaitFor*Notify
// function (indicated with MessageChannel::IsPumpingMessages). We really
// only want to run after returning to the main event loop.
if (nCode >= 0 && gDeferredMessages && !SyncChannel::IsPumpingMessages()) {
if (nCode >= 0 && gDeferredMessages && !MessageChannel::IsPumpingMessages()) {
NS_ASSERTION(gDeferredGetMsgHook && gDeferredCallWndProcHook,
"These hooks must be set if we're being called!");
NS_ASSERTION(gDeferredMessages->Length(), "No deferred messages?!");
@@ -587,7 +587,7 @@ TimeoutHasExpired(const TimeoutData& aData)
} // anonymous namespace
RPCChannel::SyncStackFrame::SyncStackFrame(SyncChannel* channel, bool rpc)
MessageChannel::SyncStackFrame::SyncStackFrame(MessageChannel* channel, bool rpc)
: mRPC(rpc)
, mSpinNestedEvents(false)
, mListenerNotified(false)
@@ -605,7 +605,7 @@ RPCChannel::SyncStackFrame::SyncStackFrame(SyncChannel* channel, bool rpc)
}
}
RPCChannel::SyncStackFrame::~SyncStackFrame()
MessageChannel::SyncStackFrame::~SyncStackFrame()
{
NS_ASSERTION(this == mChannel->mTopFrame,
"Mismatched RPC stack frames");
@@ -622,28 +622,28 @@ RPCChannel::SyncStackFrame::~SyncStackFrame()
}
}
SyncChannel::SyncStackFrame* SyncChannel::sStaticTopFrame;
MessageChannel::SyncStackFrame* MessageChannel::sStaticTopFrame;
// nsAppShell's notification that gecko events are being processed.
// If we are here and there is an RPC Incall active, we are spinning
// a nested gecko event loop. In which case the remote process needs
// to know about it.
void /* static */
RPCChannel::NotifyGeckoEventDispatch()
MessageChannel::NotifyGeckoEventDispatch()
{
// sStaticTopFrame is only valid for RPC channels
if (!sStaticTopFrame || sStaticTopFrame->mListenerNotified)
return;
sStaticTopFrame->mListenerNotified = true;
RPCChannel* channel = static_cast<RPCChannel*>(sStaticTopFrame->mChannel);
MessageChannel* channel = static_cast<MessageChannel*>(sStaticTopFrame->mChannel);
channel->Listener()->ProcessRemoteNativeEventsInRPCCall();
}
// invoked by the module that receives the spin event loop
// message.
void
RPCChannel::ProcessNativeEventsInRPCCall()
MessageChannel::ProcessNativeEventsInRPCCall()
{
if (!mTopFrame) {
NS_ERROR("Spin logic error: no RPC frame");
@@ -653,14 +653,14 @@ RPCChannel::ProcessNativeEventsInRPCCall()
mTopFrame->mSpinNestedEvents = true;
}
// Spin loop is called in place of WaitForNotify when modal ui is being shown
// Spin loop is called in place of WaitFor*Notify when modal ui is being shown
// in a child. There are some intricacies in using it however. Spin loop is
// enabled for a particular RPC frame by the client calling
// RPCChannel::ProcessNativeEventsInRPCCall().
// MessageChannel::ProcessNativeEventsInRPCCall().
// This call can be nested for multiple RPC frames in a single plugin or
// multiple unrelated plugins.
void
RPCChannel::SpinInternalEventLoop()
MessageChannel::SpinInternalEventLoop()
{
if (mozilla::PaintTracker::IsPainting()) {
NS_RUNTIMEABORT("Don't spin an event loop while painting.");
@@ -694,7 +694,7 @@ RPCChannel::SpinInternalEventLoop()
NS_ERROR("WM_QUIT received in SpinInternalEventLoop!");
} else {
TranslateMessage(&msg);
DispatchMessageW(&msg);
::DispatchMessageW(&msg);
return;
}
}
@@ -715,7 +715,7 @@ RPCChannel::SpinInternalEventLoop()
}
bool
SyncChannel::WaitForNotify()
MessageChannel::WaitForSyncNotify()
{
mMonitor->AssertCurrentThreadOwns();
@@ -742,10 +742,10 @@ SyncChannel::WaitForNotify()
}
// Setup deferred processing of native events while we wait for a response.
NS_ASSERTION(!SyncChannel::IsPumpingMessages(),
NS_ASSERTION(!MessageChannel::IsPumpingMessages(),
"Shouldn't be pumping already!");
SyncChannel::SetIsPumpingMessages(true);
MessageChannel::SetIsPumpingMessages(true);
HHOOK windowHook = SetWindowsHookEx(WH_CALLWNDPROC, CallWindowProcedureHook,
nullptr, gUIThreadId);
NS_ASSERTION(windowHook, "Failed to set hook!");
@@ -831,19 +831,19 @@ SyncChannel::WaitForNotify()
KillTimer(nullptr, timerId);
}
SyncChannel::SetIsPumpingMessages(false);
MessageChannel::SetIsPumpingMessages(false);
return WaitResponse(timedout);
}
bool
RPCChannel::WaitForNotify()
MessageChannel::WaitForRPCNotify()
{
mMonitor->AssertCurrentThreadOwns();
if (!StackDepth()) {
if (!RPCStackDepth()) {
// There is currently no way to recover from this condition.
NS_RUNTIMEABORT("StackDepth() is 0 in call to RPCChannel::WaitForNotify!");
NS_RUNTIMEABORT("StackDepth() is 0 in call to MessageChannel::WaitForNotify!");
}
// Initialize global objects used in deferred messaging.
@@ -862,11 +862,11 @@ RPCChannel::WaitForNotify()
// windowHook is used as a flag variable for the loop below: if it is set
// and we start to spin a nested event loop, we need to clear the hook and
// process deferred/pending messages.
// If windowHook is nullptr, SyncChannel::IsPumpingMessages should be false.
// If windowHook is nullptr, MessageChannel::IsPumpingMessages should be false.
HHOOK windowHook = nullptr;
while (1) {
NS_ASSERTION((!!windowHook) == SyncChannel::IsPumpingMessages(),
NS_ASSERTION((!!windowHook) == MessageChannel::IsPumpingMessages(),
"windowHook out of sync with reality");
if (mTopFrame->mSpinNestedEvents) {
@@ -880,7 +880,7 @@ RPCChannel::WaitForNotify()
}
// Used by widget to assert on incoming native events
SyncChannel::SetIsPumpingMessages(false);
MessageChannel::SetIsPumpingMessages(false);
// Unhook any neutered windows procedures so messages can be delievered
// normally.
@@ -897,7 +897,7 @@ RPCChannel::WaitForNotify()
}
if (!windowHook) {
SyncChannel::SetIsPumpingMessages(true);
MessageChannel::SetIsPumpingMessages(true);
windowHook = SetWindowsHookEx(WH_CALLWNDPROC, CallWindowProcedureHook,
nullptr, gUIThreadId);
NS_ASSERTION(windowHook, "Failed to set hook!");
@@ -939,7 +939,7 @@ RPCChannel::WaitForNotify()
break;
}
// See SyncChannel's WaitForNotify for details.
// See MessageChannel's WaitFor*Notify for details.
bool haveSentMessagesPending =
(HIWORD(GetQueueStatus(QS_SENDMESSAGE)) & QS_SENDMESSAGE) != 0;
@@ -971,13 +971,13 @@ RPCChannel::WaitForNotify()
}
}
SyncChannel::SetIsPumpingMessages(false);
MessageChannel::SetIsPumpingMessages(false);
return WaitResponse(timedout);
}
void
SyncChannel::NotifyWorkerThread()
MessageChannel::NotifyWorkerThread()
{
mMonitor->AssertCurrentThreadOwns();
NS_ASSERTION(mEvent, "No signal event to set, this is really bad!");

View File

@@ -12,7 +12,6 @@ EXPORTS += [
]
EXPORTS.mozilla.ipc += [
'AsyncChannel.h',
'BrowserProcessSubThread.h',
'CrossProcessMutex.h',
'FileDescriptor.h',
@@ -20,15 +19,15 @@ EXPORTS.mozilla.ipc += [
'GeckoChildProcessHost.h',
'IOThreadChild.h',
'InputStreamUtils.h',
'MessageChannel.h',
'MessageLink.h',
'ProcessChild.h',
'ProtocolUtils.h',
'RPCChannel.h',
'ScopedXREEmbed.h',
'SharedMemory.h',
'SharedMemoryBasic.h',
'SharedMemorySysV.h',
'Shmem.h',
'SyncChannel.h',
'Transport.h',
'URIUtils.h',
]
@@ -78,21 +77,20 @@ EXPORTS.ipc += [
]
CPP_SOURCES += [
'AsyncChannel.cpp',
'BrowserProcessSubThread.cpp',
'FileDescriptor.cpp',
'FileDescriptorUtils.cpp',
'GeckoChildProcessHost.cpp',
'InputStreamUtils.cpp',
'MessageChannel.cpp',
'MessageLink.cpp',
'MessagePump.cpp',
'ProcessChild.cpp',
'ProtocolUtils.cpp',
'RPCChannel.cpp',
'ScopedXREEmbed.cpp',
'SharedMemory.cpp',
'Shmem.cpp',
'StringUtil.cpp',
'SyncChannel.cpp',
'URIUtils.cpp',
]

View File

@@ -277,13 +277,13 @@ def _putInNamespaces(cxxthing, namespaces):
def _sendPrefix(msgtype):
"""Prefix of the name of the C++ method that sends |msgtype|."""
if msgtype.isRpc():
if msgtype.isRpc() or msgtype.isUrgent():
return 'Call'
return 'Send'
def _recvPrefix(msgtype):
"""Prefix of the name of the C++ method that handles |msgtype|."""
if msgtype.isRpc():
if msgtype.isRpc() or msgtype.isUrgent():
return 'Answer'
return 'Recv'
@@ -981,15 +981,7 @@ class MessageDecl(ipdl.ast.MessageDecl):
##--------------------------------------------------
def _semsToChannelParts(sems):
if ipdl.ast.ASYNC == sems: channel = 'AsyncChannel'
elif ipdl.ast.SYNC == sems: channel = 'SyncChannel'
elif ipdl.ast.RPC == sems: channel = 'RPCChannel'
return [ 'mozilla', 'ipc', channel ]
def _semsToListener(sems):
return { ipdl.ast.ASYNC: 'AsyncListener',
ipdl.ast.SYNC: 'SyncListener',
ipdl.ast.RPC: 'RPCListener' }[sems]
return [ 'mozilla', 'ipc', 'MessageChannel' ]
def _usesShmem(p):
for md in p.messageDecls:
@@ -1032,11 +1024,8 @@ class Protocol(ipdl.ast.Protocol):
def channelHeaderFile(self):
return '/'.join(_semsToChannelParts(self.sendSems())) +'.h'
def listenerName(self):
return _semsToListener(self.sendSems())
def fqListenerName(self):
return self.channelName() +'::'+ _semsToListener(self.sendSems())
return 'mozilla::ipc::MessageListener'
def managerInterfaceType(self, ptr=0):
return Type('mozilla::ipc::IProtocolManager',
@@ -1565,14 +1554,14 @@ class _GenerateProtocolCode(ipdl.ast.Visitor):
typedefs = self.protocol.decl.cxxtypedefs
for md in p.messageDecls:
ns.addstmts([
_generateMessageClass(md, md.msgClass(), md.msgId(),
_generateMessageClass(md.msgClass(), md.msgId(),
typedefs, md.prettyMsgName(p.name+'::'),
md.decl.type.compress),
Whitespace.NL ])
if md.hasReply():
ns.addstmts([
_generateMessageClass(
md, md.replyClass(), md.replyId(),
md.replyClass(), md.replyId(),
typedefs, md.prettyReplyName(p.name+'::'),
md.decl.type.compress),
Whitespace.NL ])
@@ -1764,7 +1753,7 @@ class _GenerateProtocolCode(ipdl.ast.Visitor):
##--------------------------------------------------
def _generateMessageClass(md, clsname, msgid, typedefs, prettyName, compress):
def _generateMessageClass(clsname, msgid, typedefs, prettyName, compress):
cls = Class(name=clsname, inherits=[ Inherit(Type('IPC::Message')) ])
cls.addstmt(Label.PRIVATE)
cls.addstmts(typedefs)
@@ -1781,16 +1770,12 @@ def _generateMessageClass(md, clsname, msgid, typedefs, prettyName, compress):
compression = ExprVar('COMPRESSION_ENABLED')
else:
compression = ExprVar('COMPRESSION_NONE')
if md.decl.type.isUrgent():
priority = 'PRIORITY_HIGH'
else:
priority = 'PRIORITY_NORMAL'
ctor = ConstructorDefn(
ConstructorDecl(clsname),
memberinits=[ ExprMemberInit(ExprVar('IPC::Message'),
[ ExprVar('MSG_ROUTING_NONE'),
ExprVar('ID'),
ExprVar(priority),
ExprVar('IPC::Message::PRIORITY_NORMAL'),
compression,
ExprLiteral.String(prettyName) ]) ])
cls.addstmts([ ctor, Whitespace.NL ])
@@ -2471,7 +2456,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
Typedef(Type(self.protocol.channelName()), 'Channel'),
Typedef(Type(self.protocol.fqListenerName()), 'ChannelListener'),
Typedef(Type('base::ProcessHandle'), 'ProcessHandle'),
Typedef(Type('mozilla::ipc::AsyncChannel'), 'AsyncChannel'),
Typedef(Type('mozilla::ipc::MessageChannel'), 'MessageChannel'),
Typedef(Type('mozilla::ipc::SharedMemory'), 'SharedMemory'),
Typedef(Type('mozilla::ipc::Trigger'), 'Trigger')
]
@@ -2827,9 +2812,9 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
Param(Type('MessageLoop', ptr=True),
aThreadVar.name,
default=ExprLiteral.NULL),
Param(Type('AsyncChannel::Side'),
Param(Type('mozilla::ipc::Side'),
sidevar.name,
default=ExprVar('Channel::Unknown')) ],
default=ExprVar('mozilla::ipc::UnknownSide')) ],
ret=Type.BOOL))
openmeth.addstmts([
@@ -2841,20 +2826,20 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
openmeth,
Whitespace.NL ])
# Open(AsyncChannel *, MessageLoop *, Side)
# Open(MessageChannel *, MessageLoop *, Side)
aChannel = ExprVar('aChannel')
aMessageLoop = ExprVar('aMessageLoop')
sidevar = ExprVar('aSide')
openmeth = MethodDefn(
MethodDecl(
'Open',
params=[ Decl(Type('AsyncChannel', ptr=True),
params=[ Decl(Type('MessageChannel', ptr=True),
aChannel.name),
Param(Type('MessageLoop', ptr=True),
aMessageLoop.name),
Param(Type('AsyncChannel::Side'),
Param(Type('mozilla::ipc::Side'),
sidevar.name,
default=ExprVar('Channel::Unknown')) ],
default=ExprVar('mozilla::ipc::UnknownSide')) ],
ret=Type.BOOL))
openmeth.addstmts([
@@ -2979,6 +2964,13 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
method = MethodDefn(MethodDecl(name, virtual=True,
params=params, ret=_Result.Type()))
if not switch:
crash = StmtExpr(ExprCall(ExprVar('MOZ_ASSUME_UNREACHABLE'),
args=[ExprLiteral.String('message protocol not supported')]))
method.addstmts([crash, StmtReturn(_Result.NotKnown)])
return method
if dispatches:
routevar = ExprVar('__route')
routedecl = StmtDecl(
@@ -3032,18 +3024,20 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
hasReply=0, dispatches=dispatches),
Whitespace.NL
])
if toplevel.talksSync():
self.cls.addstmts([
makeHandlerMethod('OnMessageReceived', self.syncSwitch,
hasReply=1, dispatches=dispatches),
Whitespace.NL
])
if toplevel.talksRpc():
self.cls.addstmts([
makeHandlerMethod('OnCallReceived', self.rpcSwitch,
hasReply=1, dispatches=dispatches),
Whitespace.NL
])
if not toplevel.talksRpc():
self.rpcSwitch = None
if not toplevel.talksSync():
self.syncSwitch = None
self.cls.addstmts([
makeHandlerMethod('OnMessageReceived', self.syncSwitch,
hasReply=1, dispatches=dispatches),
Whitespace.NL
])
self.cls.addstmts([
makeHandlerMethod('OnCallReceived', self.rpcSwitch,
hasReply=1, dispatches=dispatches),
Whitespace.NL
])
destroysubtreevar = ExprVar('DestroySubtree')
deallocsubtreevar = ExprVar('DeallocSubtree')
@@ -3085,7 +3079,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
self.cls.addstmts([ ontimeout, Whitespace.NL ])
# C++-stack-related methods
if ptype.isToplevel() and toplevel.talksRpc():
if ptype.isToplevel():
# OnEnteredCxxStack()
onentered = MethodDefn(MethodDecl('OnEnteredCxxStack'))
onentered.addstmt(StmtReturn(ExprCall(p.enteredCxxStackVar())))
@@ -3470,7 +3464,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
virtual=1))
getchannel = MethodDefn(MethodDecl(
p.getChannelMethod().name,
ret=Type('AsyncChannel', ptr=1),
ret=Type('MessageChannel', ptr=1),
virtual=1))
if p.decl.type.isToplevel():
@@ -4909,6 +4903,9 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
if md.decl.type.isSync():
stmts.append(StmtExpr(ExprCall(
ExprSelect(var, '->', 'set_sync'))))
elif md.decl.type.isUrgent():
stmts.append(StmtExpr(ExprCall(
ExprSelect(var, '->', 'set_urgent'))))
elif md.decl.type.isRpc():
stmts.append(StmtExpr(ExprCall(
ExprSelect(var, '->', 'set_rpc'))))

View File

@@ -204,14 +204,14 @@ class IPDLType(Type):
def isAsync(self): return self.sendSemantics is ASYNC
def isSync(self): return self.sendSemantics is SYNC
def isRpc(self): return self.sendSemantics is RPC or self.sendSemantics is URGENT
def isRpc(self): return self.sendSemantics is RPC
def isUrgent(self): return self.sendSemantics is URGENT
def talksAsync(self): return True
def talksSync(self): return self.isSync() or self.isRpc()
def talksRpc(self): return self.isRpc()
def hasReply(self): return self.isSync() or self.isRpc()
def hasReply(self): return self.isSync() or self.isRpc() or self.isUrgent()
def needsMoreJuiceThan(self, o):
return (o.isAsync() and not self.isAsync()
@@ -1459,6 +1459,12 @@ class CheckTypes(TcheckVisitor):
"sync parent-to-child messages are verboten (here, message `%s' in protocol `%s')",
mname, pname)
if mtype.isUrgent() and (mtype.isIn() or mtype.isInout()):
self.error(
loc,
"urgent child-to-parent messages are verboten (here, message `%s' in protocol `%s')",
mname, pname)
if mtype.needsMoreJuiceThan(ptype):
self.error(
loc,

View File

@@ -3,6 +3,8 @@ include PTestDataStructuresCommon;
include "mozilla/GfxMessageUtils.h";
include "mozilla/GfxMessageUtils.h";
namespace mozilla {
namespace _ipdltest {

View File

@@ -35,7 +35,7 @@ TestBridgeMainParent::AllocPTestBridgeMainSubParent(Transport* transport,
}
nsAutoPtr<TestBridgeMainSubParent> a(new TestBridgeMainSubParent(transport));
if (!a->Open(transport, h, XRE_GetIOMessageLoop(), AsyncChannel::Parent)) {
if (!a->Open(transport, h, XRE_GetIOMessageLoop(), ipc::ParentSide)) {
return nullptr;
}
a.forget();
@@ -187,7 +187,7 @@ TestBridgeSubChild::AllocPTestBridgeMainSubChild(Transport* transport,
}
nsAutoPtr<TestBridgeMainSubChild> a(new TestBridgeMainSubChild(transport));
if (!a->Open(transport, h, XRE_GetIOMessageLoop(), AsyncChannel::Child)) {
if (!a->Open(transport, h, XRE_GetIOMessageLoop(), ipc::ChildSide)) {
return false;
}

View File

@@ -60,7 +60,7 @@ OpenParent(TestOpensOpenedParent* aParent,
// Messages will be delivered to this thread's message loop
// instead of the main thread's.
if (!aParent->Open(aTransport, aOtherProcess,
XRE_GetIOMessageLoop(), AsyncChannel::Parent))
XRE_GetIOMessageLoop(), ipc::ParentSide))
fail("opening Parent");
}
@@ -169,7 +169,7 @@ OpenChild(TestOpensOpenedChild* aChild,
// Messages will be delivered to this thread's message loop
// instead of the main thread's.
if (!aChild->Open(aTransport, aOtherProcess,
XRE_GetIOMessageLoop(), AsyncChannel::Child))
XRE_GetIOMessageLoop(), ipc::ChildSide))
fail("opening Child");
// Kick off the unit tests

View File

@@ -2,7 +2,7 @@
#include "IPDLUnitTests.h" // fail etc.
using mozilla::ipc::RPCChannel;
using mozilla::ipc::MessageChannel;
template<>
struct RunnableMethodTraits<mozilla::_ipdltest::TestRPCRacesParent>
@@ -15,12 +15,12 @@ struct RunnableMethodTraits<mozilla::_ipdltest::TestRPCRacesParent>
namespace mozilla {
namespace _ipdltest {
RPCChannel::RacyRPCPolicy
MediateRace(const RPCChannel::Message& parent,
const RPCChannel::Message& child)
ipc::RacyRPCPolicy
MediateRace(const MessageChannel::Message& parent,
const MessageChannel::Message& child)
{
return (PTestRPCRaces::Msg_Child__ID == parent.type()) ?
RPCChannel::RRPParentWins : RPCChannel::RRPChildWins;
ipc::RRPParentWins : ipc::RRPChildWins;
}
//-----------------------------------------------------------------------------

View File

@@ -9,9 +9,9 @@
namespace mozilla {
namespace _ipdltest {
mozilla::ipc::RPCChannel::RacyRPCPolicy
MediateRace(const mozilla::ipc::RPCChannel::Message& parent,
const mozilla::ipc::RPCChannel::Message& child);
mozilla::ipc::RacyRPCPolicy
MediateRace(const mozilla::ipc::MessageChannel::Message& parent,
const mozilla::ipc::MessageChannel::Message& child);
class TestRPCRacesParent :
public PTestRPCRacesParent
@@ -47,7 +47,7 @@ protected:
virtual bool
RecvGetAnsweredParent(bool* answeredParent) MOZ_OVERRIDE;
virtual mozilla::ipc::RPCChannel::RacyRPCPolicy
virtual mozilla::ipc::RacyRPCPolicy
MediateRPCRace(const Message& parent, const Message& child) MOZ_OVERRIDE
{
return MediateRace(parent, child);
@@ -104,7 +104,7 @@ protected:
virtual bool
AnswerChild() MOZ_OVERRIDE;
virtual mozilla::ipc::RPCChannel::RacyRPCPolicy
virtual mozilla::ipc::RacyRPCPolicy
MediateRPCRace(const Message& parent, const Message& child) MOZ_OVERRIDE
{
return MediateRace(parent, child);

View File

@@ -3,8 +3,7 @@
#include "IPDLUnitTests.h" // fail etc.
using namespace mozilla::ipc;
typedef mozilla::ipc::RPCChannel::Message Message;
typedef mozilla::ipc::RPCChannel::RacyRPCPolicy RacyRPCPolicy;
typedef mozilla::ipc::MessageChannel::Message Message;
namespace mozilla {
namespace _ipdltest {
@@ -13,7 +12,7 @@ static RacyRPCPolicy
MediateRace(const Message& parent, const Message& child)
{
return (PTestRaceDeferral::Msg_Win__ID == parent.type()) ?
RPCChannel::RRPParentWins : RPCChannel::RRPChildWins;
RRPParentWins : RRPChildWins;
}
//-----------------------------------------------------------------------------

View File

@@ -26,7 +26,7 @@ protected:
virtual bool AnswerLose() MOZ_OVERRIDE;
virtual mozilla::ipc::RPCChannel::RacyRPCPolicy
virtual mozilla::ipc::RacyRPCPolicy
MediateRPCRace(const Message& parent, const Message& child) MOZ_OVERRIDE;
virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE
@@ -55,7 +55,7 @@ protected:
virtual bool AnswerRpc() MOZ_OVERRIDE;
virtual mozilla::ipc::RPCChannel::RacyRPCPolicy
virtual mozilla::ipc::RacyRPCPolicy
MediateRPCRace(const Message& parent, const Message& child) MOZ_OVERRIDE;
virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE

View File

@@ -80,7 +80,7 @@ TestStackHooksChild::RecvStart()
fail("EnteredCall/ExitedCall malfunction");
// kick off tests from a runnable so that we can start with
// RPCChannel code on the C++ stack
// MessageChannel code on the C++ stack
MessageLoop::current()->PostTask(FROM_HERE,
NewRunnableFunction(RunTestsFn));

View File

@@ -2,6 +2,10 @@
#include "IPDLUnitTests.h" // fail etc.
#include <unistd.h>
#if defined(OS_POSIX)
#else
#include <windows.h>
#endif
template<>
struct RunnableMethodTraits<mozilla::_ipdltest::TestUrgencyParent>
@@ -13,6 +17,13 @@ struct RunnableMethodTraits<mozilla::_ipdltest::TestUrgencyParent>
namespace mozilla {
namespace _ipdltest {
#if defined(OS_POSIX)
static void Sleep(int ms)
{
sleep(ms / 1000);
}
#endif
//-----------------------------------------------------------------------------
// parent
@@ -164,7 +175,7 @@ TestUrgencyChild::AnswerReply2(uint32_t *reply)
fail("wrong test # in AnswerReply2");
// sleep for 5 seconds so the parent process tries to deliver more messages.
sleep(5);
Sleep(5000);
*reply = 500;
test_ = kSecondTestGotReply;

View File

@@ -94,9 +94,9 @@ def main(argv):
reinterpret_cast<%sChild**>(&gChildActor);
*child = new %sChild();
::mozilla::ipc::AsyncChannel *childChannel = (*child)->GetIPCChannel();
::mozilla::ipc::AsyncChannel::Side parentSide =
::mozilla::ipc::AsyncChannel::Parent;
::mozilla::ipc::MessageChannel *childChannel = (*child)->GetIPCChannel();
::mozilla::ipc::Side parentSide =
::mozilla::ipc::ParentSide;
(*parent)->Open(childChannel, childMessageLoop, parentSide);
return (*parent)->Main();

View File

@@ -3,7 +3,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/ipc/RPCChannel.h"
#include "mozilla/ipc/MessageChannel.h"
#include "nsAppShell.h"
#include "nsToolkit.h"
#include "nsThreadUtils.h"
@@ -175,7 +175,7 @@ bool
nsAppShell::ProcessNextNativeEvent(bool mayWait)
{
// Notify ipc we are spinning a (possibly nested) gecko event loop.
mozilla::ipc::RPCChannel::NotifyGeckoEventDispatch();
mozilla::ipc::MessageChannel::NotifyGeckoEventDispatch();
bool gotMessage = false;

View File

@@ -60,7 +60,7 @@
#include "mozilla/TouchEvents.h"
#include "mozilla/Util.h"
#include "mozilla/ipc/RPCChannel.h"
#include "mozilla/ipc/MessageChannel.h"
#include <algorithm>
#include "nsWindow.h"
@@ -4192,11 +4192,11 @@ nsWindow::IsAsyncResponseEvent(UINT aMsg, LRESULT& aResult)
void
nsWindow::IPCWindowProcHandler(UINT& msg, WPARAM& wParam, LPARAM& lParam)
{
NS_ASSERTION(!mozilla::ipc::SyncChannel::IsPumpingMessages(),
NS_ASSERTION(!mozilla::ipc::MessageChannel::IsPumpingMessages(),
"Failed to prevent a nonqueued message from running!");
// Modal UI being displayed in windowless plugins.
if (mozilla::ipc::RPCChannel::IsSpinLoopActive() &&
if (mozilla::ipc::MessageChannel::IsSpinLoopActive() &&
(InSendMessageEx(NULL)&(ISMEX_REPLIED|ISMEX_SEND)) == ISMEX_SEND) {
LRESULT res;
if (IsAsyncResponseEvent(msg, res)) {

View File

@@ -177,7 +177,7 @@ bool nsWindow::OnPaint(HDC aDC, uint32_t aNestingLevel)
// windows event spin loop. If we don't trap for this, we'll try to paint,
// but view manager will refuse to paint the surface, resulting is black
// flashes on the plugin rendering surface.
if (mozilla::ipc::RPCChannel::IsSpinLoopActive() && mPainting)
if (mozilla::ipc::MessageChannel::IsSpinLoopActive() && mPainting)
return false;
if (mWindowType == eWindowType_plugin) {

View File

@@ -59,9 +59,9 @@ static int32_t gNumWidgets;
nsIRollupListener* nsBaseWidget::gRollupListener = nullptr;
using namespace mozilla::layers;
using namespace mozilla::ipc;
using namespace mozilla;
using base::Thread;
using mozilla::ipc::AsyncChannel;
nsIContent* nsBaseWidget::mLastRollup = nullptr;
// Global user preference for disabling native theme. Used
@@ -950,12 +950,11 @@ void nsBaseWidget::CreateCompositor(int aWidth, int aHeight)
}
mCompositorParent = NewCompositorParent(aWidth, aHeight);
AsyncChannel *parentChannel = mCompositorParent->GetIPCChannel();
MessageChannel *parentChannel = mCompositorParent->GetIPCChannel();
LayerManager* lm = new ClientLayerManager(this);
MessageLoop *childMessageLoop = CompositorParent::CompositorLoop();
mCompositorChild = new CompositorChild(lm);
AsyncChannel::Side childSide = mozilla::ipc::AsyncChannel::Child;
mCompositorChild->Open(parentChannel, childMessageLoop, childSide);
mCompositorChild->Open(parentChannel, childMessageLoop, ipc::ChildSide);
TextureFactoryIdentifier textureFactoryIdentifier;
PLayerTransactionChild* shadowManager;