256
257
sc := &serverConn{
258
hs: opts.baseConfig(),
261
remoteAddrStr: c.RemoteAddr().String(),
262
bw: newBufferedWriter(c),
263
handler: opts.handler(),
264
streams: make(map[uint32]*stream),
265
readFrameCh: make(chan readFrameResult),
266
wantWriteFrameCh: make(chan frameWriteMsg, 8),
267
wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
268
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
269
doneServing: make(chan struct{}),
270
advMaxStreams: s.maxConcurrentStreams(),
271
writeSched: writeScheduler{
272
maxFrameSize: initialMaxFrameSize,
259
hs: opts.baseConfig(),
262
remoteAddrStr: c.RemoteAddr().String(),
263
bw: newBufferedWriter(c),
264
handler: opts.handler(),
265
streams: make(map[uint32]*stream),
266
readFrameCh: make(chan readFrameResult),
267
wantWriteFrameCh: make(chan FrameWriteRequest, 8),
268
wantStartPushCh: make(chan startPushRequest, 8),
269
wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
270
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
271
doneServing: make(chan struct{}),
272
clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
273
advMaxStreams: s.maxConcurrentStreams(),
274
274
initialWindowSize: initialWindowSize,
275
maxFrameSize: initialMaxFrameSize,
275
276
headerTableSize: initialHeaderTableSize,
276
277
serveG: newGoroutineLock(),
277
278
pushEnabled: true,
281
// The net/http package sets the write deadline from the
282
// http.Server.WriteTimeout during the TLS handshake, but then
283
// passes the connection off to us with the deadline already
284
// set. Disarm it here so that it is not applied to additional
285
// streams opened on this connection.
286
// TODO: implement WriteTimeout fully. See Issue 18437.
287
if sc.hs.WriteTimeout != 0 {
288
sc.conn.SetWriteDeadline(time.Time{})
291
if s.NewWriteScheduler != nil {
292
sc.writeSched = s.NewWriteScheduler()
294
sc.writeSched = NewRandomWriteScheduler()
280
297
sc.flow.add(initialWindowSize)
281
298
sc.inflow.add(initialWindowSize)
282
299
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
356
373
handler http.Handler
357
374
baseCtx contextContext
359
doneServing chan struct{} // closed when serverConn.serve ends
360
readFrameCh chan readFrameResult // written by serverConn.readFrames
361
wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
362
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
363
bodyReadCh chan bodyReadMsg // from handlers -> serve
364
testHookCh chan func(int) // code to run on the serve loop
365
flow flow // conn-wide (not stream-specific) outbound flow control
366
inflow flow // conn-wide inbound flow control
367
tlsState *tls.ConnectionState // shared by all handlers, like net/http
376
doneServing chan struct{} // closed when serverConn.serve ends
377
readFrameCh chan readFrameResult // written by serverConn.readFrames
378
wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
379
wantStartPushCh chan startPushRequest // from handlers -> serve
380
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
381
bodyReadCh chan bodyReadMsg // from handlers -> serve
382
testHookCh chan func(int) // code to run on the serve loop
383
flow flow // conn-wide (not stream-specific) outbound flow control
384
inflow flow // conn-wide inbound flow control
385
tlsState *tls.ConnectionState // shared by all handlers, like net/http
368
386
remoteAddrStr string
387
writeSched WriteScheduler
370
389
// Everything following is owned by the serve loop; use serveG.check():
371
390
serveG goroutineLock // used to verify funcs are on serve()
375
394
unackedSettings int // how many SETTINGS have we sent without ACKs?
376
395
clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
377
396
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised to the client
378
curOpenStreams uint32 // client's number of open streams
379
maxStreamID uint32 // max ever seen
397
curClientStreams uint32 // number of open streams initiated by the client
398
curPushedStreams uint32 // number of open streams initiated by server push
399
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
400
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
380
401
streams map[uint32]*stream
381
402
initialWindowSize int32
382
404
headerTableSize uint32
383
405
peerMaxHeaderListSize uint32 // zero means unknown (default)
384
406
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
385
writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh
407
writingFrame bool // started writing a frame (on serve goroutine or separate)
408
writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
386
409
needsFrameFlush bool // last frame write wasn't a flush
387
writeSched writeScheduler
388
inGoAway bool // we've started to or sent GOAWAY
389
needToSendGoAway bool // we need to schedule a GOAWAY frame write
410
inGoAway bool // we've started to or sent GOAWAY
411
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
412
needToSendGoAway bool // we need to schedule a GOAWAY frame write
390
413
goAwayCode ErrCode
391
414
shutdownTimerCh <-chan time.Time // nil until used
392
415
shutdownTimer *time.Timer // nil until used
393
freeRequestBodyBuf []byte // if non-nil, a free initialWindowSize buffer for getRequestBodyBuf
416
idleTimer *time.Timer // nil if unused
417
idleTimerCh <-chan time.Time // nil if unused
395
419
// Owned by the writeFrameAsync goroutine:
396
420
headerWriteBuf bytes.Buffer
823
880
// make it onto the wire
825
882
// If you're not on the serve goroutine, use writeFrameFromHandler instead.
826
func (sc *serverConn) writeFrame(wm frameWriteMsg) {
883
func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
827
884
sc.serveG.check()
886
// If true, wr will not be written and wr.done will not be signaled.
829
887
var ignoreWrite bool
889
// We are not allowed to write frames on closed streams. RFC 7540 Section
890
// 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
891
// a closed stream." Our server never sends PRIORITY, so that exception
894
// The serverConn might close an open stream while the stream's handler
895
// is still running. For example, the server might close a stream when it
896
// receives bad data from the client. If this happens, the handler might
897
// attempt to write a frame after the stream has been closed (since the
898
// handler hasn't yet been notified of the close). In this case, we simply
899
// ignore the frame. The handler will notice that the stream is closed when
900
// it waits for the frame to be written.
902
// As an exception to this rule, we allow sending RST_STREAM after close.
903
// This allows us to immediately reject new streams without tracking any
904
// state for those streams (except for the queued RST_STREAM frame). This
905
// may result in duplicate RST_STREAMs in some cases, but the client should
907
if wr.StreamID() != 0 {
908
_, isReset := wr.write.(StreamError)
909
if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
831
914
// Don't send a 100-continue response if we've already sent headers.
832
915
// See golang.org/issue/14030.
833
switch wm.write.(type) {
916
switch wr.write.(type) {
834
917
case *writeResHeaders:
835
wm.stream.wroteHeaders = true
918
wr.stream.wroteHeaders = true
836
919
case write100ContinueHeadersFrame:
837
if wm.stream.wroteHeaders {
920
if wr.stream.wroteHeaders {
921
// We do not need to notify wr.done because this frame is
922
// never written with wr.done != nil.
924
panic("wr.done != nil for write100ContinueHeadersFrame")
838
926
ignoreWrite = true
842
930
if !ignoreWrite {
843
sc.writeSched.add(wm)
931
sc.writeSched.Push(wr)
845
933
sc.scheduleFrameWrite()
848
// startFrameWrite starts a goroutine to write wm (in a separate
936
// startFrameWrite starts a goroutine to write wr (in a separate
849
937
// goroutine since that might block on the network), and updates the
850
// serve goroutine's state about the world, updated from info in wm.
851
func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
938
// serve goroutine's state about the world, updated from info in wr.
939
func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
852
940
sc.serveG.check()
853
941
if sc.writingFrame {
854
942
panic("internal error: can only be writing one frame at a time")
859
947
switch st.state {
860
948
case stateHalfClosedLocal:
861
panic("internal error: attempt to send frame on half-closed-local stream")
949
switch wr.write.(type) {
950
case StreamError, handlerPanicRST, writeWindowUpdate:
951
// RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
952
// in this state. (We never send PRIORITY from the server, so that is not checked.)
954
panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
862
956
case stateClosed:
863
if st.sentReset || st.gotReset {
865
sc.scheduleFrameWrite()
868
panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wm))
957
panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
960
if wpp, ok := wr.write.(*writePushPromise); ok {
962
wpp.promisedID, err = wpp.allocatePromisedID()
964
sc.writingFrameAsync = false
965
wr.replyToWriter(err)
872
970
sc.writingFrame = true
873
971
sc.needsFrameFlush = true
874
go sc.writeFrameAsync(wm)
972
if wr.write.staysWithinBuffer(sc.bw.Available()) {
973
sc.writingFrameAsync = false
974
err := wr.write.writeFrame(sc)
975
sc.wroteFrame(frameWriteResult{wr, err})
977
sc.writingFrameAsync = true
978
go sc.writeFrameAsync(wr)
877
982
// errHandlerPanicked is the error given to any callers blocked in a read from
916
1006
// Here we would go to stateHalfClosedLocal in
917
1007
// theory, but since our handler is done and
918
1008
// the net/http package provides no mechanism
919
// for finishing writing to a ResponseWriter
920
// while still reading data (see possible TODO
921
// at top of this file), we go into closed
922
// state here anyway, after telling the peer
923
// we're hanging up on them.
924
st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
925
errCancel := streamError(st.id, ErrCodeCancel)
926
sc.resetStream(errCancel)
1009
// for closing a ResponseWriter while still
1010
// reading data (see possible TODO at top of
1011
// this file), we go into closed state here
1012
// anyway, after telling the peer we're
1013
// hanging up on them. We'll transition to
1014
// stateClosed after the RST_STREAM frame is
1016
st.state = stateHalfClosedLocal
1017
sc.resetStream(streamError(st.id, ErrCodeCancel))
927
1018
case stateHalfClosedRemote:
928
1019
sc.closeStream(st, errHandlerComplete)
1022
switch v := wr.write.(type) {
1024
// st may be unknown if the RST_STREAM was generated to reject bad input.
1025
if st, ok := sc.streams[v.StreamID]; ok {
1026
sc.closeStream(st, v)
1028
case handlerPanicRST:
1029
sc.closeStream(wr.stream, errHandlerPanicked)
1033
// Reply (if requested) to unblock the ServeHTTP goroutine.
1034
wr.replyToWriter(res.err)
932
1036
sc.scheduleFrameWrite()
946
1050
// flush the write buffer.
947
1051
func (sc *serverConn) scheduleFrameWrite() {
948
1052
sc.serveG.check()
952
if sc.needToSendGoAway {
953
sc.needToSendGoAway = false
954
sc.startFrameWrite(frameWriteMsg{
956
maxStreamID: sc.maxStreamID,
962
if sc.needToSendSettingsAck {
963
sc.needToSendSettingsAck = false
964
sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
968
if wm, ok := sc.writeSched.take(); ok {
969
sc.startFrameWrite(wm)
973
if sc.needsFrameFlush {
974
sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
975
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1053
if sc.writingFrame || sc.inFrameScheduleLoop {
1056
sc.inFrameScheduleLoop = true
1057
for !sc.writingFrameAsync {
1058
if sc.needToSendGoAway {
1059
sc.needToSendGoAway = false
1060
sc.startFrameWrite(FrameWriteRequest{
1061
write: &writeGoAway{
1062
maxStreamID: sc.maxClientStreamID,
1063
code: sc.goAwayCode,
1068
if sc.needToSendSettingsAck {
1069
sc.needToSendSettingsAck = false
1070
sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1073
if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1074
if wr, ok := sc.writeSched.Pop(); ok {
1075
sc.startFrameWrite(wr)
1079
if sc.needsFrameFlush {
1080
sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1081
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1086
sc.inFrameScheduleLoop = false
1089
// startGracefulShutdown sends a GOAWAY with ErrCodeNo to tell the
1090
// client we're gracefully shutting down. The connection isn't closed
1091
// until all current streams are done.
1092
func (sc *serverConn) startGracefulShutdown() {
1093
sc.goAwayIn(ErrCodeNo, 0)
980
1096
func (sc *serverConn) goAway(code ErrCode) {
981
1097
sc.serveG.check()
1098
var forceCloseIn time.Duration
985
1099
if code != ErrCodeNo {
986
sc.shutDownIn(250 * time.Millisecond)
1100
forceCloseIn = 250 * time.Millisecond
988
1102
// TODO: configurable
989
sc.shutDownIn(1 * time.Second)
1103
forceCloseIn = 1 * time.Second
1105
sc.goAwayIn(code, forceCloseIn)
1108
func (sc *serverConn) goAwayIn(code ErrCode, forceCloseIn time.Duration) {
1113
if forceCloseIn != 0 {
1114
sc.shutDownIn(forceCloseIn)
991
1116
sc.inGoAway = true
992
1117
sc.needToSendGoAway = true
1282
1415
func (sc *serverConn) processData(f *DataFrame) error {
1283
1416
sc.serveG.check()
1417
if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1284
1420
data := f.Data()
1286
1422
// "If a DATA frame is received whose stream is not in "open"
1287
1423
// or "half closed (local)" state, the recipient MUST respond
1288
1424
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
1289
1425
id := f.Header().StreamID
1290
st, ok := sc.streams[id]
1291
if !ok || st.state != stateOpen || st.gotTrailerHeader {
1426
state, st := sc.state(id)
1427
if id == 0 || state == stateIdle {
1428
// Section 5.1: "Receiving any frame other than HEADERS
1429
// or PRIORITY on a stream in this state MUST be
1430
// treated as a connection error (Section 5.4.1) of
1431
// type PROTOCOL_ERROR."
1432
return ConnectionError(ErrCodeProtocol)
1434
if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1292
1435
// This includes sending a RST_STREAM if the stream is
1293
1436
// in stateHalfClosedLocal (which currently means that
1294
1437
// the http.Handler returned, so it's done reading &
1406
1576
// endpoint has opened or reserved. [...] An endpoint that
1407
1577
// receives an unexpected stream identifier MUST respond with
1408
1578
// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1409
if id <= sc.maxStreamID {
1579
if id <= sc.maxClientStreamID {
1410
1580
return ConnectionError(ErrCodeProtocol)
1414
ctx, cancelCtx := contextWithCancel(sc.baseCtx)
1420
cancelCtx: cancelCtx,
1422
if f.StreamEnded() {
1423
st.state = stateHalfClosedRemote
1427
st.flow.conn = &sc.flow // link to conn-level counter
1428
st.flow.add(sc.initialWindowSize)
1429
st.inflow.conn = &sc.inflow // link to conn-level counter
1430
st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
1433
if f.HasPriority() {
1434
adjustStreamPriority(sc.streams, st.id, f.Priority)
1437
if sc.curOpenStreams == 1 {
1438
sc.setConnState(http.StateActive)
1440
if sc.curOpenStreams > sc.advMaxStreams {
1441
// "Endpoints MUST NOT exceed the limit set by their
1442
// peer. An endpoint that receives a HEADERS frame
1443
// that causes their advertised concurrent stream
1444
// limit to be exceeded MUST treat this as a stream
1445
// error (Section 5.4.2) of type PROTOCOL_ERROR or
1582
sc.maxClientStreamID = id
1584
if sc.idleTimer != nil {
1588
// http://tools.ietf.org/html/rfc7540#section-5.1.2
1589
// [...] Endpoints MUST NOT exceed the limit set by their peer. An
1590
// endpoint that receives a HEADERS frame that causes their
1591
// advertised concurrent stream limit to be exceeded MUST treat
1592
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
1593
// or REFUSED_STREAM.
1594
if sc.curClientStreams+1 > sc.advMaxStreams {
1447
1595
if sc.unackedSettings == 0 {
1448
1596
// They should know better.
1449
return streamError(st.id, ErrCodeProtocol)
1597
return streamError(id, ErrCodeProtocol)
1451
1599
// Assume it's a network race, where they just haven't
1452
1600
// received our last SETTINGS update. But actually
1453
1601
// this can't happen yet, because we don't yet provide
1454
1602
// a way for users to adjust server parameters at
1456
return streamError(st.id, ErrCodeRefusedStream)
1604
return streamError(id, ErrCodeRefusedStream)
1607
initialState := stateOpen
1608
if f.StreamEnded() {
1609
initialState = stateHalfClosedRemote
1611
st := sc.newStream(id, 0, initialState)
1613
if f.HasPriority() {
1614
if err := checkPriority(f.StreamID, f.Priority); err != nil {
1617
sc.writeSched.AdjustStream(st.id, f.Priority)
1459
1620
rw, req, err := sc.newWriterAndRequest(st, f)
1471
1632
if f.Truncated {
1472
1633
// Their header list was too long. Send a 431 error.
1473
1634
handler = handleHeaderListTooLong
1474
} else if err := checkValidHTTP2Request(req); err != nil {
1635
} else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
1475
1636
handler = new400Handler(err)
1478
1639
// The net/http package sets the read deadline from the
1479
1640
// http.Server.ReadTimeout during the TLS handshake, but then
1480
1641
// passes the connection off to us with the deadline already
1481
// set. Disarm it here after the request headers are read, similar
1482
// to how the http1 server works.
1483
// Unlike http1, though, we never re-arm it yet.
1484
// TODO(bradfitz): figure out golang.org/issue/14204
1485
// (IdleTimeout) and how this relates. Maybe the default
1486
// IdleTimeout is ReadTimeout.
1642
// set. Disarm it here after the request headers are read,
1643
// similar to how the http1 server works. Here it's
1644
// technically more like the http1 Server's ReadHeaderTimeout
1645
// (in Go 1.8), though. That's a more sane option anyway.
1487
1646
if sc.hs.ReadTimeout != 0 {
1488
1647
sc.conn.SetReadDeadline(time.Time{})
1684
func checkPriority(streamID uint32, p PriorityParam) error {
1685
if streamID == p.StreamDep {
1686
// Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
1687
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
1688
// Section 5.3.3 says that a stream can depend on one of its dependencies,
1689
// so it's only self-dependencies that are forbidden.
1690
return streamError(streamID, ErrCodeProtocol)
1525
1695
func (sc *serverConn) processPriority(f *PriorityFrame) error {
1526
adjustStreamPriority(sc.streams, f.StreamID, f.PriorityParam)
1699
if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
1702
sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
1530
func adjustStreamPriority(streams map[uint32]*stream, streamID uint32, priority PriorityParam) {
1531
st, ok := streams[streamID]
1533
// TODO: not quite correct (this streamID might
1534
// already exist in the dep tree, but be closed), but
1535
// close enough for now.
1538
st.weight = priority.Weight
1539
parent := streams[priority.StreamDep] // might be nil
1541
// if client tries to set this stream to be the parent of itself
1542
// ignore and keep going
1546
// section 5.3.3: If a stream is made dependent on one of its
1547
// own dependencies, the formerly dependent stream is first
1548
// moved to be dependent on the reprioritized stream's previous
1549
// parent. The moved dependency retains its weight.
1550
for piter := parent; piter != nil; piter = piter.parent {
1552
parent.parent = st.parent
1557
if priority.Exclusive && (st.parent != nil || priority.StreamDep == 0) {
1558
for _, openStream := range streams {
1559
if openStream != st && openStream.parent == st.parent {
1560
openStream.parent = st
1706
func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
1709
panic("internal error: cannot create stream with id 0")
1712
ctx, cancelCtx := contextWithCancel(sc.baseCtx)
1718
cancelCtx: cancelCtx,
1721
st.flow.conn = &sc.flow // link to conn-level counter
1722
st.flow.add(sc.initialWindowSize)
1723
st.inflow.conn = &sc.inflow // link to conn-level counter
1724
st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
1727
sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
1729
sc.curPushedStreams++
1731
sc.curClientStreams++
1733
if sc.curOpenStreams() == 1 {
1734
sc.setConnState(http.StateActive)
1566
1740
func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
1567
1741
sc.serveG.check()
1569
method := f.PseudoValue("method")
1570
path := f.PseudoValue("path")
1571
scheme := f.PseudoValue("scheme")
1572
authority := f.PseudoValue("authority")
1744
method: f.PseudoValue("method"),
1745
scheme: f.PseudoValue("scheme"),
1746
authority: f.PseudoValue("authority"),
1747
path: f.PseudoValue("path"),
1574
isConnect := method == "CONNECT"
1750
isConnect := rp.method == "CONNECT"
1576
if path != "" || scheme != "" || authority == "" {
1752
if rp.path != "" || rp.scheme != "" || rp.authority == "" {
1577
1753
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1579
} else if method == "" || path == "" ||
1580
(scheme != "https" && scheme != "http") {
1755
} else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
1581
1756
// See 8.1.2.6 Malformed Requests and Responses:
1583
1758
// Malformed requests or responses that are detected
1594
1769
bodyOpen := !f.StreamEnded()
1595
if method == "HEAD" && bodyOpen {
1770
if rp.method == "HEAD" && bodyOpen {
1596
1771
// HEAD requests can't have bodies
1597
1772
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1775
rp.header = make(http.Header)
1776
for _, hf := range f.RegularFields() {
1777
rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
1779
if rp.authority == "" {
1780
rp.authority = rp.header.Get("Host")
1783
rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
1785
return nil, nil, err
1788
st.reqBuf = getRequestBodyBuf()
1789
req.Body.(*requestBody).pipe = &pipe{
1790
b: &fixedBuffer{buf: st.reqBuf},
1793
if vv, ok := rp.header["Content-Length"]; ok {
1794
req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
1796
req.ContentLength = -1
1802
type requestParam struct {
1804
scheme, authority, path string
1808
func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
1599
1811
var tlsState *tls.ConnectionState // nil if not scheme https
1601
if scheme == "https" {
1812
if rp.scheme == "https" {
1602
1813
tlsState = sc.tlsState
1605
header := make(http.Header)
1606
for _, hf := range f.RegularFields() {
1607
header.Add(sc.canonicalHeader(hf.Name), hf.Value)
1610
if authority == "" {
1611
authority = header.Get("Host")
1613
needsContinue := header.Get("Expect") == "100-continue"
1816
needsContinue := rp.header.Get("Expect") == "100-continue"
1614
1817
if needsContinue {
1615
header.Del("Expect")
1818
rp.header.Del("Expect")
1617
1820
// Merge Cookie headers into one "; "-delimited value.
1618
if cookies := header["Cookie"]; len(cookies) > 1 {
1619
header.Set("Cookie", strings.Join(cookies, "; "))
1821
if cookies := rp.header["Cookie"]; len(cookies) > 1 {
1822
rp.header.Set("Cookie", strings.Join(cookies, "; "))
1622
1825
// Setup Trailers
1623
1826
var trailer http.Header
1624
for _, v := range header["Trailer"] {
1827
for _, v := range rp.header["Trailer"] {
1625
1828
for _, key := range strings.Split(v, ",") {
1626
1829
key = http.CanonicalHeaderKey(strings.TrimSpace(key))
1639
delete(header, "Trailer")
1842
delete(rp.header, "Trailer")
1845
var requestURI string
1846
if rp.method == "CONNECT" {
1847
url_ = &url.URL{Host: rp.authority}
1848
requestURI = rp.authority // mimic HTTP/1 server behavior
1851
url_, err = url.ParseRequestURI(rp.path)
1853
return nil, nil, streamError(st.id, ErrCodeProtocol)
1855
requestURI = rp.path
1641
1858
body := &requestBody{
1644
1861
needsContinue: needsContinue,
1647
var requestURI string
1649
url_ = &url.URL{Host: authority}
1650
requestURI = authority // mimic HTTP/1 server behavior
1653
url_, err = url.ParseRequestURI(path)
1655
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1659
1863
req := &http.Request{
1662
1866
RemoteAddr: sc.remoteAddrStr,
1664
1868
RequestURI: requestURI,
1665
1869
Proto: "HTTP/2.0",
1671
1875
Trailer: trailer,
1673
1877
req = requestWithContext(req, st.ctx)
1675
// Disabled, per golang.org/issue/14960:
1676
// st.reqBuf = sc.getRequestBodyBuf()
1677
// TODO: remove this 64k of garbage per request (again, but without a data race):
1678
buf := make([]byte, initialWindowSize)
1681
b: &fixedBuffer{buf: buf},
1684
if vv, ok := header["Content-Length"]; ok {
1685
req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
1687
req.ContentLength = -1
1691
1879
rws := responseWriterStatePool.Get().(*responseWriterState)
1692
1880
bwSave := rws.bw
2220
2435
responseWriterStatePool.Put(rws)
2440
ErrRecursivePush = errors.New("http2: recursive push not allowed")
2441
ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
2444
// pushOptions is the internal version of http.PushOptions, which we
2445
// cannot include here because it's only defined in Go 1.8 and later.
2446
type pushOptions struct {
2451
func (w *responseWriter) push(target string, opts pushOptions) error {
2454
sc.serveG.checkNotOn()
2456
// No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
2457
// http://tools.ietf.org/html/rfc7540#section-6.6
2459
return ErrRecursivePush
2463
if opts.Method == "" {
2466
if opts.Header == nil {
2467
opts.Header = http.Header{}
2469
wantScheme := "http"
2470
if w.rws.req.TLS != nil {
2471
wantScheme = "https"
2474
// Validate the request.
2475
u, err := url.Parse(target)
2480
if !strings.HasPrefix(target, "/") {
2481
return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
2483
u.Scheme = wantScheme
2484
u.Host = w.rws.req.Host
2486
if u.Scheme != wantScheme {
2487
return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
2490
return errors.New("URL must have a host")
2493
for k := range opts.Header {
2494
if strings.HasPrefix(k, ":") {
2495
return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
2497
// These headers are meaningful only if the request has a body,
2498
// but PUSH_PROMISE requests cannot have a body.
2499
// http://tools.ietf.org/html/rfc7540#section-8.2
2500
// Also disallow Host, since the promised URL must be absolute.
2501
switch strings.ToLower(k) {
2502
case "content-length", "content-encoding", "trailer", "te", "expect", "host":
2503
return fmt.Errorf("promised request headers cannot include %q", k)
2506
if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
2510
// The RFC effectively limits promised requests to GET and HEAD:
2511
// "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
2512
// http://tools.ietf.org/html/rfc7540#section-8.2
2513
if opts.Method != "GET" && opts.Method != "HEAD" {
2514
return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
2517
msg := startPushRequest{
2519
method: opts.Method,
2521
header: cloneHeader(opts.Header),
2522
done: errChanPool.Get().(chan error),
2526
case <-sc.doneServing:
2527
return errClientDisconnected
2529
return errStreamClosed
2530
case sc.wantStartPushCh <- msg:
2534
case <-sc.doneServing:
2535
return errClientDisconnected
2537
return errStreamClosed
2538
case err := <-msg.done:
2539
errChanPool.Put(msg.done)
2544
type startPushRequest struct {
2552
func (sc *serverConn) startPush(msg startPushRequest) {
2555
// http://tools.ietf.org/html/rfc7540#section-6.6.
2556
// PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
2557
// is in either the "open" or "half-closed (remote)" state.
2558
if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
2559
// responseWriter.Push checks that the stream is peer-initiated.
2560
msg.done <- errStreamClosed
2564
// http://tools.ietf.org/html/rfc7540#section-6.6.
2565
if !sc.pushEnabled {
2566
msg.done <- http.ErrNotSupported
2570
// PUSH_PROMISE frames must be sent in increasing order by stream ID, so
2571
// we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
2572
// is written. Once the ID is allocated, we start the request handler.
2573
allocatePromisedID := func() (uint32, error) {
2576
// Check this again, just in case. Technically, we might have received
2577
// an updated SETTINGS by the time we got around to writing this frame.
2578
if !sc.pushEnabled {
2579
return 0, http.ErrNotSupported
2581
// http://tools.ietf.org/html/rfc7540#section-6.5.2.
2582
if sc.curPushedStreams+1 > sc.clientMaxStreams {
2583
return 0, ErrPushLimitReached
2586
// http://tools.ietf.org/html/rfc7540#section-5.1.1.
2587
// Streams initiated by the server MUST use even-numbered identifiers.
2588
// A server that is unable to establish a new stream identifier can send a GOAWAY
2589
// frame so that the client is forced to open a new connection for new streams.
2590
if sc.maxPushPromiseID+2 >= 1<<31 {
2591
sc.startGracefulShutdown()
2592
return 0, ErrPushLimitReached
2594
sc.maxPushPromiseID += 2
2595
promisedID := sc.maxPushPromiseID
2597
// http://tools.ietf.org/html/rfc7540#section-8.2.
2598
// Strictly speaking, the new stream should start in "reserved (local)", then
2599
// transition to "half closed (remote)" after sending the initial HEADERS, but
2600
// we start in "half closed (remote)" for simplicity.
2601
// See further comments at the definition of stateHalfClosedRemote.
2602
promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
2603
rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
2605
scheme: msg.url.Scheme,
2606
authority: msg.url.Host,
2607
path: msg.url.RequestURI(),
2608
header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
2611
// Should not happen, since we've already validated msg.url.
2612
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
2615
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2616
return promisedID, nil
2619
sc.writeFrame(FrameWriteRequest{
2620
write: &writePushPromise{
2621
streamID: msg.parent.id,
2625
allocatePromisedID: allocatePromisedID,
2223
2632
// foreachHeaderElement splits v according to the "#rule" construction
2224
2633
// in RFC 2616 section 2.1 and calls fn for each non-empty element.
2225
2634
func foreachHeaderElement(v string, fn func(string)) {
2303
2712
"Transfer-Encoding": true,
2304
2713
"Www-Authenticate": true,
2716
// h1ServerShutdownChan returns a channel that will be closed when the
2717
// provided *http.Server wants to shut down.
2719
// This is a somewhat hacky way to get at http1 innards. It works
2720
// when the http2 code is bundled into the net/http package in the
2721
// standard library. The alternatives ended up making the cmd/go tool
2722
// depend on http Servers. This is the lightest option for now.
2723
// This is tested via the TestServeShutdown* tests in net/http.
2724
func h1ServerShutdownChan(hs *http.Server) <-chan struct{} {
2725
if fn := testh1ServerShutdownChan; fn != nil {
2728
var x interface{} = hs
2730
getDoneChan() <-chan struct{}
2732
if hs, ok := x.(I); ok {
2733
return hs.getDoneChan()
2738
// optional test hook for h1ServerShutdownChan.
2739
var testh1ServerShutdownChan func(hs *http.Server) <-chan struct{}
2741
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
2742
// disabled. See comments on h1ServerShutdownChan above for why
2743
// the code is written this way.
2744
func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
2745
var x interface{} = hs
2749
if hs, ok := x.(I); ok {
2750
return !hs.doKeepAlives()