Source file
src/net/http/clientconn.go
1
2
3
4
5 package http
6
7 import (
8 "context"
9 "errors"
10 "fmt"
11 "net"
12 "net/http/httptrace"
13 "net/url"
14 "sync"
15 )
16
17
18
19
20
21 type ClientConn struct {
22 cc genericClientConn
23
24 stateHookMu sync.Mutex
25 userStateHook func(*ClientConn)
26 stateHookRunning bool
27 lastAvailable int
28 lastInFlight int
29 lastClosed bool
30 }
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 type newClientConner interface {
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 NewClientConn(nc net.Conn, internalStateHook func()) (RoundTripper, error)
77 }
78
79
80
81
82
83 type genericClientConn interface {
84 Close() error
85 Err() error
86 RoundTrip(req *Request) (*Response, error)
87 Reserve() error
88 Release()
89 Available() int
90 InFlight() int
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 func (t *Transport) NewClientConn(ctx context.Context, scheme, address string) (*ClientConn, error) {
114 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
115
116 switch scheme {
117 case "http", "https":
118 default:
119 return nil, fmt.Errorf("net/http: invalid scheme %q", scheme)
120 }
121
122 host, port, err := net.SplitHostPort(address)
123 if err != nil {
124 return nil, err
125 }
126 if port == "" {
127 port = schemePort(scheme)
128 }
129
130 var proxyURL *url.URL
131 if t.Proxy != nil {
132
133 req := &Request{
134 ctx: ctx,
135 Method: "GET",
136 URL: &url.URL{
137 Scheme: scheme,
138 Host: host,
139 Path: "/",
140 },
141 Proto: "HTTP/1.1",
142 ProtoMajor: 1,
143 ProtoMinor: 1,
144 Header: make(Header),
145 Body: NoBody,
146 Host: host,
147 }
148 var err error
149 proxyURL, err = t.Proxy(req)
150 if err != nil {
151 return nil, err
152 }
153 }
154
155 cm := connectMethod{
156 targetScheme: scheme,
157 targetAddr: net.JoinHostPort(host, port),
158 proxyURL: proxyURL,
159 }
160
161
162
163
164
165
166
167
168
169 cc := &ClientConn{}
170 const isClientConn = true
171 pconn, err := t.dialConn(ctx, cm, isClientConn, cc.maybeRunStateHook)
172 if err != nil {
173 return nil, err
174 }
175
176
177
178
179 cc.stateHookMu.Lock()
180 defer cc.stateHookMu.Unlock()
181 if pconn.alt != nil {
182
183
184 gc, ok := pconn.alt.(genericClientConn)
185 if !ok {
186 return nil, errors.New("http: NewClientConn returned something that is not a ClientConn")
187 }
188 cc.cc = gc
189 cc.lastAvailable = gc.Available()
190 } else {
191
192 pconn.availch = make(chan struct{}, 1)
193 pconn.availch <- struct{}{}
194 cc.cc = http1ClientConn{pconn}
195 cc.lastAvailable = 1
196 }
197 return cc, nil
198 }
199
200
201
202 func (cc *ClientConn) Close() error {
203 defer cc.maybeRunStateHook()
204 return cc.cc.Close()
205 }
206
207
208
209
210 func (cc *ClientConn) Err() error {
211 return cc.cc.Err()
212 }
213
214 func validateClientConnRequest(req *Request) error {
215 if req.URL == nil {
216 return errors.New("http: nil Request.URL")
217 }
218 if req.Header == nil {
219 return errors.New("http: nil Request.Header")
220 }
221
222 if err := validateHeaders(req.Header); err != "" {
223 return fmt.Errorf("http: invalid header %s", err)
224 }
225
226 if err := validateHeaders(req.Trailer); err != "" {
227 return fmt.Errorf("http: invalid trailer %s", err)
228 }
229 if req.Method != "" && !validMethod(req.Method) {
230 return fmt.Errorf("http: invalid method %q", req.Method)
231 }
232 if req.URL.Host == "" {
233 return errors.New("http: no Host in request URL")
234 }
235 return nil
236 }
237
238
239
240
241
242
243
244
245
246 func (cc *ClientConn) RoundTrip(req *Request) (*Response, error) {
247 defer cc.maybeRunStateHook()
248 if err := validateClientConnRequest(req); err != nil {
249 cc.Release()
250 return nil, err
251 }
252 return cc.cc.RoundTrip(req)
253 }
254
255
256
257
258 func (cc *ClientConn) Available() int {
259 return cc.cc.Available()
260 }
261
262
263
264
265 func (cc *ClientConn) InFlight() int {
266 return cc.cc.InFlight()
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282 func (cc *ClientConn) Reserve() error {
283 defer cc.maybeRunStateHook()
284 return cc.cc.Reserve()
285 }
286
287
288
289 func (cc *ClientConn) Release() {
290 defer cc.maybeRunStateHook()
291 cc.cc.Release()
292 }
293
294
295
296 func (cc *ClientConn) shouldRunStateHook(stopRunning bool) func(*ClientConn) {
297 cc.stateHookMu.Lock()
298 defer cc.stateHookMu.Unlock()
299 if cc.cc == nil {
300 return nil
301 }
302 if stopRunning {
303 cc.stateHookRunning = false
304 }
305 if cc.userStateHook == nil {
306 return nil
307 }
308 if cc.stateHookRunning {
309 return nil
310 }
311 var (
312 available = cc.Available()
313 inFlight = cc.InFlight()
314 closed = cc.Err() != nil
315 )
316 var hook func(*ClientConn)
317 if available > cc.lastAvailable || inFlight < cc.lastInFlight || closed != cc.lastClosed {
318 hook = cc.userStateHook
319 cc.stateHookRunning = true
320 }
321 cc.lastAvailable = available
322 cc.lastInFlight = inFlight
323 cc.lastClosed = closed
324 return hook
325 }
326
327 func (cc *ClientConn) maybeRunStateHook() {
328 hook := cc.shouldRunStateHook(false)
329 if hook == nil {
330 return
331 }
332
333
334
335
336
337 hook(cc)
338
339
340
341
342
343 hook = cc.shouldRunStateHook(true)
344 if hook != nil {
345 go func() {
346 for hook != nil {
347 hook(cc)
348 hook = cc.shouldRunStateHook(true)
349 }
350 }()
351 }
352 }
353
354
355
356
357
358
359
360
361
362
363
364
365
366 func (cc *ClientConn) SetStateHook(f func(*ClientConn)) {
367 cc.stateHookMu.Lock()
368 cc.userStateHook = f
369 cc.stateHookMu.Unlock()
370 cc.maybeRunStateHook()
371 }
372
373
374
375 type http1ClientConn struct {
376 pconn *persistConn
377 }
378
379 func (cc http1ClientConn) RoundTrip(req *Request) (*Response, error) {
380 ctx := req.Context()
381 trace := httptrace.ContextClientTrace(ctx)
382
383
384 ctx, cancel := context.WithCancelCause(req.Context())
385 if req.Cancel != nil {
386 go awaitLegacyCancel(ctx, cancel, req)
387 }
388
389 treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
390 resp, err := cc.pconn.roundTrip(treq)
391 if err != nil {
392 return nil, err
393 }
394 resp.Request = req
395 return resp, nil
396 }
397
398 func (cc http1ClientConn) Close() error {
399 cc.pconn.close(errors.New("ClientConn closed"))
400 return nil
401 }
402
403 func (cc http1ClientConn) Err() error {
404 select {
405 case <-cc.pconn.closech:
406 return cc.pconn.closed
407 default:
408 return nil
409 }
410 }
411
412 func (cc http1ClientConn) Available() int {
413 cc.pconn.mu.Lock()
414 defer cc.pconn.mu.Unlock()
415 if cc.pconn.closed != nil || cc.pconn.reserved || cc.pconn.inFlight {
416 return 0
417 }
418 return 1
419 }
420
421 func (cc http1ClientConn) InFlight() int {
422 cc.pconn.mu.Lock()
423 defer cc.pconn.mu.Unlock()
424 if cc.pconn.closed == nil && (cc.pconn.reserved || cc.pconn.inFlight) {
425 return 1
426 }
427 return 0
428 }
429
430 func (cc http1ClientConn) Reserve() error {
431 cc.pconn.mu.Lock()
432 defer cc.pconn.mu.Unlock()
433 if cc.pconn.closed != nil {
434 return cc.pconn.closed
435 }
436 select {
437 case <-cc.pconn.availch:
438 default:
439 return errors.New("connection is unavailable")
440 }
441 cc.pconn.reserved = true
442 return nil
443 }
444
445 func (cc http1ClientConn) Release() {
446 cc.pconn.mu.Lock()
447 defer cc.pconn.mu.Unlock()
448 if cc.pconn.reserved {
449 select {
450 case cc.pconn.availch <- struct{}{}:
451 default:
452 panic("cannot release reservation")
453 }
454 cc.pconn.reserved = false
455 }
456 }
457
View as plain text