Source file src/net/http/clientconn_test.go

     1  // Copyright 2025 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http_test
     6  
     7  import (
     8  	"context"
     9  	"fmt"
    10  	"io"
    11  	"net/http"
    12  	"sync"
    13  	"sync/atomic"
    14  	"testing"
    15  	"testing/synctest"
    16  )
    17  
    18  func TestTransportNewClientConnRoundTrip(t *testing.T) { run(t, testTransportNewClientConnRoundTrip) }
    19  func testTransportNewClientConnRoundTrip(t *testing.T, mode testMode) {
    20  	cst := newClientServerTest(t, mode, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    21  		io.WriteString(w, req.Host)
    22  	}), optFakeNet)
    23  
    24  	scheme := mode.Scheme() // http or https
    25  	cc, err := cst.tr.NewClientConn(t.Context(), scheme, cst.ts.Listener.Addr().String())
    26  	if err != nil {
    27  		t.Fatal(err)
    28  	}
    29  	defer cc.Close()
    30  
    31  	// Send requests for a couple different domains.
    32  	// All use the same connection.
    33  	for _, host := range []string{"example.tld", "go.dev"} {
    34  		req, _ := http.NewRequest("GET", fmt.Sprintf("%v://%v/", scheme, host), nil)
    35  		resp, err := cc.RoundTrip(req)
    36  		if err != nil {
    37  			t.Fatal(err)
    38  		}
    39  		got, _ := io.ReadAll(resp.Body)
    40  		if string(got) != host {
    41  			t.Errorf("got response body %q, want %v", got, host)
    42  		}
    43  		resp.Body.Close()
    44  
    45  		// CloseIdleConnections does not close connections created by NewClientConn.
    46  		cst.tr.CloseIdleConnections()
    47  	}
    48  
    49  	if err := cc.Err(); err != nil {
    50  		t.Errorf("before close: ClientConn.Err() = %v, want nil", err)
    51  	}
    52  
    53  	cc.Close()
    54  	if err := cc.Err(); err == nil {
    55  		t.Errorf("after close: ClientConn.Err() = nil, want error")
    56  	}
    57  
    58  	req, _ := http.NewRequest("GET", scheme+"://example.tld/", nil)
    59  	resp, err := cc.RoundTrip(req)
    60  	if err == nil {
    61  		resp.Body.Close()
    62  		t.Errorf("after close: cc.RoundTrip succeeded, want error")
    63  	}
    64  	t.Log(err)
    65  }
    66  
    67  func newClientConnTest(t testing.TB, mode testMode, h http.HandlerFunc, opts ...any) (*clientServerTest, *http.ClientConn) {
    68  	if h == nil {
    69  		h = func(w http.ResponseWriter, req *http.Request) {}
    70  	}
    71  	cst := newClientServerTest(t, mode, h, opts...)
    72  	cc, err := cst.tr.NewClientConn(t.Context(), mode.Scheme(), cst.ts.Listener.Addr().String())
    73  	if err != nil {
    74  		t.Fatal(err)
    75  	}
    76  	t.Cleanup(func() {
    77  		cc.Close()
    78  	})
    79  	synctest.Wait()
    80  	return cst, cc
    81  }
    82  
    83  // TestClientConnReserveAll reserves every concurrency slot on a connection.
    84  func TestClientConnReserveAll(t *testing.T) { runSynctest(t, testClientConnReserveAll) }
    85  func testClientConnReserveAll(t *testing.T, mode testMode) {
    86  	cst, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
    87  		s.HTTP2 = &http.HTTP2Config{
    88  			MaxConcurrentStreams: 3,
    89  		}
    90  	})
    91  
    92  	want := 1
    93  	switch mode {
    94  	case http2Mode, http2UnencryptedMode:
    95  		want = cst.ts.Config.HTTP2.MaxConcurrentStreams
    96  	}
    97  	available := cc.Available()
    98  	if available != want {
    99  		t.Fatalf("cc.Available() = %v, want %v", available, want)
   100  	}
   101  
   102  	// Reserve every available concurrency slot on the connection.
   103  	for i := range available {
   104  		if err := cc.Reserve(); err != nil {
   105  			t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
   106  		}
   107  		if got, want := cc.Available(), available-i-1; got != want {
   108  			t.Fatalf("cc.Available() = %v, want %v", got, want)
   109  		}
   110  		if got, want := cc.InFlight(), i+1; got != want {
   111  			t.Fatalf("cc.InFlight() = %v, want %v", got, want)
   112  		}
   113  	}
   114  
   115  	// The next reservation attempt should fail, since every slot is consumed.
   116  	if err := cc.Reserve(); err == nil {
   117  		t.Fatalf("cc.Reserve() = nil, want error")
   118  	}
   119  }
   120  
   121  // TestClientConnReserveParallel starts concurrent goroutines which reserve every
   122  // concurrency slot on a connection.
   123  func TestClientConnReserveParallel(t *testing.T) { runSynctest(t, testClientConnReserveParallel) }
   124  func testClientConnReserveParallel(t *testing.T, mode testMode) {
   125  	_, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
   126  		s.HTTP2 = &http.HTTP2Config{
   127  			MaxConcurrentStreams: 3,
   128  		}
   129  	})
   130  	var (
   131  		wg      sync.WaitGroup
   132  		mu      sync.Mutex
   133  		success int
   134  		failure int
   135  	)
   136  	available := cc.Available()
   137  	const extra = 2
   138  	for range available + extra {
   139  		wg.Go(func() {
   140  			err := cc.Reserve()
   141  			mu.Lock()
   142  			defer mu.Unlock()
   143  			if err == nil {
   144  				success++
   145  			} else {
   146  				failure++
   147  			}
   148  		})
   149  	}
   150  	wg.Wait()
   151  
   152  	if got, want := success, available; got != want {
   153  		t.Errorf("%v successful reservations, want %v", got, want)
   154  	}
   155  	if got, want := failure, extra; got != want {
   156  		t.Errorf("%v failed reservations, want %v", got, want)
   157  	}
   158  }
   159  
   160  // TestClientConnReserveRelease repeatedly reserves and releases concurrency slots.
   161  func TestClientConnReserveRelease(t *testing.T) { runSynctest(t, testClientConnReserveRelease) }
   162  func testClientConnReserveRelease(t *testing.T, mode testMode) {
   163  	_, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
   164  		s.HTTP2 = &http.HTTP2Config{
   165  			MaxConcurrentStreams: 3,
   166  		}
   167  	})
   168  
   169  	available := cc.Available()
   170  	for i := range 2 * available {
   171  		if err := cc.Reserve(); err != nil {
   172  			t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
   173  		}
   174  		cc.Release()
   175  	}
   176  
   177  	if got, want := cc.Available(), available; got != want {
   178  		t.Fatalf("cc.Available() = %v, want %v", available, want)
   179  	}
   180  }
   181  
