Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (s) TestJoinDialOption(t *testing.T) {
if cc.dopts.copts.InitialWindowSize != initialWindowSize {
t.Fatalf("Unexpected cc.dopts.copts.InitialWindowSize: %d != %d", cc.dopts.copts.InitialWindowSize, initialWindowSize)
}
// Make sure static window size is not enabled when using WithInitialWindowSize.
if cc.dopts.copts.StaticWindowSize {
t.Fatalf("Unexpected cc.dopts.copts.StaticWindowSize: %t", cc.dopts.copts.StaticWindowSize)
}
}

// TestJoinServerOption tests the join server option. It configures a joined
Expand All @@ -162,6 +166,10 @@ func (s) TestJoinServerOption(t *testing.T) {
if s.opts.initialWindowSize != initialWindowSize {
t.Fatalf("Unexpected s.opts.initialWindowSize: %d != %d", s.opts.initialWindowSize, initialWindowSize)
}
// Make sure static window size is not enabled when using InitialWindowSize.
if s.opts.staticWindowSize {
t.Fatalf("Unexpected s.opts.staticWindowSize: %t", s.opts.staticWindowSize)
}
}

// funcTestHeaderListSizeDialOptionServerOption tests
Expand Down
13 changes: 11 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,29 @@ func WithReadBufferSize(s int) DialOption {
// WithInitialWindowSize returns a DialOption which sets the value for initial
// window size on a stream. The lower bound for window size is 64K and any value
// smaller than that will be ignored.
//
// Deprecated: use [WithInitialStreamWindowSize] to set a stream window size
// without disabling dynamic flow control. Will be supported throughout 1.x.
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.StaticWindowSize = true
})
}

// WithInitialStreamWindowSize returns a DialOption which sets the value for
// initial window size on a stream without disabling dynamic flow control. The
// lower bound for window size is 64K and any value smaller than that will be
// ignored.
func WithInitialStreamWindowSize(s int32) DialOption {
return WithInitialWindowSize(s)
}

// WithInitialConnWindowSize returns a DialOption which sets the value for
// initial window size on a connection. The lower bound for window size is 64K
// and any value smaller than that will be ignored.
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.StaticWindowSize = true
})
}

Expand Down
15 changes: 12 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,20 +277,29 @@ func ReadBufferSize(s int) ServerOption {
}

// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
// The lower bound for window size is 64K and any value smaller than that will
// be ignored.
//
// Deprecated: use [InitialStreamWindowSize] to set a stream window size without
// disabling dynamic flow control. Will be supported throughout 1.x.
func InitialWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.staticWindowSize = true
})
}

// InitialStreamWindowSize returns a ServerOption that sets window size for
// stream without disabling dynamic flow control. The lower bound for window
// size is 64K and any value smaller than that will be ignored.
func InitialStreamWindowSize(s int32) ServerOption {
return InitialWindowSize(s)
}

// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.staticWindowSize = true
})
}

