Merge pull request #8418 from sxllwx/fix/k8s-issue-74551
bugfix(port-forward): Correctly handle known errors
This commit is contained in:
commit
0789790f07
@ -38,6 +38,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
@ -242,8 +243,16 @@ Loop:
|
|||||||
// portForward invokes the httpStreamHandler's forwarder.PortForward
|
// portForward invokes the httpStreamHandler's forwarder.PortForward
|
||||||
// function for the given stream pair.
|
// function for the given stream pair.
|
||||||
func (h *httpStreamHandler) portForward(p *httpStreamPair) {
|
func (h *httpStreamHandler) portForward(p *httpStreamPair) {
|
||||||
defer p.dataStream.Close()
|
resetStreams := false
|
||||||
defer p.errorStream.Close()
|
defer func() {
|
||||||
|
if resetStreams {
|
||||||
|
p.dataStream.Reset()
|
||||||
|
p.errorStream.Reset()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.dataStream.Close()
|
||||||
|
p.errorStream.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
portString := p.dataStream.Headers().Get(api.PortHeader)
|
portString := p.dataStream.Headers().Get(api.PortHeader)
|
||||||
port, _ := strconv.ParseInt(portString, 10, 32)
|
port, _ := strconv.ParseInt(portString, 10, 32)
|
||||||
@ -252,11 +261,34 @@ func (h *httpStreamHandler) portForward(p *httpStreamPair) {
|
|||||||
err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream)
|
err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream)
|
||||||
klog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
|
klog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
|
||||||
|
|
||||||
if err != nil {
|
// happy path, we have successfully completed forwarding task
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
|
||||||
|
// In the process of forwarding, we encountered error types that can be handled:
|
||||||
|
//
|
||||||
|
// These two errors can occur in the following scenarios:
|
||||||
|
// ECONNRESET: the target process reset connection between CRI and itself.
|
||||||
|
// see: https://github.com/kubernetes/kubernetes/issues/111825 for detail
|
||||||
|
//
|
||||||
|
// EPIPE: the target process did not read the received data, causing the
|
||||||
|
// buffer in the kernel to be full, resulting in the occurrence of Zero Window,
|
||||||
|
// then closing the connection (FIN, RESET)
|
||||||
|
// see: https://github.com/kubernetes/kubernetes/issues/74551 for detail
|
||||||
|
//
|
||||||
|
// In both cases, we should RESET the httpStream.
|
||||||
|
klog.ErrorS(err, "forwarding port", "conn", h.conn, "request", p.requestID, "port", portString)
|
||||||
|
resetStreams = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't know how to deal with other types of errors,
|
||||||
|
// try to forward them to errStream, let our user know what happened
|
||||||
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
|
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
|
||||||
utilruntime.HandleError(msg)
|
utilruntime.HandleError(msg)
|
||||||
fmt.Fprint(p.errorStream, msg.Error())
|
fmt.Fprint(p.errorStream, msg.Error())
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// httpStreamPair represents the error and data streams for a port
|
// httpStreamPair represents the error and data streams for a port
|
||||||
|
Loading…
Reference in New Issue
Block a user