Bug 1371699 - Use of NonBlockingAsyncInputStream in our code base, r=froydnj

This commit is contained in:
Andrea Marchesini
2017-10-03 07:20:18 +02:00
parent ddad802806
commit 452ce73f77
7 changed files with 111 additions and 76 deletions

View File

@@ -6,9 +6,8 @@
#include "FetchStream.h" #include "FetchStream.h"
#include "mozilla/dom/DOMException.h" #include "mozilla/dom/DOMException.h"
#include "nsITransport.h"
#include "nsIStreamTransportService.h"
#include "nsProxyRelease.h" #include "nsProxyRelease.h"
#include "nsStreamUtils.h"
#include "WorkerPrivate.h" #include "WorkerPrivate.h"
#include "Workers.h" #include "Workers.h"
@@ -205,45 +204,15 @@ FetchStream::RequestDataCallback(JSContext* aCx,
// mOriginalInputStream into an nsIAsyncInputStream. // mOriginalInputStream into an nsIAsyncInputStream.
MOZ_ASSERT(stream->mOriginalInputStream); MOZ_ASSERT(stream->mOriginalInputStream);
bool nonBlocking = false; nsCOMPtr<nsIAsyncInputStream> asyncStream;
nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking); nsresult rv =
NS_MakeAsyncNonBlockingInputStream(stream->mOriginalInputStream,
getter_AddRefs(asyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) { if (NS_WARN_IF(NS_FAILED(rv))) {
stream->ErrorPropagation(aCx, aStream, rv); stream->ErrorPropagation(aCx, aStream, rv);
return; return;
} }
nsCOMPtr<nsIAsyncInputStream> asyncStream =
do_QueryInterface(stream->mOriginalInputStream);
if (!nonBlocking || !asyncStream) {
nsCOMPtr<nsIStreamTransportService> sts =
do_GetService(kStreamTransportServiceCID, &rv);
if (NS_WARN_IF(NS_FAILED(rv))) {
stream->ErrorPropagation(aCx, aStream, rv);
return;
}
nsCOMPtr<nsITransport> transport;
rv = sts->CreateInputTransport(stream->mOriginalInputStream,
/* aCloseWhenDone */ true,
getter_AddRefs(transport));
if (NS_WARN_IF(NS_FAILED(rv))) {
stream->ErrorPropagation(aCx, aStream, rv);
return;
}
nsCOMPtr<nsIInputStream> wrapper;
rv = transport->OpenInputStream(/* aFlags */ 0,
/* aSegmentSize */ 0,
/* aSegmentCount */ 0,
getter_AddRefs(wrapper));
if (NS_WARN_IF(NS_FAILED(rv))) {
stream->ErrorPropagation(aCx, aStream, rv);
return;
}
asyncStream = do_QueryInterface(wrapper);
}
stream->mInputStream = asyncStream; stream->mInputStream = asyncStream;
stream->mOriginalInputStream = nullptr; stream->mOriginalInputStream = nullptr;
} }

View File

