From 122439ffed46c033571440074c558bbe7db7e238 Mon Sep 17 00:00:00 2001 From: maoziqiang Date: Wed, 23 Jul 2025 18:41:30 +0800 Subject: [PATCH] mzq-test --- cmd/ctr/commands/shim/shim.go | 1 + core/runtime/v2/binary.go | 1 + core/runtime/v2/shim.go | 13 +++++++++++++ core/runtime/v2/shim_manager.go | 17 +++++++++++++++-- pkg/shim/publisher.go | 3 ++- pkg/shim/shim.go | 14 ++++++++++++++ vendor/github.com/containerd/ttrpc/client.go | 13 ++++++++++++- .../pkg/util/httpstream/wsstream/conn.go | 7 ++----- .../pkg/cri/streaming/portforward/websocket.go | 3 ++- .../pkg/cri/streaming/remotecommand/attach.go | 1 + .../pkg/cri/streaming/remotecommand/exec.go | 1 + .../cri/streaming/remotecommand/websocket.go | 7 +++++-- .../k8s.io/kubelet/pkg/cri/streaming/server.go | 2 +- 13 files changed, 70 insertions(+), 13 deletions(-) diff --git a/cmd/ctr/commands/shim/shim.go b/cmd/ctr/commands/shim/shim.go index ab869a110..ca3e2cbe6 100644 --- a/cmd/ctr/commands/shim/shim.go +++ b/cmd/ctr/commands/shim/shim.go @@ -302,6 +302,7 @@ func getTTRPCClient(cliContext *cli.Context) (*ttrpc.Client, error) { s2, _ := shim.SocketAddress(ctx, cliContext.String("address"), id, false) s2 = strings.TrimPrefix(s2, "unix://") + log.L.Infof("mzq-test, shim.go getTaskService, id: %v, ns: %v, s1: %v, s2: %v\n", id, ns, s1, s2) for _, socket := range []string{s2, "\x00" + s1} { conn, err := net.Dial("unix", socket) if err == nil { diff --git a/core/runtime/v2/binary.go b/core/runtime/v2/binary.go index c77fb494b..28499dc55 100644 --- a/core/runtime/v2/binary.go +++ b/core/runtime/v2/binary.go @@ -138,6 +138,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ if err != nil { return nil, err } + log.G(ctx).Infof("mzq-test, binary.go Start, ns: %v, cmd: %+v, response: %v, opts: %+v, conn: %+v, params: %v\n", ns, cmd, string(response), opts, conn, params) // Save bootstrap configuration (so containerd can restore shims after restart). if err := writeBootstrapParams(filepath.Join(b.bundle.Path, "bootstrap.json"), params); err != nil { diff --git a/core/runtime/v2/shim.go b/core/runtime/v2/shim.go index db7307bc0..b04b8bbff 100644 --- a/core/runtime/v2/shim.go +++ b/core/runtime/v2/shim.go @@ -111,6 +111,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan if err != nil { return nil, fmt.Errorf("unable to make connection: %w", err) } + a := log.Fields{ + "mzq-test": true, + "conn": fmt.Sprintf("%+v", conn), + "params": fmt.Sprintf("%+v", params), + "bundle": fmt.Sprintf("%+v", bundle), + } + log.G(ctx).WithFields(a).Info("shim.go loadShim") defer func() { if retErr != nil { @@ -277,6 +284,12 @@ func makeConnection(ctx context.Context, id string, params client.BootstrapParam } }() + a := log.Fields{ + "mzq-test": true, + "conn": fmt.Sprintf("%+v", conn), + "params": fmt.Sprintf("%+v", params), + } + log.G(ctx).WithFields(a).Info("shim.go makeConnection makeConnection") return ttrpc.NewClient( conn, ttrpc.WithOnClose(onClose), diff --git a/core/runtime/v2/shim_manager.go b/core/runtime/v2/shim_manager.go index 2fef2f3ac..fca4a41cb 100644 --- a/core/runtime/v2/shim_manager.go +++ b/core/runtime/v2/shim_manager.go @@ -207,6 +207,14 @@ func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts if err != nil { return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err) } + a := log.Fields { + "mzq-test": true, + "id": id, + "opts": fmt.Sprintf("%+v", opts), + "process": fmt.Sprintf("%+v", process), + "address": fmt.Sprintf("%+v", address), + } + log.G(ctx).WithFields(a).Info("manager.go Start") if err := m.shims.Add(ctx, shim); err != nil { return nil, err @@ -255,8 +263,13 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, ttrpcAddress: m.containerdTTRPCAddress, env: m.env, }) - shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() { - log.G(ctx).WithField("id", id).Info("shim disconnected") + shim, err := b.Start(ctx, protobuf.FromAny(topts), func() { + a := log.Fields { + "mzq-test": true, + "id": id, + "b": fmt.Sprintf("%+v", b), + } + log.G(ctx).WithFields(a).Info("shim disconnected") cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b) // Remove self from the runtime task list. Even though the cleanupAfterDeadShim() diff --git a/pkg/shim/publisher.go b/pkg/shim/publisher.go index 269f77193..85757a963 100644 --- a/pkg/shim/publisher.go +++ b/pkg/shim/publisher.go @@ -56,7 +56,8 @@ func WithPublishTTRPCOpts(opts ...ttrpc.ClientOpts) PublisherOpts { } // NewPublisher creates a new remote events publisher -func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) { +func NewPublisher(address string) (*RemoteEventsPublisher, error) { + log.L.Infof("mzq-test, publisher.go NewPublisher, address: %v\n", address) client, err := ttrpcutil.NewClient(address) if err != nil { return nil, err diff --git a/pkg/shim/shim.go b/pkg/shim/shim.go index cc8da6f76..8251587d0 100644 --- a/pkg/shim/shim.go +++ b/pkg/shim/shim.go @@ -247,6 +247,20 @@ func run(ctx context.Context, manager Manager, config Config) error { } ttrpcAddress := os.Getenv(ttrpcAddressEnv) + a := log.Fields{ + "mzq-test": true, + "id": id, + "name": name, + "config": fmt.Sprintf("%+v", config), + "signals": fmt.Sprintf("%+v", signals), + "ttrpcAddress": ttrpcAddress, + } + log.G(ctx).WithFields(a).Info("shim.go run") + publisher, err := NewPublisher(ttrpcAddress) + if err != nil { + return err + } + defer publisher.Close() ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index b1bc7a3fc..4eace5782 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -109,6 +109,12 @@ func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker // NewClient creates a new ttrpc client using the given connection func NewClient(conn net.Conn, opts ...ClientOpts) *Client { + a := logrus.Fields{ + "mzq-test": true, + "conn": fmt.Sprintf("%+v", conn), + "opts": fmt.Sprintf("%+v", opts), + } + logrus.WithFields(a).Info("client.go NewClient") ctx, cancel := context.WithCancel(context.Background()) channel := newChannel(conn) c := &Client{ @@ -368,7 +374,12 @@ func (c *Client) receiveLoop() error { sid := streamID(msg.header.StreamID) s := c.getStream(sid) if s == nil { - log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream") + a := logrus.Fields{ + "mzq-test": true, + "stream": sid, + "c": fmt.Sprintf("%+v", c), + } + logrus.WithFields(a).Errorf("ttrpc: received message on inactive stream") continue } diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go index 2e477fee2..120cf04e0 100644 --- a/vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go @@ -215,10 +215,7 @@ func (conn *Conn) SetWriteDeadline(duration time.Duration) { // Open the connection and create channels for reading and writing. It returns // the selected subprotocol, a slice of channels and an error. func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) { - // serveHTTPComplete is channel that is closed/selected when "websocket#ServeHTTP" finishes. - serveHTTPComplete := make(chan struct{}) - // Ensure panic in spawned goroutine is propagated into the parent goroutine. - panicChan := make(chan any, 1) + klog.Infof("mzq-test, conn.go Open, req: %+v, body: %+v, conn: %+v, timeout: %+v\n", req, req.Body, conn, conn.timeout.String()) go func() { // If websocket server returns, propagate panic if necessary. Otherwise, // signal HTTPServe finished by closing "serveHTTPComplete". @@ -336,7 +333,7 @@ func (conn *Conn) handle(ws *websocket.Conn) { var data []byte if err := websocket.Message.Receive(ws, &data); err != nil { if err != io.EOF { - klog.Errorf("Error on socket receive: %v", err) + klog.Errorf("mzq-test, Error on socket receive: %v, ws: %+v, data: %v", err, ws, string(data)) } break } diff --git a/vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go b/vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go index 3700a7e22..322e4fe0f 100644 --- a/vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go +++ b/vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go @@ -114,7 +114,8 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar }, }) conn.SetIdleTimeout(idleTimeout) - _, streams, err := conn.Open(responsewriter.GetOriginal(w), req) + klog.Infof("mzq-test, portforward/websocket.go, idleTimeout: %+v, req: %+v, req.body: %+v, podName: %v, uid: %v, opts: %+v, portForwarder: %+v, supportedPortForwardProtocols: %+v, streamCreationTimeout: %+v\n", idleTimeout.String(), req.Body, podName, uid, portForwarder, supportedPortForwardProtocols, streamCreationTimeout.String()) + _, streams, err := conn.Open(httplog.Unlogged(req, w), req) if err != nil { err = fmt.Errorf("unable to upgrade websocket connection: %v", err) return err diff --git a/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go b/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go index aa638499a..ee54934b2 100644 --- a/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go +++ b/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go @@ -40,6 +40,7 @@ type Attacher interface { // ServeAttach handles requests to attach to a container. After creating/receiving the required // streams, it delegates the actual attaching to attacher. func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { + fmt.Println("mzq-test, remotecommand/attach.go ServeAttach") ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) if !ok { // error is handled by createStreams diff --git a/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go b/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go index 5ec6b86a8..4a196e846 100644 --- a/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go +++ b/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go @@ -43,6 +43,7 @@ type Executor interface { // creating/receiving the required streams, it delegates the actual execution // to the executor. func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { + fmt.Println("mzq-test, remotecommand/exec.go ServeExec") ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) if !ok { // error is handled by createStreams diff --git a/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go b/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go index 259008885..b487878e7 100644 --- a/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go +++ b/vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go @@ -23,7 +23,9 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream/wsstream" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apiserver/pkg/endpoints/responsewriter" + "k8s.io/apiserver/pkg/server/httplog" + "k8s.io/apiserver/pkg/util/wsstream" + "k8s.io/klog/v2" ) const ( @@ -95,7 +97,8 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Opti }, }) conn.SetIdleTimeout(idleTimeout) - negotiatedProtocol, streams, err := conn.Open(responsewriter.GetOriginal(w), req) + klog.Infof("mzq-test, remotecommand/websocket.go createWebSocketStreams, req: %+v, req.Body: %+v, opts: %+v, idleTime: %+v, conn:%+v\n", req, req.Body, opts, idleTimeout.String(), conn) + negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(req, w), req) if err != nil { runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err)) return nil, false diff --git a/vendor/k8s.io/kubelet/pkg/cri/streaming/server.go b/vendor/k8s.io/kubelet/pkg/cri/streaming/server.go index fe5c22b04..e6c371cd4 100644 --- a/vendor/k8s.io/kubelet/pkg/cri/streaming/server.go +++ b/vendor/k8s.io/kubelet/pkg/cri/streaming/server.go @@ -316,8 +316,8 @@ func (s *server) serveAttach(req *restful.Request, resp *restful.Response) { Stderr: attach.Stderr, TTY: attach.Tty, } + resp.ResponseWriter, remotecommandserver.ServeAttach( - resp.ResponseWriter, req.Request, s.runtime, "", // unused: podName