This commit is contained in:
maoziqiang 2025-07-23 18:41:30 +08:00
parent a9add11578
commit 730a7fb8c0
12 changed files with 57 additions and 3 deletions

View File

@ -302,6 +302,7 @@ func getTTRPCClient(cliContext *cli.Context) (*ttrpc.Client, error) {
s2, _ := shim.SocketAddress(ctx, cliContext.String("address"), id, false) s2, _ := shim.SocketAddress(ctx, cliContext.String("address"), id, false)
s2 = strings.TrimPrefix(s2, "unix://") s2 = strings.TrimPrefix(s2, "unix://")
log.L.Infof("mzq-test, shim.go getTaskService, id: %v, ns: %v, s1: %v, s2: %v\n", id, ns, s1, s2)
for _, socket := range []string{s2, "\x00" + s1} { for _, socket := range []string{s2, "\x00" + s1} {
conn, err := net.Dial("unix", socket) conn, err := net.Dial("unix", socket)
if err == nil { if err == nil {

View File

@ -138,6 +138,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.G(ctx).Infof("mzq-test, binary.go Start, ns: %v, cmd: %+v, response: %v, opts: %+v, conn: %+v, params: %v\n", ns, cmd, string(response), opts, conn, params)
// Save bootstrap configuration (so containerd can restore shims after restart). // Save bootstrap configuration (so containerd can restore shims after restart).
if err := writeBootstrapParams(filepath.Join(b.bundle.Path, "bootstrap.json"), params); err != nil { if err := writeBootstrapParams(filepath.Join(b.bundle.Path, "bootstrap.json"), params); err != nil {

View File

@ -111,6 +111,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to make connection: %w", err) return nil, fmt.Errorf("unable to make connection: %w", err)
} }
a := log.Fields{
"mzq-test": true,
"conn": fmt.Sprintf("%+v", conn),
"params": fmt.Sprintf("%+v", params),
"bundle": fmt.Sprintf("%+v", bundle),
}
log.G(ctx).WithFields(a).Info("shim.go loadShim")
defer func() { defer func() {
if retErr != nil { if retErr != nil {
@ -277,6 +284,12 @@ func makeConnection(ctx context.Context, id string, params client.BootstrapParam
} }
}() }()
a := log.Fields{
"mzq-test": true,
"conn": fmt.Sprintf("%+v", conn),
"params": fmt.Sprintf("%+v", params),
}
log.G(ctx).WithFields(a).Info("shim.go makeConnection makeConnection")
return ttrpc.NewClient( return ttrpc.NewClient(
conn, conn,
ttrpc.WithOnClose(onClose), ttrpc.WithOnClose(onClose),

View File

@ -207,6 +207,14 @@ func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err) return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
} }
a := log.Fields{
"mzq-test": true,
"id": id,
"opts": fmt.Sprintf("%+v", opts),
"params": fmt.Sprintf("%+v", params),
"bundle": fmt.Sprintf("%+v", bundle),
}
log.G(ctx).WithFields(a).Info("manager.go Start")
if err := m.shims.Add(ctx, shim); err != nil { if err := m.shims.Add(ctx, shim); err != nil {
return nil, err return nil, err
@ -256,7 +264,13 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
env: m.env, env: m.env,
}) })
shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() { shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") a := log.Fields{
"mzq-test": true,
"ns": ns,
"topts": topts,
"b": fmt.Sprintf("%+v", b),
}
log.G(ctx).WithFields(a).Info("shim disconnected")
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b) cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // Remove self from the runtime task list. Even though the cleanupAfterDeadShim()

View File

@ -57,6 +57,7 @@ func WithPublishTTRPCOpts(opts ...ttrpc.ClientOpts) PublisherOpts {
// NewPublisher creates a new remote events publisher // NewPublisher creates a new remote events publisher
func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) { func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) {
log.L.Infof("mzq-test, publisher.go NewPublisher, address: %v, opts: %+v\n", address, opts)
client, err := ttrpcutil.NewClient(address) client, err := ttrpcutil.NewClient(address)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -247,6 +247,14 @@ func run(ctx context.Context, manager Manager, config Config) error {
} }
ttrpcAddress := os.Getenv(ttrpcAddressEnv) ttrpcAddress := os.Getenv(ttrpcAddressEnv)
a := log.Fields{
"mzq-test": true,
"manager": manager,
"config": fmt.Sprintf("%+v", config),
"signals": fmt.Sprintf("%+v", signals),
"ttrpcAddress": ttrpcAddress,
}
log.G(ctx).WithFields(a).Info("shim.go run")
ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = namespaces.WithNamespace(ctx, namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})

View File

@ -109,7 +109,13 @@ func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker
// NewClient creates a new ttrpc client using the given connection // NewClient creates a new ttrpc client using the given connection
func NewClient(conn net.Conn, opts ...ClientOpts) *Client { func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
a := log.Fields{
"mzq-test": true,
"conn": fmt.Sprintf("%+v", conn),
"opts": fmt.Sprintf("%+v", opts),
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
log.G(ctx).WithFields(a).Info("client.go NewClient")
channel := newChannel(conn) channel := newChannel(conn)
c := &Client{ c := &Client{
codec: codec{}, codec: codec{},
@ -368,7 +374,12 @@ func (c *Client) receiveLoop() error {
sid := streamID(msg.header.StreamID) sid := streamID(msg.header.StreamID)
s := c.getStream(sid) s := c.getStream(sid)
if s == nil { if s == nil {
log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream") a := log.Fields{
"mzq-test": true,
"stream": sid,
"c": fmt.Sprintf("%+v", c),
}
log.G(c.ctx).WithFields(a).Errorf("ttrpc: received message on inactive stream")
continue continue
} }

View File

@ -219,6 +219,7 @@ func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.R
serveHTTPComplete := make(chan struct{}) serveHTTPComplete := make(chan struct{})
// Ensure panic in spawned goroutine is propagated into the parent goroutine. // Ensure panic in spawned goroutine is propagated into the parent goroutine.
panicChan := make(chan any, 1) panicChan := make(chan any, 1)
klog.Infof("mzq-test, conn.go Open, req: %+v, req.Body: %+v, conn: %+v, timeout: %+v\n", req, req.Body, conn, conn.timeout.String())
go func() { go func() {
// If websocket server returns, propagate panic if necessary. Otherwise, // If websocket server returns, propagate panic if necessary. Otherwise,
// signal HTTPServe finished by closing "serveHTTPComplete". // signal HTTPServe finished by closing "serveHTTPComplete".
@ -336,7 +337,7 @@ func (conn *Conn) handle(ws *websocket.Conn) {
var data []byte var data []byte
if err := websocket.Message.Receive(ws, &data); err != nil { if err := websocket.Message.Receive(ws, &data); err != nil {
if err != io.EOF { if err != io.EOF {
klog.Errorf("Error on socket receive: %v", err) klog.Errorf("mzq-test, Error on socket receive: %v, ws: %+v, data: %v", err, ws, string(data))
} }
break break
} }

View File

@ -114,6 +114,7 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar
}, },
}) })
conn.SetIdleTimeout(idleTimeout) conn.SetIdleTimeout(idleTimeout)
klog.Infof("mzq-test, portforward/websocket.go, idleTimeout: %+v, req: %+v, req.body: %+v, podName: %v, uid: %v, opts: %+v, portForwarder: %+v, supportedPortForwardProtocols: %+v, streamCreationTimeout: %+v\n", idleTimeout.String(), req.Body, podName, uid, portForwarder, supportedPortForwardProtocols, streamCreationTimeout.String())
_, streams, err := conn.Open(responsewriter.GetOriginal(w), req) _, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
if err != nil { if err != nil {
err = fmt.Errorf("unable to upgrade websocket connection: %v", err) err = fmt.Errorf("unable to upgrade websocket connection: %v", err)

View File

@ -40,6 +40,7 @@ type Attacher interface {
// ServeAttach handles requests to attach to a container. After creating/receiving the required // ServeAttach handles requests to attach to a container. After creating/receiving the required
// streams, it delegates the actual attaching to attacher. // streams, it delegates the actual attaching to attacher.
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
fmt.Println("mzq-test, remotecommand/attach.go ServeAttach")
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
if !ok { if !ok {
// error is handled by createStreams // error is handled by createStreams

View File

@ -43,6 +43,7 @@ type Executor interface {
// creating/receiving the required streams, it delegates the actual execution // creating/receiving the required streams, it delegates the actual execution
// to the executor. // to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
fmt.Println("mzq-test, remotecommand/exec.go ServeExec")
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
if !ok { if !ok {
// error is handled by createStreams // error is handled by createStreams

View File

@ -95,6 +95,7 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Opti
}, },
}) })
conn.SetIdleTimeout(idleTimeout) conn.SetIdleTimeout(idleTimeout)
fmt.Printf("mzq-test, remotecommand/websocket.go createWebSocketStreams, req: %+v, req.Body: %+v, opts: %+v, idleTime: %+v, conn:%+v\n", req, req.Body, opts, idleTimeout.String(), conn)
negotiatedProtocol, streams, err := conn.Open(responsewriter.GetOriginal(w), req) negotiatedProtocol, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err)) runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))