Source file
src/net/http/clientconn_test.go
1
2
3
4
5 package http_test
6
7 import (
8 "context"
9 "fmt"
10 "io"
11 "net/http"
12 "sync"
13 "sync/atomic"
14 "testing"
15 "testing/synctest"
16 )
17
18 func TestTransportNewClientConnRoundTrip(t *testing.T) { run(t, testTransportNewClientConnRoundTrip) }
19 func testTransportNewClientConnRoundTrip(t *testing.T, mode testMode) {
20 cst := newClientServerTest(t, mode, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
21 io.WriteString(w, req.Host)
22 }), optFakeNet)
23
24 scheme := mode.Scheme()
25 cc, err := cst.tr.NewClientConn(t.Context(), scheme, cst.ts.Listener.Addr().String())
26 if err != nil {
27 t.Fatal(err)
28 }
29 defer cc.Close()
30
31
32
33 for _, host := range []string{"example.tld", "go.dev"} {
34 req, _ := http.NewRequest("GET", fmt.Sprintf("%v://%v/", scheme, host), nil)
35 resp, err := cc.RoundTrip(req)
36 if err != nil {
37 t.Fatal(err)
38 }
39 got, _ := io.ReadAll(resp.Body)
40 if string(got) != host {
41 t.Errorf("got response body %q, want %v", got, host)
42 }
43 resp.Body.Close()
44
45
46 cst.tr.CloseIdleConnections()
47 }
48
49 if err := cc.Err(); err != nil {
50 t.Errorf("before close: ClientConn.Err() = %v, want nil", err)
51 }
52
53 cc.Close()
54 if err := cc.Err(); err == nil {
55 t.Errorf("after close: ClientConn.Err() = nil, want error")
56 }
57
58 req, _ := http.NewRequest("GET", scheme+"://example.tld/", nil)
59 resp, err := cc.RoundTrip(req)
60 if err == nil {
61 resp.Body.Close()
62 t.Errorf("after close: cc.RoundTrip succeeded, want error")
63 }
64 t.Log(err)
65 }
66
67 func newClientConnTest(t testing.TB, mode testMode, h http.HandlerFunc, opts ...any) (*clientServerTest, *http.ClientConn) {
68 if h == nil {
69 h = func(w http.ResponseWriter, req *http.Request) {}
70 }
71 cst := newClientServerTest(t, mode, h, opts...)
72 cc, err := cst.tr.NewClientConn(t.Context(), mode.Scheme(), cst.ts.Listener.Addr().String())
73 if err != nil {
74 t.Fatal(err)
75 }
76 t.Cleanup(func() {
77 cc.Close()
78 })
79 synctest.Wait()
80 return cst, cc
81 }
82
83
84 func TestClientConnReserveAll(t *testing.T) { runSynctest(t, testClientConnReserveAll) }
85 func testClientConnReserveAll(t *testing.T, mode testMode) {
86 cst, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
87 s.HTTP2 = &http.HTTP2Config{
88 MaxConcurrentStreams: 3,
89 }
90 })
91
92 want := 1
93 switch mode {
94 case http2Mode, http2UnencryptedMode:
95 want = cst.ts.Config.HTTP2.MaxConcurrentStreams
96 }
97 available := cc.Available()
98 if available != want {
99 t.Fatalf("cc.Available() = %v, want %v", available, want)
100 }
101
102
103 for i := range available {
104 if err := cc.Reserve(); err != nil {
105 t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
106 }
107 if got, want := cc.Available(), available-i-1; got != want {
108 t.Fatalf("cc.Available() = %v, want %v", got, want)
109 }
110 if got, want := cc.InFlight(), i+1; got != want {
111 t.Fatalf("cc.InFlight() = %v, want %v", got, want)
112 }
113 }
114
115
116 if err := cc.Reserve(); err == nil {
117 t.Fatalf("cc.Reserve() = nil, want error")
118 }
119 }
120
121
122
123 func TestClientConnReserveParallel(t *testing.T) { runSynctest(t, testClientConnReserveParallel) }
124 func testClientConnReserveParallel(t *testing.T, mode testMode) {
125 _, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
126 s.HTTP2 = &http.HTTP2Config{
127 MaxConcurrentStreams: 3,
128 }
129 })
130 var (
131 wg sync.WaitGroup
132 mu sync.Mutex
133 success int
134 failure int
135 )
136 available := cc.Available()
137 const extra = 2
138 for range available + extra {
139 wg.Go(func() {
140 err := cc.Reserve()
141 mu.Lock()
142 defer mu.Unlock()
143 if err == nil {
144 success++
145 } else {
146 failure++
147 }
148 })
149 }
150 wg.Wait()
151
152 if got, want := success, available; got != want {
153 t.Errorf("%v successful reservations, want %v", got, want)
154 }
155 if got, want := failure, extra; got != want {
156 t.Errorf("%v failed reservations, want %v", got, want)
157 }
158 }
159
160
161 func TestClientConnReserveRelease(t *testing.T) { runSynctest(t, testClientConnReserveRelease) }
162 func testClientConnReserveRelease(t *testing.T, mode testMode) {
163 _, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
164 s.HTTP2 = &http.HTTP2Config{
165 MaxConcurrentStreams: 3,
166 }
167 })
168
169 available := cc.Available()
170 for i := range 2 * available {
171 if err := cc.Reserve(); err != nil {
172 t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
173 }
174 cc.Release()
175 }
176
177 if got, want := cc.Available(), available; got != want {
178 t.Fatalf("cc.Available() = %v, want %v", available, want)
179 }
180 }
181
182
183
184 func TestClientConnReserveAndConsume(t *testing.T) {
185 for _, test := range []struct {
186 name string
187 consume func(t *testing.T, cc *http.ClientConn, mode testMode)
188 handler func(w http.ResponseWriter, req *http.Request, donec chan struct{})
189 h1Closed bool
190 }{{
191
192 name: "release",
193 consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
194 cc.Release()
195 },
196 }, {
197
198 name: "invalid field name",
199 consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
200 req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
201 req.Header["invalid field name"] = []string{"x"}
202 _, err := cc.RoundTrip(req)
203 if err == nil {
204 t.Fatalf("RoundTrip succeeded, want failure")
205 }
206 },
207 }, {
208
209 name: "body close",
210 consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
211 req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
212 resp, err := cc.RoundTrip(req)
213 if err != nil {
214 t.Fatalf("RoundTrip: %v", err)
215 }
216 resp.Body.Close()
217 },
218 }, {
219
220 name: "cancel",
221 consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
222 ctx, cancel := context.WithCancel(t.Context())
223 go func() {
224 req, _ := http.NewRequestWithContext(ctx, "GET", mode.Scheme()+"://example.tld/", nil)
225 _, err := cc.RoundTrip(req)
226 if err == nil {
227 t.Errorf("RoundTrip succeeded, want failure")
228 }
229 }()
230 synctest.Wait()
231 cancel()
232 },
233 handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
234 <-donec
235 },
236
237 h1Closed: true,
238 }, {
239
240 name: "early body close",
241 consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
242 req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
243 resp, err := cc.RoundTrip(req)
244 if err != nil {
245 t.Fatalf("RoundTrip: %v", err)
246 }
247 t.Logf("%T", resp.Body)
248 resp.Body.Close()
249 },
250 handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
251 w.WriteHeader(200)
252 http.NewResponseController(w).Flush()
253 <-donec
254 },
255
256 h1Closed: true,
257 }} {
258 t.Run(test.name, func(t *testing.T) {
259 runSynctest(t, func(t *testing.T, mode testMode) {
260 donec := make(chan struct{})
261 defer close(donec)
262 handler := func(w http.ResponseWriter, req *http.Request) {
263 if test.handler != nil {
264 test.handler(w, req, donec)
265 }
266 }
267
268 _, cc := newClientConnTest(t, mode, handler, optFakeNet)
269 stateHookCalls := 0
270 cc.SetStateHook(func(cc *http.ClientConn) {
271 stateHookCalls++
272 })
273 synctest.Wait()
274 stateHookCalls = 0
275
276 avail := cc.Available()
277 if err := cc.Reserve(); err != nil {
278 t.Fatalf("cc.Reserve() = %v, want nil", err)
279 }
280 synctest.Wait()
281 if got, want := stateHookCalls, 0; got != want {
282 t.Errorf("connection state hook calls: %v, want %v", got, want)
283 }
284
285 test.consume(t, cc, mode)
286 synctest.Wait()
287
288
289
290 if got, want := stateHookCalls, 1; got != want {
291 t.Errorf("connection state hook calls: %v, want %v", got, want)
292 }
293
294 if test.h1Closed && (mode == http1Mode || mode == https1Mode) {
295 if got, want := cc.Available(), 0; got != want {
296 t.Errorf("cc.Available() = %v, want %v", got, want)
297 }
298 if got, want := cc.InFlight(), 0; got != want {
299 t.Errorf("cc.InFlight() = %v, want %v", got, want)
300 }
301 if err := cc.Err(); err == nil {
302 t.Errorf("cc.Err() = nil, want closed connection")
303 }
304 } else {
305 if got, want := cc.Available(), avail; got != want {
306 t.Errorf("cc.Available() = %v, want %v", got, want)
307 }
308 if got, want := cc.InFlight(), 0; got != want {
309 t.Errorf("cc.InFlight() = %v, want %v", got, want)
310 }
311 if err := cc.Err(); err != nil {
312 t.Errorf("cc.Err() = %v, want nil", err)
313 }
314 }
315
316 if cc.Available() > 0 {
317 if err := cc.Reserve(); err != nil {
318 t.Errorf("cc.Reserve() = %v, want success", err)
319 }
320 }
321 })
322 })
323 }
324
325 }
326
327
328
329 func TestClientConnRoundTripBlocks(t *testing.T) { runSynctest(t, testClientConnRoundTripBlocks) }
330 func testClientConnRoundTripBlocks(t *testing.T, mode testMode) {
331 var handlerCalls atomic.Int64
332 requestc := make(chan struct{})
333 handler := func(w http.ResponseWriter, req *http.Request) {
334 handlerCalls.Add(1)
335 <-requestc
336 }
337 _, cc := newClientConnTest(t, mode, handler, optFakeNet, func(s *http.Server) {
338 s.HTTP2 = &http.HTTP2Config{
339 MaxConcurrentStreams: 3,
340 }
341 })
342
343 available := cc.Available()
344 var responses atomic.Int64
345 const extra = 2
346 for range available + extra {
347 go func() {
348 req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
349 resp, err := cc.RoundTrip(req)
350 responses.Add(1)
351 if err != nil {
352 t.Errorf("RoundTrip: %v", err)
353 return
354 }
355 resp.Body.Close()
356 }()
357 }
358
359 synctest.Wait()
360 if got, want := int(handlerCalls.Load()), available; got != want {
361 t.Errorf("got %v handler calls, want %v", got, want)
362 }
363 if got, want := int(responses.Load()), 0; got != want {
364 t.Errorf("got %v responses, want %v", got, want)
365 }
366
367 for i := range available + extra {
368 requestc <- struct{}{}
369 synctest.Wait()
370 if got, want := int(responses.Load()), i+1; got != want {
371 t.Errorf("got %v responses, want %v", got, want)
372 }
373 }
374 }
375
View as plain text