Bug 1816388 - Use NS_AsyncCopy directly in FileSystemWritableFileStream::Write. r=dom-storage-reviewers,janv

Differential Revision: https://phabricator.services.mozilla.com/D169609
This commit is contained in:
Jan Varga
2023-02-13 16:11:36 +00:00
parent 2bfe76b88b
commit 3b682567d6
7 changed files with 158 additions and 146 deletions

View File

@@ -6,6 +6,7 @@
#include "FileSystemWritableFileStream.h"
#include "fs/FileSystemAsyncCopy.h"
#include "fs/FileSystemThreadSafeStreamOwner.h"
#include "mozilla/Buffer.h"
#include "mozilla/ErrorResult.h"
@@ -75,104 +76,72 @@ class WritableFileStreamUnderlyingSinkAlgorithms final
RefPtr<FileSystemWritableFileStream> mStream;
};
class CopyPromiseResolver final : public nsIRequestObserver {
RefPtr<Promise> mPromise;
nsCOMPtr<nsISerialEventTarget> mResolveTarget;
public:
explicit CopyPromiseResolver(already_AddRefed<Promise> aPromise,
nsISerialEventTarget* aResolveTarget)
: mPromise(aPromise), mResolveTarget(aResolveTarget) {}
NS_DECL_ISUPPORTS
NS_DECL_NSIREQUESTOBSERVER
private:
~CopyPromiseResolver() = default;
};
NS_IMPL_ISUPPORTS(CopyPromiseResolver, nsIRequestObserver)
NS_IMETHODIMP
CopyPromiseResolver::OnStartRequest(nsIRequest* aRequest) { return NS_OK; }
NS_IMETHODIMP
CopyPromiseResolver::OnStopRequest(nsIRequest* /*aRequest*/, nsresult aStatus) {
InvokeAsync(mResolveTarget, __func__,
[promise = std::move(mPromise), aStatus]() {
if (NS_SUCCEEDED(aStatus)) {
promise->MaybeResolve(0); // Written amount?
} else if (IsFileNotFoundError(aStatus)) {
promise->MaybeRejectWithNotFoundError("File not found");
} else if (aStatus == NS_ERROR_FILE_NO_DEVICE_SPACE) {
promise->MaybeRejectWithQuotaExceededError("Quota exceeded");
} else {
promise->MaybeReject(aStatus);
}
return BoolPromise::CreateAndResolve(true, __func__);
});
return NS_OK;
}
// Same value as in FileSystemSyncAccessHandle
const uint32_t kWritableStreamCopyBlockSize = 1024 * 1024;
// TODO: Refactor this function, see Bug 1804614
void WriteImpl(const RefPtr<nsISerialEventTarget>& aTaskQueue,
already_AddRefed<nsIInputStream> aInputStream,
nsCOMPtr<nsIInputStream> aInputStream,
RefPtr<fs::FileSystemThreadSafeStreamOwner>& aOutStreamOwner,
const Maybe<uint64_t> aPosition,
const RefPtr<Promise>& aPromise) {
auto rejectAndReturn = [&aPromise](const nsresult rv) {
if (IsFileNotFoundError(rv)) {
aPromise->MaybeRejectWithNotFoundError("File not found");
return;
}
aPromise->MaybeReject(rv);
};
InvokeAsync(
aTaskQueue, __func__,
[aTaskQueue, inputStream = std::move(aInputStream), aOutStreamOwner,
aPosition]() {
if (aPosition.isSome()) {
LOG(("%p: Seeking to %" PRIu64, aOutStreamOwner.get(),
aPosition.value()));
nsresult rv = NS_ERROR_UNEXPECTED;
nsCOMPtr<nsIAsyncStreamCopier> copier =
do_CreateInstance("@mozilla.org/network/async-stream-copier;1", &rv);
QM_TRY(MOZ_TO_RESULT(rv), rejectAndReturn);
MOZ_ASSERT(copier);
QM_TRY(MOZ_TO_RESULT(aOutStreamOwner->Seek(aPosition.value())),
CreateAndRejectInt64Promise);
}
nsCOMPtr<nsIInputStream> bufferedSource;
QM_TRY(MOZ_TO_RESULT(NS_NewBufferedInputStream(getter_AddRefs(bufferedSource),
std::move(aInputStream),
kWritableStreamCopyBlockSize)),
rejectAndReturn);
MOZ_ASSERT(bufferedSource);
nsCOMPtr<nsIOutputStream> streamSink = aOutStreamOwner->OutputStream();
if (aPosition.isSome()) {
LOG(("%p: Seeking to %" PRIu64, aOutStreamOwner.get(), aPosition.value()));
auto written = std::make_shared<int64_t>(0);
auto writingProgress = [written](uint32_t aDelta) {
*written += static_cast<int64_t>(aDelta);
};
QM_TRY(MOZ_TO_RESULT(aOutStreamOwner->Seek(aPosition.value())),
rejectAndReturn);
}
auto promiseHolder = MakeUnique<MozPromiseHolder<Int64Promise>>();
RefPtr<Int64Promise> promise = promiseHolder->Ensure(__func__);
nsCOMPtr<nsIOutputStream> streamSink = aOutStreamOwner->OutputStream();
MOZ_ASSERT(streamSink);
QM_TRY(MOZ_TO_RESULT(copier->Init(
bufferedSource, streamSink, aTaskQueue, /* sourceBuffered */ true,
/* sinkBuffered */ false, kWritableStreamCopyBlockSize,
/* closeSource */ true, /* closeSink */ false)),
rejectAndReturn);
auto writingCompletion =
[written,
promiseHolder = std::move(promiseHolder)](nsresult aStatus) {
if (NS_SUCCEEDED(aStatus)) {
promiseHolder->ResolveIfExists(*written, __func__);
return;
}
nsCOMPtr<nsISerialEventTarget> resolveTarget = GetCurrentSerialEventTarget();
promiseHolder->RejectIfExists(aStatus, __func__);
};
// Observer will live on main thread, copier dispatches work on taskQueue
InvokeAsync(GetMainThreadSerialEventTarget(), __func__,
[copier = std::move(copier), promise = aPromise,
resolveTarget = std::move(resolveTarget)]() mutable {
RefPtr<CopyPromiseResolver> callbacks =
new CopyPromiseResolver(promise.forget(), resolveTarget);
// Error through callback promise
QM_TRY(MOZ_TO_RESULT(copier->AsyncCopy(callbacks, nullptr)),
CreateAndRejectBoolPromise);
return BoolPromise::CreateAndResolve(true, __func__);
});
QM_TRY(MOZ_TO_RESULT(fs::AsyncCopy(
inputStream, streamSink, aTaskQueue,
nsAsyncCopyMode::NS_ASYNCCOPY_VIA_READSEGMENTS,
/* aCloseSource */ true, /* aCloseSink */ false,
std::move(writingProgress), std::move(writingCompletion))),
CreateAndRejectInt64Promise);
return promise;
})
->Then(
GetCurrentSerialEventTarget(), __func__,
[aPromise](const Int64Promise::ResolveOrRejectValue& aValue) {
if (aValue.IsResolve()) {
aPromise->MaybeResolve(aValue.ResolveValue());
return;
}
if (IsFileNotFoundError(aValue.RejectValue())) {
aPromise->MaybeRejectWithNotFoundError("File not found");
} else if (aValue.RejectValue() == NS_ERROR_FILE_NO_DEVICE_SPACE) {
aPromise->MaybeRejectWithQuotaExceededError("Quota exceeded");
} else {
aPromise->MaybeReject(aValue.RejectValue());
}
aPromise->MaybeReject(aValue.RejectValue());
});
}
} // namespace
@@ -741,7 +710,7 @@ void FileSystemWritableFileStream::Write(const T& aData,
NS_ASSIGNMENT_COPY)),
rejectAndReturn);
WriteImpl(mTaskQueue, inputStream.forget(), mStreamOwner, aPosition,
WriteImpl(mTaskQueue, std::move(inputStream), mStreamOwner, aPosition,
aPromise);
return;
}
@@ -757,7 +726,7 @@ void FileSystemWritableFileStream::Write(const T& aData,
})),
rejectAndReturn);
WriteImpl(mTaskQueue, inputStream.forget(), mStreamOwner, aPosition,
WriteImpl(mTaskQueue, std::move(inputStream), mStreamOwner, aPosition,
aPromise);
return;
}
@@ -777,7 +746,7 @@ void FileSystemWritableFileStream::Write(const T& aData,
std::move(dataString))),
rejectAndReturn);
WriteImpl(mTaskQueue, inputStream.forget(), mStreamOwner, aPosition,
WriteImpl(mTaskQueue, std::move(inputStream), mStreamOwner, aPosition,
aPromise);
}