Bug 1874102 - Implement RemoteStreamLimits for incoming stream flow control, r=necko-reviewers,jesup

Differential Revision: https://phabricator.services.mozilla.com/D243782
This commit is contained in:
Kershaw Chang
2025-04-29 08:45:29 +00:00
parent 7c3aab211b
commit 2f83405f00
7 changed files with 421 additions and 22 deletions

View File

@@ -10,6 +10,7 @@
#include "mozilla/net/NeqoHttp3Conn.h"
#include "mozilla/RefPtr.h"
#include "mozilla/Span.h"
#include "mozilla/net/NeqoHttp3Conn.h"
#include "nsTArray.h"
namespace mozilla::net {

View File

@@ -41,7 +41,10 @@ Http2WebTransportSessionImpl::CapsuleQueue::operator[](
Http2WebTransportSessionImpl::Http2WebTransportSessionImpl(
CapsuleIOHandler* aHandler, Http2WebTransportInitialSettings aSettings)
: mHandler(aHandler), mSettings(aSettings) {
: mSettings(aSettings),
mRemoteStreamsFlowControl(aSettings.mInitialLocalMaxStreamsBidi,
aSettings.mInitialLocalMaxStreamsUnidi),
mHandler(aHandler) {
LOG(("Http2WebTransportSessionImpl ctor:%p", this));
mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update(
mSettings.mInitialMaxStreamsUni);
@@ -162,6 +165,18 @@ void Http2WebTransportSessionImpl::SendFlowControlCapsules(
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
encoder = mRemoteStreamsFlowControl[WebTransportStreamType::BiDi]
.FlowControl()
.CreateMaxStreamsCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
encoder = mRemoteStreamsFlowControl[WebTransportStreamType::UniDi]
.FlowControl()
.CreateMaxStreamsCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
}
void Http2WebTransportSessionImpl::PrepareCapsulesToSend(
@@ -198,6 +213,20 @@ void Http2WebTransportSessionImpl::Close(nsresult aReason) {
mIncomingStreams.Clear();
}
void Http2WebTransportSessionImpl::OnStreamClosed(
Http2WebTransportStream* aStream) {
LOG(("Http2WebTransportSessionImpl::OnStreamClosed %p stream:%p", this,
aStream));
RefPtr<Http2WebTransportStream> stream = aStream;
StreamId id = stream->WebTransportStreamId();
if (id.IsClientInitiated()) {
mOutgoingStreams.Remove(id);
} else {
mIncomingStreams.Remove(id);
mRemoteStreamsFlowControl[id.StreamType()].FlowControl().AddRetired(1);
}
}
bool Http2WebTransportSessionImpl::OnCapsule(Capsule&& aCapsule) {
switch (aCapsule.Type()) {
case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
@@ -218,25 +247,10 @@ bool Http2WebTransportSessionImpl::OnCapsule(Capsule&& aCapsule) {
case CapsuleType::WT_STREAM: {
WebTransportStreamDataCapsule& streamData =
aCapsule.GetWebTransportStreamDataCapsule();
// TODO: implement stream-level flow control
if (streamData.mID & 1) {
RefPtr<Http2WebTransportStream> stream =
mIncomingStreams.Get(streamData.mID);
if (!stream) {
stream =
new Http2WebTransportStream(this, StreamId::From(streamData.mID));
if (NS_FAILED(stream->Init())) {
return false;
}
mIncomingStreams.InsertOrUpdate(streamData.mID, stream);
if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener =
do_QueryInterface(mListener)) {
listener->OnIncomingStreamAvailableInternal(stream);
}
}
if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) {
return false;
}
StreamId id = StreamId(streamData.mID);
return ProcessIncomingStreamCapsule(std::move(aCapsule), id,
id.StreamType());
} else {
RefPtr<Http2WebTransportStream> stream =
mOutgoingStreams.Get(streamData.mID);
@@ -302,6 +316,45 @@ bool Http2WebTransportSessionImpl::OnCapsule(Capsule&& aCapsule) {
return true;
}
bool Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule(
Capsule&& aCapsule, StreamId aID, WebTransportStreamType aStreamType) {
LOG(
("Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule %p "
"aID=%" PRIu64 " type:%s",
this, (uint64_t)aID,
aStreamType == WebTransportStreamType::BiDi ? "BiDi" : "UniDi"));
RefPtr<Http2WebTransportStream> stream = mIncomingStreams.Get(aID);
if (stream) {
return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule)));
}
while (true) {
auto res = mRemoteStreamsFlowControl[aStreamType].IsNewStream(aID);
if (res.isErr() || !res.unwrap()) {
break;
}
StreamId newStreamID =
mRemoteStreamsFlowControl[aStreamType].TakeStreamId();
stream = new Http2WebTransportStream(this, newStreamID);
if (NS_FAILED(stream->Init())) {
return false;
}
mIncomingStreams.InsertOrUpdate(newStreamID, stream);
if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener =
do_QueryInterface(mListener)) {
listener->OnIncomingStreamAvailableInternal(stream);
}
}
stream = mIncomingStreams.Get(aID);
if (stream) {
return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule)));
}
return true;
}
void Http2WebTransportSessionImpl::OnCapsuleParseFailure(nsresult aError) {
mHandler->OnCapsuleParseFailure(aError);
}

View File

@@ -40,14 +40,18 @@ class CapsuleIOHandler {
struct Http2WebTransportInitialSettings {
// Initial session-level data limit.
uint32_t mInitialMaxData = 0;
// Initial stream-level data limit for incoming unidirectional streams.
// Initial stream-level data limit for outgoing unidirectional streams.
uint32_t mInitialMaxStreamDataUni = 0;
// Initial stream-level data limit for incoming bidirectional streams.
// Initial stream-level data limit for outgoing bidirectional streams.
uint32_t mInitialMaxStreamDataBidi = 0;
// Initial max unidirectional streams per session.
uint32_t mInitialMaxStreamsUni = 0;
// Initial max bidirectional streams per session.
uint32_t mInitialMaxStreamsBidi = 0;
// Initial limit on unidirectional streams that the peer creates.
uint32_t mInitialLocalMaxStreamsUnidi = 16;
// Initial limit on bidirectional streams that the peer creates.
uint32_t mInitialLocalMaxStreamsBidi = 16;
};
enum class CapsuleTransmissionPriority : uint8_t {
@@ -84,6 +88,7 @@ class Http2WebTransportSessionImpl final : public WebTransportSessionBase,
void StartReading() override;
void Close(nsresult aReason);
void OnStreamClosed(Http2WebTransportStream* aStream);
void SendStreamDataCapsule(UniquePtr<CapsuleEncoder>&& aData);
void PrepareCapsulesToSend(
mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput);
@@ -114,6 +119,8 @@ class Http2WebTransportSessionImpl final : public WebTransportSessionBase,
void ProcessPendingStreamCallbacks(
mozilla::Queue<UniquePtr<PendingStreamCallback>>& aCallbacks,
WebTransportStreamType aStreamType);
bool ProcessIncomingStreamCapsule(Capsule&& aCapsule, StreamId aID,
WebTransportStreamType aStreamType);
void SendFlowControlCapsules(CapsuleTransmissionPriority aPriority);
class CapsuleQueue final {
@@ -138,10 +145,11 @@ class Http2WebTransportSessionImpl final : public WebTransportSessionBase,
mozilla::Queue<UniquePtr<PendingStreamCallback>> mBidiPendingStreamCallbacks;
mozilla::Queue<UniquePtr<PendingStreamCallback>> mUnidiPendingStreamCallbacks;
Http2WebTransportInitialSettings mSettings;
LocalStreamLimits mLocalStreamsFlowControl;
RemoteStreamLimits mRemoteStreamsFlowControl;
RefPtr<CapsuleIOHandler> mHandler;
Http2WebTransportInitialSettings mSettings;
CapsuleQueue mCapsuleQueue;
};

View File

@@ -22,4 +22,18 @@ SenderFlowControlStreamType::CreateStreamsBlockedCapsule() {
return Some(encoder);
}
Maybe<CapsuleEncoder> ReceiverFlowControlStreamType::CreateMaxStreamsCapsule() {
if (!CapsuleNeeded()) {
return Nothing();
}
uint64_t maxStreams = NextLimit();
Capsule capsule = Capsule::WebTransportMaxStreams(
maxStreams, mType == WebTransportStreamType::BiDi);
CapsuleEncoder encoder;
encoder.EncodeCapsule(capsule);
CapsuleSent(maxStreams);
return Some(encoder);
}
} // namespace mozilla::net

View File

@@ -10,6 +10,7 @@
#include "CapsuleEncoder.h"
#include "mozilla/Assertions.h"
#include "mozilla/Maybe.h"
#include "mozilla/Result.h"
#include "mozilla/net/neqo_glue_ffi_generated.h"
#include "WebTransportStreamBase.h"
@@ -125,6 +126,131 @@ class LocalStreamLimits {
SenderFlowControlStreamType mUnidirectional;
};
class ReceiverFlowControlBase {
public:
explicit ReceiverFlowControlBase(uint64_t aMax)
: mMaxActive(aMax), mMaxAllowed(aMax) {}
void Retire(uint64_t aRetired) {
if (aRetired <= mRetired) {
return;
}
mRetired = aRetired;
if (mRetired + mMaxActive / 2 > mMaxAllowed) {
mCapsulePending = true;
}
}
void SendFlowControlUpdate() {
if (mRetired + mMaxActive > mMaxAllowed) {
mCapsulePending = true;
}
}
bool CapsuleNeeded() const { return mCapsulePending; }
uint64_t NextLimit() const { return mRetired + mMaxActive; }
uint64_t MaxActive() const { return mMaxActive; }
void SetMaxActive(uint64_t aMax) {
mCapsulePending |= (mMaxActive < aMax);
mMaxActive = aMax;
}
uint64_t Retired() const { return mRetired; }
uint64_t Consumed() const { return mConsumed; }
void CapsuleSent(uint64_t aNewMax) {
mMaxAllowed = aNewMax;
mCapsulePending = false;
}
protected:
uint64_t mMaxActive = 0;
uint64_t mMaxAllowed = 0;
uint64_t mConsumed = 0;
uint64_t mRetired = 0;
bool mCapsulePending = false;
};
class ReceiverFlowControlStreamType : public ReceiverFlowControlBase {
public:
ReceiverFlowControlStreamType(WebTransportStreamType aStreamType,
uint64_t aMax)
: ReceiverFlowControlBase(aMax), mType(aStreamType) {}
Maybe<CapsuleEncoder> CreateMaxStreamsCapsule();
bool CheckAllowed(uint64_t aNewEnd) const { return aNewEnd < mMaxAllowed; }
void AddRetired(uint64_t aCount) {
mRetired += aCount;
if (aCount > 0) {
SendFlowControlUpdate();
}
}
private:
WebTransportStreamType mType = WebTransportStreamType::BiDi;
};
class RemoteStreamLimit {
public:
RemoteStreamLimit(WebTransportStreamType aStreamType, uint64_t aMaxStreams)
: mStreamsFC(aStreamType, aMaxStreams) {
uint64_t typeBit = (aStreamType == WebTransportStreamType::BiDi) ? 0 : 2;
// Server initiated stream starts with 1.
mNextStreamId = StreamId(typeBit + 1);
}
bool IsAllowed(StreamId aStreamId) const {
uint64_t streamIndex = aStreamId >> 2;
return mStreamsFC.CheckAllowed(streamIndex);
}
Result<bool, nsresult> IsNewStream(StreamId aStreamId) const {
if (!IsAllowed(aStreamId)) {
return Err(NS_ERROR_NOT_AVAILABLE);
}
return aStreamId >= mNextStreamId;
}
StreamId TakeStreamId() {
StreamId newStream = mNextStreamId;
mNextStreamId.Next();
MOZ_ASSERT(IsAllowed(newStream));
return newStream;
}
ReceiverFlowControlStreamType& FlowControl() { return mStreamsFC; }
const ReceiverFlowControlStreamType& FlowControl() const {
return mStreamsFC;
}
private:
ReceiverFlowControlStreamType mStreamsFC;
StreamId mNextStreamId{1u};
};
class RemoteStreamLimits {
public:
RemoteStreamLimits(uint64_t aBidiMax, uint64_t aUniMax)
: mBidi(WebTransportStreamType::BiDi, aBidiMax),
mUni(WebTransportStreamType::UniDi, aUniMax) {}
RemoteStreamLimit& operator[](WebTransportStreamType aType) {
return aType == WebTransportStreamType::BiDi ? mBidi : mUni;
}
const RemoteStreamLimit& operator[](WebTransportStreamType aType) const {
return aType == WebTransportStreamType::BiDi ? mBidi : mUni;
}
private:
RemoteStreamLimit mBidi;
RemoteStreamLimit mUni;
};
} // namespace mozilla::net
#endif

View File

@@ -145,9 +145,14 @@ class MockWebTransportSessionEventListener
NS_DECL_WEBTRANSPORTSESSIONEVENTLISTENERINTERNAL
MockWebTransportSessionEventListener() {}
nsTArray<RefPtr<WebTransportStreamBase>> TakeIncomingStreams() {
return std::move(mIncomingStreams);
}
private:
virtual ~MockWebTransportSessionEventListener() = default;
nsTArray<RefPtr<WebTransportStreamBase>> mIncomingStreams;
};
NS_IMPL_ISUPPORTS(MockWebTransportSessionEventListener,
@@ -163,6 +168,7 @@ MockWebTransportSessionEventListener::OnSessionReadyInternal(
NS_IMETHODIMP
MockWebTransportSessionEventListener::OnIncomingStreamAvailableInternal(
WebTransportStreamBase* aStream) {
mIncomingStreams.AppendElement(RefPtr{aStream});
return NS_OK;
}
@@ -481,3 +487,57 @@ TEST(TestHttp2WebTransport, OutgoingBidiStream)
client->Done();
server->Done();
}
TEST(TestHttp2WebTransport, IncomingBidiStream)
{
Http2WebTransportInitialSettings settings;
settings.mInitialLocalMaxStreamsBidi = 1;
settings.mInitialLocalMaxStreamDataBidi = 512;
RefPtr<MockWebTransportClient> client = new MockWebTransportClient(settings);
RefPtr<MockWebTransportSessionEventListener> listener =
new MockWebTransportSessionEventListener();
client->Session()->SetWebTransportSessionEventListener(listener);
RefPtr<MockWebTransportServer> server = new MockWebTransportServer();
nsTArray<uint8_t> inputData;
CreateTestData(512, inputData);
nsTArray<uint8_t> cloned(inputData.Clone());
server->SendWebTransportStreamDataCapsule(1, false, std::move(cloned));
ClientProcessCapsules(server, client);
nsTArray<RefPtr<WebTransportStreamBase>> streams =
listener->TakeIncomingStreams();
ASSERT_EQ(streams.Length(), 1u);
nsCOMPtr<nsIAsyncOutputStream> writer;
nsCOMPtr<nsIAsyncInputStream> reader;
RefPtr<WebTransportStreamBase> stream = streams[0];
stream->GetWriterAndReader(getter_AddRefs(writer), getter_AddRefs(reader));
ValidateData(reader, inputData);
cloned = inputData.Clone();
server->SendWebTransportStreamDataCapsule(5, false, std::move(cloned));
ClientProcessCapsules(server, client);
streams = listener->TakeIncomingStreams();
ASSERT_EQ(streams.Length(), 0u);
client->Session()->OnStreamClosed(
static_cast<Http2WebTransportStream*>(stream.get()));
ServerProcessCapsules(server, client);
cloned = inputData.Clone();
server->SendWebTransportStreamDataCapsule(5, false, std::move(cloned));
ClientProcessCapsules(server, client);
streams = listener->TakeIncomingStreams();
ASSERT_EQ(streams.Length(), 1u);
client->Done();
server->Done();
}

View File

@@ -98,3 +98,140 @@ TEST(LocalStreamLimitsTest, StreamIdAllocation)
EXPECT_TRUE(encoder2.isSome());
EXPECT_EQ(extractLimitFromEncoder(*encoder2), 3u);
}
TEST(ReceiverFlowControlTest, NoNeedMaxAllowedFrameAtStart)
{
ReceiverFlowControlBase fc(0);
EXPECT_FALSE(fc.CapsuleNeeded());
}
TEST(ReceiverFlowControlTest, MaxAllowedAfterItemsRetired)
{
ReceiverFlowControlBase fc(100);
fc.Retire(49);
EXPECT_FALSE(fc.CapsuleNeeded());
fc.Retire(51);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 151u);
}
TEST(ReceiverFlowControlTest, ForceSendMaxAllowed)
{
ReceiverFlowControlBase fc(100);
fc.Retire(10);
EXPECT_FALSE(fc.CapsuleNeeded());
}
TEST(ReceiverFlowControlTest, MultipleRetriesAfterFramePendingIsSet)
{
ReceiverFlowControlBase fc(100);
fc.Retire(51);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 151u);
fc.Retire(61);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 161u);
fc.Retire(88);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 188u);
fc.Retire(90);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 190u);
fc.CapsuleSent(190);
EXPECT_FALSE(fc.CapsuleNeeded());
fc.Retire(141);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 241u);
fc.CapsuleSent(241);
EXPECT_FALSE(fc.CapsuleNeeded());
}
TEST(ReceiverFlowControlTest, ChangingMaxActive)
{
ReceiverFlowControlBase fc(100);
fc.SetMaxActive(50);
EXPECT_FALSE(fc.CapsuleNeeded());
fc.Retire(60);
EXPECT_FALSE(fc.CapsuleNeeded());
fc.Retire(76);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 126u);
fc.SetMaxActive(60);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 136u);
fc.Retire(136);
EXPECT_TRUE(fc.CapsuleNeeded());
EXPECT_EQ(fc.NextLimit(), 196u);
}
TEST(RemoteStreamLimitsTest, HandlesStreamLimitLogicWithRawIds)
{
RemoteStreamLimits fc(/*bidi=*/2, /*unidi=*/1);
StreamId bidi0(1); // Stream 0 (BiDi, server-initiated)
StreamId bidi1(5); // Stream 1
StreamId bidi2(9); // Stream 2
StreamId bidi3(13); // Stream 3
StreamId uni0(3); // Stream 0 (UniDi, server-initiated)
StreamId uni1(7); // Stream 1
StreamId uni2(11); // Stream 2
// Initial streams should be allowed
EXPECT_TRUE(fc[WebTransportStreamType::BiDi].IsNewStream(bidi0).unwrap());
EXPECT_TRUE(fc[WebTransportStreamType::BiDi].IsNewStream(bidi1).unwrap());
EXPECT_TRUE(fc[WebTransportStreamType::UniDi].IsNewStream(uni0).unwrap());
// Exceed limits
EXPECT_EQ(fc[WebTransportStreamType::BiDi].IsNewStream(bidi2).unwrapErr(),
NS_ERROR_NOT_AVAILABLE);
EXPECT_EQ(fc[WebTransportStreamType::UniDi].IsNewStream(uni1).unwrapErr(),
NS_ERROR_NOT_AVAILABLE);
// Take stream IDs
EXPECT_EQ(fc[WebTransportStreamType::BiDi].TakeStreamId(), bidi0);
EXPECT_EQ(fc[WebTransportStreamType::BiDi].TakeStreamId(), bidi1);
EXPECT_EQ(fc[WebTransportStreamType::UniDi].TakeStreamId(), uni0);
// Retire and allow new BiDi stream
fc[WebTransportStreamType::BiDi].FlowControl().AddRetired(1);
fc[WebTransportStreamType::BiDi].FlowControl().SendFlowControlUpdate();
// Send MaxStreams capsule
auto encoder =
fc[WebTransportStreamType::BiDi].FlowControl().CreateMaxStreamsCapsule();
EXPECT_TRUE(encoder.isSome());
auto extractLimitFromEncoder = [](CapsuleEncoder& encoder) -> uint64_t {
auto buffer = encoder.GetBuffer();
RefPtr<CapsuleParserListener> listener = new CapsuleParserListener();
UniquePtr<CapsuleParser> parser = MakeUnique<CapsuleParser>(listener);
parser->ProcessCapsuleData(buffer.Elements(), buffer.Length());
nsTArray<Capsule> parsed = listener->GetParsedCapsules();
WebTransportMaxStreamsCapsule maxStreams =
parsed[0].GetWebTransportMaxStreamsCapsule();
return maxStreams.mLimit;
};
EXPECT_EQ(extractLimitFromEncoder(*encoder), 3u);
EXPECT_TRUE(fc[WebTransportStreamType::BiDi].IsNewStream(bidi2).unwrap());
EXPECT_EQ(fc[WebTransportStreamType::BiDi].TakeStreamId(), bidi2);
EXPECT_EQ(fc[WebTransportStreamType::BiDi].IsNewStream(bidi3).unwrapErr(),
NS_ERROR_NOT_AVAILABLE);
// Retire and allow new UniDi stream
fc[WebTransportStreamType::UniDi].FlowControl().AddRetired(1);
fc[WebTransportStreamType::UniDi].FlowControl().SendFlowControlUpdate();
auto encoder1 =
fc[WebTransportStreamType::UniDi].FlowControl().CreateMaxStreamsCapsule();
EXPECT_TRUE(encoder1.isSome());
EXPECT_EQ(extractLimitFromEncoder(*encoder1), 2u);
EXPECT_TRUE(fc[WebTransportStreamType::UniDi].IsNewStream(uni1).unwrap());
EXPECT_EQ(fc[WebTransportStreamType::UniDi].TakeStreamId(), uni1);
EXPECT_EQ(fc[WebTransportStreamType::UniDi].IsNewStream(uni2).unwrapErr(),
NS_ERROR_NOT_AVAILABLE);
}