// TestClientConnReserveAndConsume reserves a concurrency slot on a connection,
// and then verifies that various events consume the reservation.
//
// Each table entry has:
//   - name: the subtest name.
//   - consume: performs the event that should use up the reservation.
//   - handler: optional server handler. donec is closed when the subtest's
//     test function returns, releasing a handler that blocks on it.
//   - h1Closed: whether the connection is expected to be closed after the
//     event when the test mode is HTTP/1.
//
// For every case the test checks that the state hook is not called by Reserve
// and is called exactly once after the consuming event.
func TestClientConnReserveAndConsume(t *testing.T) {
	for _, test := range []struct {
		name     string
		consume  func(t *testing.T, cc *http.ClientConn, mode testMode)
		handler  func(w http.ResponseWriter, req *http.Request, donec chan struct{})
		h1Closed bool
	}{{
		// Explicit release.
		name: "release",
		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
			cc.Release()
		},
	}, {
		// Invalid request sent to RoundTrip.
		name: "invalid field name",
		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
			// The header key contains spaces, which makes the request invalid.
			req.Header["invalid field name"] = []string{"x"}
			_, err := cc.RoundTrip(req)
			if err == nil {
				t.Fatalf("RoundTrip succeeded, want failure")
			}
		},
	}, {
		// Successful request/response cycle.
		name: "body close",
		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
			resp, err := cc.RoundTrip(req)
			if err != nil {
				t.Fatalf("RoundTrip: %v", err)
			}
			resp.Body.Close()
		},
	}, {
		// Request context canceled before headers received.
		name: "cancel",
		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
			ctx, cancel := context.WithCancel(t.Context())
			// RoundTrip runs in its own goroutine because the handler
			// blocks on donec and never sends response headers.
			go func() {
				req, _ := http.NewRequestWithContext(ctx, "GET", mode.Scheme()+"://example.tld/", nil)
				_, err := cc.RoundTrip(req)
				if err == nil {
					t.Errorf("RoundTrip succeeded, want failure")
				}
			}()
			// Wait for the request goroutine to block before canceling.
			synctest.Wait()
			cancel()
		},
		handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
			<-donec
		},
		// An HTTP/1 connection is closed after a request is canceled on it.
		h1Closed: true,
	}, {
		// Response body closed before full response received.
		name: "early body close",
		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
			resp, err := cc.RoundTrip(req)
			if err != nil {
				t.Fatalf("RoundTrip: %v", err)
			}
			// Log the concrete type of the response body.
			t.Logf("%T", resp.Body)
			resp.Body.Close()
		},
		handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
			// Send the response headers, then hold the handler open so
			// the response is still incomplete when the client closes
			// the body.
			w.WriteHeader(200)
			http.NewResponseController(w).Flush()
			<-donec
		},
		// An HTTP/1 connection is closed when a response body is closed
		// before the full response has been received.
		h1Closed: true,
	}} {
		t.Run(test.name, func(t *testing.T) {
			runSynctest(t, func(t *testing.T, mode testMode) {
				// Closing donec on return unblocks any handler still
				// waiting on it.
				donec := make(chan struct{})
				defer close(donec)
				handler := func(w http.ResponseWriter, req *http.Request) {
					if test.handler != nil {
						test.handler(w, req, donec)
					}
				}

				_, cc := newClientConnTest(t, mode, handler, optFakeNet)
				// Count state hook invocations; the counter is read only
				// after synctest.Wait calls below.
				stateHookCalls := 0
				cc.SetStateHook(func(cc *http.ClientConn) {
					stateHookCalls++
				})
				synctest.Wait()
				stateHookCalls = 0 // ignore any initial update call

				// Remember the starting availability so it can be compared
				// after the reservation is consumed.
				avail := cc.Available()
				if err := cc.Reserve(); err != nil {
					t.Fatalf("cc.Reserve() = %v, want nil", err)
				}
				synctest.Wait()
				// Taking a reservation is not expected to trigger the hook.
				if got, want := stateHookCalls, 0; got != want {
					t.Errorf("connection state hook calls: %v, want %v", got, want)
				}

				test.consume(t, cc, mode)
				synctest.Wait()

				// State hook should be called, either to report the
				// connection availability increasing or the connection closing.
				if got, want := stateHookCalls, 1; got != want {
					t.Errorf("connection state hook calls: %v, want %v", got, want)
				}

				if test.h1Closed && (mode == http1Mode || mode == https1Mode) {
					// Closed connection: nothing available, nothing in
					// flight, and an error reported.
					if got, want := cc.Available(), 0; got != want {
						t.Errorf("cc.Available() = %v, want %v", got, want)
					}
					if got, want := cc.InFlight(), 0; got != want {
						t.Errorf("cc.InFlight() = %v, want %v", got, want)
					}
					if err := cc.Err(); err == nil {
						t.Errorf("cc.Err() = nil, want closed connection")
					}
				} else {
					// Healthy connection: availability is back to its
					// starting value and no error is reported.
					if got, want := cc.Available(), avail; got != want {
						t.Errorf("cc.Available() = %v, want %v", got, want)
					}
					if got, want := cc.InFlight(), 0; got != want {
						t.Errorf("cc.InFlight() = %v, want %v", got, want)
					}
					if err := cc.Err(); err != nil {
						t.Errorf("cc.Err() = %v, want nil", err)
					}
				}

				// If the connection still has capacity, a new reservation
				// must succeed.
				if cc.Available() > 0 {
					if err := cc.Reserve(); err != nil {
						t.Errorf("cc.Reserve() = %v, want success", err)
					}
				}
			})
		})
	}

}
   326  
   327  // TestClientConnRoundTripBlocks verifies that RoundTrip blocks until a concurrency
   328  // slot is available on a connection.
   329  func TestClientConnRoundTripBlocks(t *testing.T) { runSynctest(t, testClientConnRoundTripBlocks) }
   330  func testClientConnRoundTripBlocks(t *testing.T, mode testMode) {
   331  	var handlerCalls atomic.Int64
   332  	requestc := make(chan struct{})
   333  	handler := func(w http.ResponseWriter, req *http.Request) {
   334  		handlerCalls.Add(1)
   335  		<-requestc
   336  	}
   337  	_, cc := newClientConnTest(t, mode, handler, optFakeNet, func(s *http.Server) {
   338  		s.HTTP2 = &http.HTTP2Config{
   339  			MaxConcurrentStreams: 3,
   340  		}
   341  	})
   342  
   343  	available := cc.Available()
   344  	var responses atomic.Int64
   345  	const extra = 2
   346  	for range available + extra {
   347  		go func() {
   348  			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
   349  			resp, err := cc.RoundTrip(req)
   350  			responses.Add(1)
   351  			if err != nil {
   352  				t.Errorf("RoundTrip: %v", err)
   353  				return
   354  			}
   355  			resp.Body.Close()
   356  		}()
   357  	}
   358  
   359  	synctest.Wait()
   360  	if got, want := int(handlerCalls.Load()), available; got != want {
   361  		t.Errorf("got %v handler calls, want %v", got, want)
   362  	}
   363  	if got, want := int(responses.Load()), 0; got != want {
   364  		t.Errorf("got %v responses, want %v", got, want)
   365  	}
   366  
   367  	for i := range available + extra {
   368  		requestc <- struct{}{}
   369  		synctest.Wait()
   370  		if got, want := int(responses.Load()), i+1; got != want {
   371  			t.Errorf("got %v responses, want %v", got, want)
   372  		}
   373  	}
   374  }
   375  

View as plain text