Expand Down
5 changes: 5 additions & 0 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,9 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
// Disable BDP estimation to make sure the flow control is violated when a
// large message is sent.
te.isServerStaticWindow = true
te.serverInitialWindowSize = 65536
// Avoid overflowing connection level flow control window, which will lead to
// transport being closed.
Expand Down Expand Up @@ -1146,6 +1149,8 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
// disable BDP
te.isServerStaticWindow = true
te.clientStaticWindow = true
te.serverInitialWindowSize = 65536
te.serverInitialConnWindowSize = 65536
te.clientInitialWindowSize = 65536
Expand Down
100 changes: 80 additions & 20 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ type test struct {
unaryServerInt grpc.UnaryServerInterceptor
streamServerInt grpc.StreamServerInterceptor
serverInitialWindowSize int32
isServerStaticWindow bool
serverInitialConnWindowSize int32
customServerOptions []grpc.ServerOption

Expand All @@ -505,14 +506,16 @@ type test struct {
// Used to test the new compressor registration API UseCompressor.
clientUseCompression bool
// clientNopCompression is set to create a compressor whose type is not supported.
clientNopCompression bool
unaryClientInt grpc.UnaryClientInterceptor
streamClientInt grpc.StreamClientInterceptor
clientInitialWindowSize int32
clientInitialConnWindowSize int32
perRPCCreds credentials.PerRPCCredentials
customDialOptions []grpc.DialOption
resolverScheme string
clientNopCompression bool
unaryClientInt grpc.UnaryClientInterceptor
streamClientInt grpc.StreamClientInterceptor
clientInitialWindowSize int32
clientUseInitialStreamWindowSize bool
clientInitialConnWindowSize int32
clientStaticWindow bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the server side boolean, please change re-name to isClientWindowStatic

perRPCCreds credentials.PerRPCCredentials
customDialOptions []grpc.DialOption
resolverScheme string

// These are set once startServer is called. The common case is to have
// only one testServer.
Expand Down Expand Up @@ -606,11 +609,21 @@ func (te *test) listenAndServe(ts testgrpc.TestServiceServer, listen func(networ
sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
}
if te.serverInitialWindowSize > 0 {
sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
if te.isServerStaticWindow {
sopts = append(sopts, grpc.StaticStreamWindowSize(te.serverInitialWindowSize))
} else {
sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
}
}

if te.serverInitialConnWindowSize > 0 {
sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
if te.isServerStaticWindow {
sopts = append(sopts, grpc.StaticConnWindowSize(te.serverInitialConnWindowSize))
} else {
sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
}
}

la := ":0"
if te.e.network == "unix" {
la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
Expand Down Expand Up @@ -817,10 +830,21 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer)))
}
if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
if te.clientStaticWindow {
opts = append(opts, grpc.WithStaticStreamWindowSize(te.clientInitialWindowSize))
} else if te.clientUseInitialStreamWindowSize {
opts = append(opts, grpc.WithInitialStreamWindowSize(te.clientInitialWindowSize))
} else {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add tests with both WithInitialStreamWindowSize and WithInitialWindowSize options to ensure both code paths are verified if they diverge in future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}

if te.clientInitialConnWindowSize > 0 {
opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
if te.clientStaticWindow {
opts = append(opts, grpc.WithStaticConnWindowSize(te.clientInitialConnWindowSize))
} else {
opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
}
}
if te.perRPCCreds != nil {
opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
Expand Down Expand Up @@ -5418,18 +5442,23 @@ func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
}

type windowSizeConfig struct {
serverStream int32
serverConn int32
clientStream int32
clientConn int32
serverStream int32
serverConn int32
clientStream int32
clientConn int32
serverStaticWindow bool
clientStaticWindow bool
Comment on lines +5449 to +5450
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename the variables to be similar to isClientWindowStatic and isServerWindowStatic.

clientUseInitialStreamWindowSize bool
}

func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
wc := windowSizeConfig{
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
serverStaticWindow: true,
clientStaticWindow: true,
}
for _, e := range listTestEnv() {
testConfigurableWindowSize(t, e, wc)
Expand All @@ -5448,12 +5477,43 @@ func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
}
}

func (s) TestConfigurableInitialStreamWindowSizeWithLargeWindow(t *testing.T) {
wc := windowSizeConfig{
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
serverStaticWindow: true,
clientStaticWindow: true,
clientUseInitialStreamWindowSize: true,
}
for _, e := range listTestEnv() {
testConfigurableWindowSize(t, e, wc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test explicitly assert the static/dynamic behavior of the client/server windows? I would expect the test to break if the window type changes unexpectedly. Is that currently covered?

}
}

func (s) TestConfigurableInitialStreamWindowSizeWithSmallWindow(t *testing.T) {
wc := windowSizeConfig{
serverStream: 1,
serverConn: 1,
clientStream: 1,
clientConn: 1,
clientUseInitialStreamWindowSize: true,
}
for _, e := range listTestEnv() {
testConfigurableWindowSize(t, e, wc)
}
}

func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
te := newTest(t, e)
te.serverInitialWindowSize = wc.serverStream
te.serverInitialConnWindowSize = wc.serverConn
te.clientInitialWindowSize = wc.clientStream
te.clientInitialConnWindowSize = wc.clientConn
te.isServerStaticWindow = wc.serverStaticWindow
te.clientStaticWindow = wc.clientStaticWindow
te.clientUseInitialStreamWindowSize = wc.clientUseInitialStreamWindowSize

te.startServer(&testServer{security: e.security})
defer te.tearDown()
Expand Down
8 changes: 6 additions & 2 deletions test/stream_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (s) TestStreamCleanup(t *testing.T) {
return &testpb.Empty{}, nil
},
}
if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {

// Disable BDP to ensure the message size exceeds window size.
if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
Expand Down Expand Up @@ -81,7 +83,9 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
})
},
}
if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {

// Disable BDP to ensure the message size exceeds window size.
if err := ss.Start(nil, grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
Expand Down