diff --git a/pkg/cri/streaming/portforward/httpstream.go b/pkg/cri/streaming/portforward/httpstream.go index 685cd8b5d..e1570380b 100644 --- a/pkg/cri/streaming/portforward/httpstream.go +++ b/pkg/cri/streaming/portforward/httpstream.go @@ -38,6 +38,7 @@ import ( "net/http" "strconv" "sync" + "syscall" "time" api "k8s.io/api/core/v1" @@ -242,8 +243,16 @@ Loop: // portForward invokes the httpStreamHandler's forwarder.PortForward // function for the given stream pair. func (h *httpStreamHandler) portForward(p *httpStreamPair) { - defer p.dataStream.Close() - defer p.errorStream.Close() + resetStreams := false + defer func() { + if resetStreams { + p.dataStream.Reset() + p.errorStream.Reset() + return + } + p.dataStream.Close() + p.errorStream.Close() + }() portString := p.dataStream.Headers().Get(api.PortHeader) port, _ := strconv.ParseInt(portString, 10, 32) @@ -252,11 +261,34 @@ func (h *httpStreamHandler) portForward(p *httpStreamPair) { err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream) klog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) - if err != nil { - msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err) - utilruntime.HandleError(msg) - fmt.Fprint(p.errorStream, msg.Error()) + // happy path, we have successfully completed forwarding task + if err == nil { + return } + + if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) { + // In the process of forwarding, we encountered error types that can be handled: + // + // These two errors can occur in the following scenarios: + // ECONNRESET: the target process reset connection between CRI and itself. + // see: https://github.com/kubernetes/kubernetes/issues/111825 for detail + // + // EPIPE: the target process did not read the received data, causing the + // buffer in the kernel to be full, resulting in the occurrence of Zero Window, + // then closing the connection (FIN, RESET) + // see: https://github.com/kubernetes/kubernetes/issues/74551 for detail + // + // In both cases, we should RESET the httpStream. + klog.ErrorS(err, "forwarding port", "conn", h.conn, "request", p.requestID, "port", portString) + resetStreams = true + return + } + + // We don't know how to deal with other types of errors, + // try to forward them to errStream, let our user know what happened + msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err) + utilruntime.HandleError(msg) + fmt.Fprint(p.errorStream, msg.Error()) } // httpStreamPair represents the error and data streams for a port