Bug 1899812 - Handle would block error for Http3WebTransportStream, r=necko-reviewers,jesup

Differential Revision: https://phabricator.services.mozilla.com/D234502
This commit is contained in:
Kershaw Chang
2025-01-24 20:43:09 +00:00
parent 8172db6ee5
commit dc04135e25
4 changed files with 63 additions and 17 deletions

View File

@@ -168,3 +168,18 @@ add_task(async function test_wt_incoming_bidi_stream() {
wt.close();
});
add_task(async function test_wt_incoming_bidi_stream_huge_data() {
let wt = new WebTransport(
"https://" + host + "/create_bidi_stream_and_large_data"
);
// await wt.ready; // causes occasional hang on release --verify
const stream_reader = wt.incomingBidirectionalStreams.getReader();
const { value: bidi_stream } = await stream_reader.read();
stream_reader.releaseLock();
const str = await read_stream_as_string(bidi_stream.readable);
Assert.equal(str.length, 32 * 1024 * 1024);
wt.close();
});

View File

@@ -119,7 +119,8 @@ NS_IMPL_ISUPPORTS(WebTransportReceiveStreamStats,
} // namespace
NS_IMPL_ISUPPORTS(Http3WebTransportStream, nsIInputStreamCallback)
NS_IMPL_ISUPPORTS(Http3WebTransportStream, nsIInputStreamCallback,
nsIOutputStreamCallback)
Http3WebTransportStream::Http3WebTransportStream(
Http3Session* aSession, uint64_t aSessionId, WebTransportStreamType aType,
@@ -174,6 +175,17 @@ NS_IMETHODIMP Http3WebTransportStream::OnInputStreamReady(
return NS_OK;
}
NS_IMETHODIMP
Http3WebTransportStream::OnOutputStreamReady(nsIAsyncOutputStream* aOutStream) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
if (!mSession) {
return NS_OK;
}
mSession->ConnectSlowConsumer(this);
return NS_OK;
}
nsresult Http3WebTransportStream::InitOutputPipe() {
nsCOMPtr<nsIAsyncOutputStream> out;
nsCOMPtr<nsIAsyncInputStream> in;
@@ -525,7 +537,14 @@ nsresult Http3WebTransportStream::WriteSegments() {
static_cast<uint32_t>(mSocketInCondition), this));
if (NS_FAILED(rv)) {
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
rv = NS_OK;
nsCOMPtr<nsIEventTarget> target;
Unused << gHttpHandler->GetSocketThreadTarget(getter_AddRefs(target));
if (target) {
mReceiveStreamPipeOut->AsyncWait(this, 0, 0, target);
rv = NS_OK;
} else {
rv = NS_ERROR_UNEXPECTED;
}
}
if (rv == NS_BASE_STREAM_CLOSED) {
mReceiveStreamPipeOut->Close();

View File

@@ -25,12 +25,14 @@ class Http3WebTransportSession;
class Http3WebTransportStream final : public Http3StreamBase,
public nsAHttpSegmentWriter,
public nsAHttpSegmentReader,
public nsIInputStreamCallback {
public nsIInputStreamCallback,
public nsIOutputStreamCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSAHTTPSEGMENTWRITER
NS_DECL_NSAHTTPSEGMENTREADER
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_NSIOUTPUTSTREAMCALLBACK
explicit Http3WebTransportStream(
Http3Session* aSession, uint64_t aSessionId, WebTransportStreamType aType,
@@ -133,4 +135,8 @@ class Http3WebTransportStream final : public Http3StreamBase,
} // namespace mozilla::net
inline nsISupports* ToSupports(mozilla::net::Http3WebTransportStream* aStream) {
return static_cast<nsIInputStreamCallback*>(aStream);
}
#endif // mozilla_net_Http3WebTransportStream_h

View File

@@ -66,7 +66,7 @@ struct Http3TestServer {
responses: HashMap<Http3OrWebTransportStream, Vec<u8>>,
current_connection_hash: u64,
sessions_to_close: HashMap<Instant, Vec<WebTransportRequest>>,
sessions_to_create_stream: Vec<(WebTransportRequest, StreamType, bool)>,
sessions_to_create_stream: Vec<(WebTransportRequest, StreamType, Option<Vec<u8>>)>,
webtransport_bidi_stream: HashSet<Http3OrWebTransportStream>,
wt_unidi_conn_to_stream: HashMap<ConnectionRef, Http3OrWebTransportStream>,
wt_unidi_echo_back: HashMap<Http3OrWebTransportStream, Http3OrWebTransportStream>,
@@ -153,9 +153,8 @@ impl Http3TestServer {
let session = tuple.0;
let wt_server_stream = session.create_stream(tuple.1).unwrap();
if tuple.1 == StreamType::UniDi {
if tuple.2 {
wt_server_stream.send_data(b"qwerty").unwrap();
wt_server_stream.stream_close_send().unwrap();
if let Some(data) = tuple.2 {
self.new_response(wt_server_stream, data);
} else {
// relaying Http3ServerEvent::Data to uni streams
// slows down netwerk/test/unit/test_webtransport_simple.js
@@ -164,12 +163,8 @@ impl Http3TestServer {
.insert(wt_server_stream.conn.clone(), wt_server_stream);
}
} else {
if tuple.2 {
wt_server_stream.send_data(b"asdfg").unwrap();
wt_server_stream.stream_close_send().unwrap();
wt_server_stream
.stream_stop_sending(Error::HttpNoError.code())
.unwrap();
if let Some(data) = tuple.2 {
self.new_response(wt_server_stream, data);
} else {
self.webtransport_bidi_stream.insert(wt_server_stream);
}
@@ -534,7 +529,7 @@ impl HttpServer for Http3TestServer {
self.sessions_to_create_stream.push((
session,
StreamType::UniDi,
false,
None,
));
} else if path == "/create_unidi_stream_and_hello" {
session
@@ -543,7 +538,7 @@ impl HttpServer for Http3TestServer {
self.sessions_to_create_stream.push((
session,
StreamType::UniDi,
true,
Some(Vec::from("qwerty")),
));
} else if path == "/create_bidi_stream" {
session
@@ -552,7 +547,7 @@ impl HttpServer for Http3TestServer {
self.sessions_to_create_stream.push((
session,
StreamType::BiDi,
false,
None,
));
} else if path == "/create_bidi_stream_and_hello" {
self.webtransport_bidi_stream.clear();
@@ -562,7 +557,18 @@ impl HttpServer for Http3TestServer {
self.sessions_to_create_stream.push((
session,
StreamType::BiDi,
true,
Some(Vec::from("asdfg")),
));
} else if path == "/create_bidi_stream_and_large_data" {
self.webtransport_bidi_stream.clear();
let data: Vec<u8> = vec![1u8; 32 * 1024 * 1024];
session
.response(&WebTransportSessionAcceptAction::Accept)
.unwrap();
self.sessions_to_create_stream.push((
session,
StreamType::BiDi,
Some(data),
));
} else {
session