@@ -9,8 +9,6 @@
#include "nsIEventTarget.h" #include "nsIEventTarget.h"
#include "nsIGlobalObject.h" #include "nsIGlobalObject.h"
#include "nsITimer.h" #include "nsITimer.h"
#include "nsITransport.h"
#include "nsIStreamTransportService.h"
#include "mozilla/Base64.h" #include "mozilla/Base64.h"
#include "mozilla/CheckedInt.h" #include "mozilla/CheckedInt.h"
@@ -22,7 +20,6 @@
#include "nsCycleCollectionParticipant.h" #include "nsCycleCollectionParticipant.h"
#include "nsDOMJSUtils.h" #include "nsDOMJSUtils.h"
#include "nsError.h" #include "nsError.h"
#include "nsNetCID.h"
#include "nsNetUtil.h" #include "nsNetUtil.h"
#include "xpcpublic.h" #include "xpcpublic.h"
@@ -43,8 +40,6 @@ using namespace workers;
const uint64_t kUnknownSize = uint64_t(-1); const uint64_t kUnknownSize = uint64_t(-1);
static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
NS_IMPL_CYCLE_COLLECTION_CLASS(FileReader) NS_IMPL_CYCLE_COLLECTION_CLASS(FileReader)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(FileReader, NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(FileReader,
@@ -383,44 +378,12 @@ FileReader::ReadFileContent(Blob& aBlob,
return; return;
} }
bool nonBlocking = false; aRv = NS_MakeAsyncNonBlockingInputStream(stream,
aRv = stream->IsNonBlocking(&nonBlocking); getter_AddRefs(mAsyncStream));
if (NS_WARN_IF(aRv.Failed())) { if (NS_WARN_IF(aRv.Failed())) {
return; return;
} }
mAsyncStream = do_QueryInterface(stream);
// We want to have a non-blocking nsIAsyncInputStream.
if (!mAsyncStream || !nonBlocking) {
nsresult rv;
nsCOMPtr<nsIStreamTransportService> sts =
do_GetService(kStreamTransportServiceCID, &rv);
if (NS_WARN_IF(NS_FAILED(rv))) {
aRv.Throw(rv);
return;
}
nsCOMPtr<nsITransport> transport;
aRv = sts->CreateInputTransport(stream,
/* aCloseWhenDone */ true,
getter_AddRefs(transport));
if (NS_WARN_IF(aRv.Failed())) {
return;
}
nsCOMPtr<nsIInputStream> wrapper;
aRv = transport->OpenInputStream(/* aFlags */ 0,
/* aSegmentSize */ 0,
/* aSegmentCount */ 0,
getter_AddRefs(wrapper));
if (NS_WARN_IF(aRv.Failed())) {
return;
}
mAsyncStream = do_QueryInterface(wrapper);
}
MOZ_ASSERT(mAsyncStream); MOZ_ASSERT(mAsyncStream);
mTotal = mBlob->GetSize(aRv); mTotal = mBlob->GetSize(aRv);

View File

@@ -8,6 +8,7 @@
#include "IPCBlobInputStreamChild.h" #include "IPCBlobInputStreamChild.h"
#include "IPCBlobInputStreamStorage.h" #include "IPCBlobInputStreamStorage.h"
#include "mozilla/ipc/InputStreamParams.h" #include "mozilla/ipc/InputStreamParams.h"
#include "mozilla/NonBlockingAsyncInputStream.h"
#include "IPCBlobInputStreamThread.h" #include "IPCBlobInputStreamThread.h"
#include "nsIAsyncInputStream.h" #include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h" #include "nsIAsyncOutputStream.h"
@@ -641,7 +642,17 @@ IPCBlobInputStream::EnsureAsyncRemoteStream()
} }
nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(mRemoteStream); nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(mRemoteStream);
if (!asyncStream || !nonBlocking) {
// If non-blocking and non-async, let's use NonBlockingAsyncInputStream.
if (nonBlocking && !asyncStream) {
rv = NonBlockingAsyncInputStream::Create(mRemoteStream,
getter_AddRefs(asyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
}
if (!asyncStream) {
// Let's make the stream async using the DOMFile thread. // Let's make the stream async using the DOMFile thread.
nsCOMPtr<nsIAsyncInputStream> pipeIn; nsCOMPtr<nsIAsyncInputStream> pipeIn;
nsCOMPtr<nsIAsyncOutputStream> pipeOut; nsCOMPtr<nsIAsyncOutputStream> pipeOut;

View File

@@ -12,6 +12,9 @@ interface nsIInputAvailableCallback;
/** /**
* This service read/writes a stream on a background thread. * This service read/writes a stream on a background thread.
* *
* Note: instead of using this interface, probably you want to use
* NS_MakeAsyncNonBlockingInputStream.
*
* Use this service to transform any blocking stream (e.g., file stream) * Use this service to transform any blocking stream (e.g., file stream)
* into a fully asynchronous stream that can be read/written without * into a fully asynchronous stream that can be read/written without
* blocking the main thread. * blocking the main thread.

View File

@@ -13,11 +13,13 @@
#include "nsThreadUtils.h" #include "nsThreadUtils.h"
#include "nsCOMPtr.h" #include "nsCOMPtr.h"
#include "mozilla/Logging.h" #include "mozilla/Logging.h"
#include "mozilla/NonBlockingAsyncInputStream.h"
#include "GeckoProfiler.h" #include "GeckoProfiler.h"
#include "nsIStreamListener.h" #include "nsIStreamListener.h"
#include "nsILoadGroup.h" #include "nsILoadGroup.h"
#include "nsNetCID.h" #include "nsNetCID.h"
#include "nsStreamUtils.h" #include "nsStreamUtils.h"
#include "SlicedInputStream.h"
#include <algorithm> #include <algorithm>
static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID); static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
@@ -331,6 +333,11 @@ nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
if (nonBlocking) { if (nonBlocking) {
mAsyncStream = do_QueryInterface(mStream); mAsyncStream = do_QueryInterface(mStream);
if (!mAsyncStream) {
rv = NonBlockingAsyncInputStream::Create(mStream,
getter_AddRefs(mAsyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) return rv;
}
} }
if (!mAsyncStream) { if (!mAsyncStream) {

View File

@@ -21,9 +21,14 @@
#include "nsNetCID.h" #include "nsNetCID.h"
#include "nsServiceManagerUtils.h" #include "nsServiceManagerUtils.h"
#include "nsThreadUtils.h" #include "nsThreadUtils.h"
#include "nsITransport.h"
#include "nsIStreamTransportService.h"
#include "NonBlockingAsyncInputStream.h"
using namespace mozilla; using namespace mozilla;
static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// This is a nsICancelableRunnable because we can dispatch it to Workers and // This is a nsICancelableRunnable because we can dispatch it to Workers and
@@ -960,3 +965,60 @@ NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
return NS_OK; return NS_OK;
} }
nsresult
NS_MakeAsyncNonBlockingInputStream(nsIInputStream* aSource,
nsIAsyncInputStream** aAsyncInputStream)
{
if (NS_WARN_IF(!aSource || !aAsyncInputStream)) {
return NS_ERROR_FAILURE;
}
bool nonBlocking = false;
nsresult rv = aSource->IsNonBlocking(&nonBlocking);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(aSource);
if (nonBlocking && asyncStream) {
// This stream is perfect!
asyncStream.forget(aAsyncInputStream);
return NS_OK;
}
if (nonBlocking) {
// If the stream is non-blocking but not async, we wrap it.
return NonBlockingAsyncInputStream::Create(aSource, aAsyncInputStream);
}
nsCOMPtr<nsIStreamTransportService> sts =
do_GetService(kStreamTransportServiceCID, &rv);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
nsCOMPtr<nsITransport> transport;
rv = sts->CreateInputTransport(aSource,
/* aCloseWhenDone */ true,
getter_AddRefs(transport));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
nsCOMPtr<nsIInputStream> wrapper;
rv = transport->OpenInputStream(/* aFlags */ 0,
/* aSegmentSize */ 0,
/* aSegmentCount */ 0,
getter_AddRefs(wrapper));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
asyncStream = do_QueryInterface(wrapper);
MOZ_ASSERT(asyncStream);
asyncStream.forget(aAsyncInputStream);
return NS_OK;
}

View File

@@ -12,6 +12,7 @@
#include "nsIInputStream.h" #include "nsIInputStream.h"
#include "nsTArray.h" #include "nsTArray.h"
class nsIAsyncInputStream;
class nsIOutputStream; class nsIOutputStream;
class nsIInputStreamCallback; class nsIInputStreamCallback;
class nsIOutputStreamCallback; class nsIOutputStreamCallback;
@@ -293,4 +294,23 @@ extern nsresult
NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut, NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
nsIInputStream** aReplacementOut = nullptr); nsIInputStream** aReplacementOut = nullptr);
/*
* This function returns a non-blocking nsIAsyncInputStream. Internally,
* different approaches are used based on what |aSource| is and what it
* implements.
*
* If the |aSource| is already a non-blocking and async stream,
* |aAsyncInputStream| will be equal to |aSource|.
*
* Otherwise, if |aSource| is just non-blocking, NonBlockingAsyncInputStream
* class is used in order to make it async.
*
* The last step is to use nsIStreamTransportService and create a pipe in order
* to expose a non-blocking async inputStream and read |aSource| data from
* a separate thread.
*/
extern nsresult
NS_MakeAsyncNonBlockingInputStream(nsIInputStream* aSource,
nsIAsyncInputStream** aAsyncInputStream);
#endif // !nsStreamUtils_h__ #endif // !nsStreamUtils_h__