Bug 1879375 - Ensure IPDL actors are notified of peer node destruction, r=ipc-reviewers,mccr8
Suppose we have 3 nodes: A, B, and C. In this scenario A is the broker, and there are two child-nodes B and C. A creates a port-pair (Ab <-> Ac), and sends one each to B (Ab => Bb) and C (Ac => Cc). Assuming both directions of the proxy bypass occur concurrently, we'll send a number of ObserveProxy messages between node A and nodes B/C, eventually cleaning up the proxies in A, such that Bb's peer is Cc, and vice versa. During this process, we never attempt to send a message directly between nodes B and C. In NodeController, direct connections between a pair of nodes are established via the broker node when attempting to send a message directly between nodes, but as we have not attempted to send any messages directly, no such connection has been established. That means that when one of these child nodes dies, the other node will not be notified of the peer being lost, and the IPDL actor will appear to remain open. Only once a message is sent will the death of the peer node be discovered, and the corresponding actor destroyed. To fix this, we modify the routing code, adding a couple of callbacks when accepting ports over IPC and bypassing proxies which notify NodeController, allowing it to attempt an introduction eagerly. This helps ensure that actors will reliably be notified when their peer processes die. In addition, some tweaks to the introduction logic were made to both make introductions happen reliably, and to ensure we clean up missing peer nodes in error conditions. Differential Revision: https://phabricator.services.mozilla.com/D201153
This commit is contained in:
@@ -237,6 +237,10 @@ class TestNode : public NodeDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
void ObserveRemoteNode(const NodeName& node) override {
|
||||
DCHECK(node != node_name_);
|
||||
}
|
||||
|
||||
void ClosePortsInEvent(Event* event) {
|
||||
if (event->type() != Event::Type::kUserMessage) {
|
||||
return;
|
||||
|
||||
@@ -837,6 +837,10 @@ int Node::OnObserveProxy(const PortRef& port_ref,
|
||||
MaybeResendAckRequest(port_ref);
|
||||
|
||||
delegate_->PortStatusChanged(port_ref);
|
||||
|
||||
if (event->proxy_target_node_name() != name_) {
|
||||
delegate_->ObserveRemoteNode(event->proxy_target_node_name());
|
||||
}
|
||||
}
|
||||
|
||||
return OK;
|
||||
@@ -1513,6 +1517,11 @@ int Node::AcceptPort(const PortName& port_name,
|
||||
mozilla::MakeUnique<PortAcceptedEvent>(
|
||||
port_descriptor.referring_port_name,
|
||||
kInvalidPortName, kInvalidSequenceNum));
|
||||
|
||||
if (port_descriptor.peer_node_name != name_) {
|
||||
delegate_->ObserveRemoteNode(port_descriptor.peer_node_name);
|
||||
}
|
||||
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,12 @@ class NodeDelegate {
|
||||
// to query the latest status of the port. Note, this event could be spurious
|
||||
// if another thread is simultaneously modifying the status of the port.
|
||||
virtual void PortStatusChanged(const PortRef& port_ref) = 0;
|
||||
|
||||
// Called after receiving a port with a remote peer, or bypassing a proxy to a
|
||||
// remote peer. Embedders can use this to ensure a connection to the remote
|
||||
// peer, reducing message queueing and ensuring prompt notification of peer
|
||||
// node death.
|
||||
virtual void ObserveRemoteNode(const NodeName& node) = 0;
|
||||
};
|
||||
|
||||
} // namespace ports
|
||||
|
||||
@@ -307,67 +307,78 @@ void NodeController::DropPeer(NodeName aNodeName) {
|
||||
mNode->LostConnectionToNode(aNodeName);
|
||||
}
|
||||
|
||||
void NodeController::ForwardEvent(const NodeName& aNode,
|
||||
UniquePtr<Event> aEvent) {
|
||||
if (aNode == mName) {
|
||||
(void)mNode->AcceptEvent(mName, std::move(aEvent));
|
||||
} else {
|
||||
// On Windows and macOS, messages holding HANDLEs or mach ports must be
|
||||
// relayed via the broker process so it can transfer ownership.
|
||||
bool needsRelay = false;
|
||||
void NodeController::ContactRemotePeer(const NodeName& aNode,
|
||||
UniquePtr<Event> aEvent) {
|
||||
// On Windows and macOS, messages holding HANDLEs or mach ports must be
|
||||
// relayed via the broker process so it can transfer ownership.
|
||||
bool needsRelay = false;
|
||||
#if defined(XP_WIN) || defined(XP_DARWIN)
|
||||
if (!IsBroker() && aNode != kBrokerNodeName &&
|
||||
aEvent->type() == Event::kUserMessage) {
|
||||
auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
|
||||
needsRelay =
|
||||
userEvent->HasMessage() &&
|
||||
userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
|
||||
}
|
||||
if (aEvent && !IsBroker() && aNode != kBrokerNodeName &&
|
||||
aEvent->type() == Event::kUserMessage) {
|
||||
auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
|
||||
needsRelay =
|
||||
userEvent->HasMessage() &&
|
||||
userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
UniquePtr<IPC::Message> message =
|
||||
UniquePtr<IPC::Message> message;
|
||||
if (aEvent) {
|
||||
message =
|
||||
SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
|
||||
MOZ_ASSERT(message->is_relay() == needsRelay,
|
||||
"Message relay status set incorrectly");
|
||||
}
|
||||
|
||||
RefPtr<NodeChannel> peer;
|
||||
RefPtr<NodeChannel> broker;
|
||||
bool needsIntroduction = false;
|
||||
{
|
||||
auto state = mState.Lock();
|
||||
RefPtr<NodeChannel> peer;
|
||||
RefPtr<NodeChannel> broker;
|
||||
bool needsIntroduction = false;
|
||||
bool needsBroker = needsRelay;
|
||||
{
|
||||
auto state = mState.Lock();
|
||||
|
||||
// Check if we know this peer. If we don't, we'll need to request an
|
||||
// introduction.
|
||||
peer = state->mPeers.Get(aNode);
|
||||
if (!peer || needsRelay) {
|
||||
if (IsBroker()) {
|
||||
NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s",
|
||||
message->name(), ToString(aNode).c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
broker = state->mPeers.Get(kBrokerNodeName);
|
||||
if (!broker) {
|
||||
NODECONTROLLER_WARNING(
|
||||
"Ignoring message '%s' to peer %s due to a missing broker",
|
||||
message->name(), ToString(aNode).c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
if (!needsRelay) {
|
||||
auto& queue =
|
||||
state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
|
||||
needsIntroduction = true;
|
||||
return Queue<UniquePtr<IPC::Message>, 64>{};
|
||||
});
|
||||
queue.Push(std::move(message));
|
||||
}
|
||||
// Check if we know this peer. If we don't, we'll need to request an
|
||||
// introduction.
|
||||
peer = state->mPeers.Get(aNode);
|
||||
if (!peer) {
|
||||
// We don't know the peer, check if we've already requested an
|
||||
// introduction, or if we need to request a new one.
|
||||
auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
|
||||
needsIntroduction = true;
|
||||
needsBroker = true;
|
||||
return Queue<UniquePtr<IPC::Message>, 64>{};
|
||||
});
|
||||
// If we aren't relaying, queue up the message to be sent.
|
||||
if (message && !needsRelay) {
|
||||
queue.Push(std::move(message));
|
||||
}
|
||||
}
|
||||
|
||||
MOZ_ASSERT(!needsIntroduction || !needsRelay,
|
||||
"Only one of the two should ever be set");
|
||||
if (needsBroker && !IsBroker()) {
|
||||
broker = state->mPeers.Get(kBrokerNodeName);
|
||||
}
|
||||
}
|
||||
|
||||
if (needsBroker && !broker) {
|
||||
NODECONTROLLER_WARNING(
|
||||
"Dropping message '%s'; no connection to unknown peer %s",
|
||||
message ? message->name() : "<null>", ToString(aNode).c_str());
|
||||
if (needsIntroduction) {
|
||||
// We have no broker and will never be able to be introduced to this node.
|
||||
// Queue a task to clean up any ports connected to it.
|
||||
XRE_GetIOMessageLoop()->PostTask(NewRunnableMethod<NodeName>(
|
||||
"NodeController::DropPeer", this, &NodeController::DropPeer, aNode));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (needsIntroduction) {
|
||||
NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s",
|
||||
ToString(aNode).c_str());
|
||||
broker->RequestIntroduction(aNode);
|
||||
}
|
||||
|
||||
if (message) {
|
||||
if (needsRelay) {
|
||||
NODECONTROLLER_LOG(LogLevel::Info,
|
||||
"Relaying message '%s' for peer %s due to %" PRIu32
|
||||
@@ -376,15 +387,22 @@ void NodeController::ForwardEvent(const NodeName& aNode,
|
||||
message->num_relayed_attachments());
|
||||
MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
|
||||
broker->SendEventMessage(std::move(message));
|
||||
} else if (needsIntroduction) {
|
||||
MOZ_ASSERT(broker);
|
||||
broker->RequestIntroduction(aNode);
|
||||
} else if (peer) {
|
||||
peer->SendEventMessage(std::move(message));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NodeController::ForwardEvent(const NodeName& aNode,
|
||||
UniquePtr<Event> aEvent) {
|
||||
MOZ_ASSERT(aEvent, "cannot forward null event");
|
||||
if (aNode == mName) {
|
||||
(void)mNode->AcceptEvent(mName, std::move(aEvent));
|
||||
} else {
|
||||
ContactRemotePeer(aNode, std::move(aEvent));
|
||||
}
|
||||
}
|
||||
|
||||
void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
|
||||
UniquePtr<IPC::Message> message =
|
||||
SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
|
||||
@@ -415,6 +433,11 @@ void NodeController::PortStatusChanged(const PortRef& aPortRef) {
|
||||
}
|
||||
}
|
||||
|
||||
void NodeController::ObserveRemoteNode(const NodeName& aNode) {
|
||||
MOZ_ASSERT(aNode != mName);
|
||||
ContactRemotePeer(aNode, nullptr);
|
||||
}
|
||||
|
||||
void NodeController::OnEventMessage(const NodeName& aFromNode,
|
||||
UniquePtr<IPC::Message> aMessage) {
|
||||
AssertIOThread();
|
||||
@@ -840,6 +863,9 @@ void NodeController::CleanUp() {
|
||||
lostConnections.AppendElement(chan.GetKey());
|
||||
channelsToClose.AppendElement(chan.GetData());
|
||||
}
|
||||
for (const auto& pending : state->mPendingMessages.Keys()) {
|
||||
lostConnections.AppendElement(pending);
|
||||
}
|
||||
for (const auto& invite : state->mInvites.Values()) {
|
||||
channelsToClose.AppendElement(invite.mChannel);
|
||||
portsToClose.AppendElement(invite.mToMerge);
|
||||
|
||||
@@ -121,6 +121,11 @@ class NodeController final : public mojo::core::ports::NodeDelegate,
|
||||
// Stop communicating with this peer. Must be called on the IO thread.
|
||||
void DropPeer(NodeName aNodeName);
|
||||
|
||||
// Ensure that there is a direct connection to a remote node, requesting an
|
||||
// introduction if there is not.
|
||||
// If provided, will optionally send an event to the remote node.
|
||||
void ContactRemotePeer(const NodeName& aNode, UniquePtr<Event> aEvent);
|
||||
|
||||
// Message Handlers
|
||||
void OnEventMessage(const NodeName& aFromNode,
|
||||
UniquePtr<IPC::Message> aMessage) override;
|
||||
@@ -138,6 +143,7 @@ class NodeController final : public mojo::core::ports::NodeDelegate,
|
||||
void ForwardEvent(const NodeName& aNode, UniquePtr<Event> aEvent) override;
|
||||
void BroadcastEvent(UniquePtr<Event> aEvent) override;
|
||||
void PortStatusChanged(const PortRef& aPortRef) override;
|
||||
void ObserveRemoteNode(const NodeName& aNode) override;
|
||||
|
||||
const NodeName mName;
|
||||
const UniquePtr<Node> mNode;
|
||||
|
||||
Reference in New Issue
Block a user