Bug 1754004 - Part 1: Switch IPCStream to use DataPipe instead of P{ChildToParent,ParentToChild}Stream, r=asuth,necko-reviewers,kershaw

This gives us various positive benefits, such as using a shared memory ring
buffer for faster communication, not having data streaming being bound to the
thread which transferred the nsIInputStream (which is often the main thread),
and the ability for some backpressure to be applied to data streaming.

After this change, the "delayed start" parameter for IPCStream serialization is
less relevant, as backpressure will serve a similar purpose. It will still be
used to determine whether or not to use RemoteLazyInputStream when serializing
from the parent process.

Differential Revision: https://phabricator.services.mozilla.com/D141038
This commit is contained in:
Nika Layzell
2022-05-13 14:16:09 +00:00
parent 7a52bd0d1b
commit d1b9f58acb
63 changed files with 32 additions and 1916 deletions

View File

@@ -13,8 +13,6 @@
#include "mozilla/dom/quota/DecryptingInputStream_impl.h"
#include "mozilla/dom/quota/IPCStreamCipherStrategy.h"
#include "mozilla/ipc/DataPipe.h"
#include "mozilla/ipc/IPCStreamDestination.h"
#include "mozilla/ipc/IPCStreamSource.h"
#include "mozilla/InputStreamLengthHelper.h"
#include "mozilla/RemoteLazyInputStream.h"
#include "mozilla/RemoteLazyInputStreamChild.h"
@@ -84,42 +82,28 @@ void SerializeInputStreamAsPipeInternal(nsIInputStream* aInputStream,
length = -1;
}
nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(aInputStream);
// As a fallback, attempt to stream the data across using a IPCStream
// actor. For blocking streams, create a nonblocking pipe instead,
bool nonBlocking = false;
MOZ_ALWAYS_TRUE(NS_SUCCEEDED(aInputStream->IsNonBlocking(&nonBlocking)));
if (!nonBlocking || !asyncStream) {
const uint32_t kBufferSize = 32768; // matches IPCStream buffer size.
nsCOMPtr<nsIAsyncOutputStream> sink;
nsresult rv = NS_NewPipe2(getter_AddRefs(asyncStream), getter_AddRefs(sink),
true, false, kBufferSize, UINT32_MAX);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
nsCOMPtr<nsIEventTarget> target =
do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
// Since the source stream could be used by others, let's not close it when
// the copy is done.
rv = NS_AsyncCopy(aInputStream, sink, target, NS_ASYNCCOPY_VIA_READSEGMENTS,
kBufferSize, nullptr, nullptr, false);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
}
MOZ_DIAGNOSTIC_ASSERT(asyncStream);
auto* streamSource = IPCStreamSource::Create(asyncStream, aManager);
if (NS_WARN_IF(!streamSource)) {
// Failed to create IPCStreamSource, which would cause a failure should we
// attempt to serialize it later. So abort now.
RefPtr<DataPipeSender> sender;
RefPtr<DataPipeReceiver> receiver;
nsresult rv = NewDataPipe(kDefaultDataPipeCapacity, getter_AddRefs(sender),
getter_AddRefs(receiver));
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
aParams = IPCRemoteStreamParams(aDelayedStart, streamSource, length);
nsCOMPtr<nsIEventTarget> target =
do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
rv =
NS_AsyncCopy(aInputStream, sender, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS,
kDefaultDataPipeCapacity, nullptr, nullptr);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
aParams = DataPipeReceiverStreamParams(receiver);
if (length != -1) {
aParams = InputStreamLengthWrapperParams(aParams, length, false);
}
}
} // namespace
@@ -173,116 +157,6 @@ void InputStreamHelper::SerializeInputStreamAsPipe(
aManager);
}
void InputStreamHelper::PostSerializationActivation(InputStreamParams& aParams,
bool aConsumedByIPC,
bool aDelayedStart) {
switch (aParams.type()) {
case InputStreamParams::TBufferedInputStreamParams: {
BufferedInputStreamParams& params =
aParams.get_BufferedInputStreamParams();
InputStreamHelper::PostSerializationActivation(
params.optionalStream(), aConsumedByIPC, aDelayedStart);
return;
}
case InputStreamParams::TMIMEInputStreamParams: {
MIMEInputStreamParams& params = aParams.get_MIMEInputStreamParams();
InputStreamHelper::PostSerializationActivation(
params.optionalStream(), aConsumedByIPC, aDelayedStart);
return;
}
case InputStreamParams::TMultiplexInputStreamParams: {
MultiplexInputStreamParams& params =
aParams.get_MultiplexInputStreamParams();
for (InputStreamParams& subParams : params.streams()) {
InputStreamHelper::PostSerializationActivation(
subParams, aConsumedByIPC, aDelayedStart);
}
return;
}
case InputStreamParams::TSlicedInputStreamParams: {
SlicedInputStreamParams& params = aParams.get_SlicedInputStreamParams();
InputStreamHelper::PostSerializationActivation(
params.stream(), aConsumedByIPC, aDelayedStart);
return;
}
case InputStreamParams::TInputStreamLengthWrapperParams: {
InputStreamLengthWrapperParams& params =
aParams.get_InputStreamLengthWrapperParams();
InputStreamHelper::PostSerializationActivation(
params.stream(), aConsumedByIPC, aDelayedStart);
return;
}
case InputStreamParams::TIPCRemoteStreamParams: {
IPCRemoteStreamType& remoteInputStream =
aParams.get_IPCRemoteStreamParams().stream();
IPCStreamSource* source = nullptr;
if (remoteInputStream.type() ==
IPCRemoteStreamType::TPChildToParentStreamChild) {
source = IPCStreamSource::Cast(
remoteInputStream.get_PChildToParentStreamChild());
} else {
MOZ_ASSERT(remoteInputStream.type() ==
IPCRemoteStreamType::TPParentToChildStreamParent);
source = IPCStreamSource::Cast(
remoteInputStream.get_PParentToChildStreamParent());
}
MOZ_ASSERT(source);
// If the source stream has not been taken to be sent to the other side,
// we can destroy it.
if (!aConsumedByIPC) {
source->StartDestroy();
return;
}
if (!aDelayedStart) {
// If we don't need to do a delayedStart, we start it now. Otherwise,
// the Start() will be called at the first use by the
// IPCStreamDestination::DelayedStartInputStream.
source->Start();
}
return;
}
case InputStreamParams::TDataPipeReceiverStreamParams:
break;
case InputStreamParams::TStringInputStreamParams:
break;
case InputStreamParams::TFileInputStreamParams:
break;
case InputStreamParams::TRemoteLazyInputStreamParams:
break;
case InputStreamParams::TEncryptedFileInputStreamParams:
break;
default:
MOZ_CRASH(
"A new stream? Should decide if it must be processed recursively or "
"not.");
}
}
void InputStreamHelper::PostSerializationActivation(
Maybe<InputStreamParams>& aParams, bool aConsumedByIPC,
bool aDelayedStart) {
if (aParams.isSome()) {
InputStreamHelper::PostSerializationActivation(
aParams.ref(), aConsumedByIPC, aDelayedStart);
}
}
already_AddRefed<nsIInputStream> InputStreamHelper::DeserializeInputStream(
const InputStreamParams& aParams,
const nsTArray<FileDescriptor>& aFileDescriptors) {
@@ -317,28 +191,6 @@ already_AddRefed<nsIInputStream> InputStreamHelper::DeserializeInputStream(
return stream.forget();
}
if (aParams.type() == InputStreamParams::TIPCRemoteStreamParams) {
const IPCRemoteStreamParams& remoteStream =
aParams.get_IPCRemoteStreamParams();
const IPCRemoteStreamType& remoteStreamType = remoteStream.stream();
IPCStreamDestination* destinationStream;
if (remoteStreamType.type() ==
IPCRemoteStreamType::TPChildToParentStreamParent) {
destinationStream = IPCStreamDestination::Cast(
remoteStreamType.get_PChildToParentStreamParent());
} else {
MOZ_ASSERT(remoteStreamType.type() ==
IPCRemoteStreamType::TPParentToChildStreamChild);
destinationStream = IPCStreamDestination::Cast(
remoteStreamType.get_PParentToChildStreamChild());
}
destinationStream->SetDelayedStart(remoteStream.delayedStart());
destinationStream->SetLength(remoteStream.length());
return destinationStream->TakeReader();
}
if (aParams.type() == InputStreamParams::TDataPipeReceiverStreamParams) {
const DataPipeReceiverStreamParams& pipeParams =
aParams.get_DataPipeReceiverStreamParams();