mzq-test
This commit is contained in:
		| @@ -302,6 +302,7 @@ func getTTRPCClient(cliContext *cli.Context) (*ttrpc.Client, error) { | ||||
| 	s2, _ := shim.SocketAddress(ctx, cliContext.String("address"), id, false) | ||||
| 	s2 = strings.TrimPrefix(s2, "unix://") | ||||
|  | ||||
| 	log.L.Infof("mzq-test, shim.go getTaskService, id: %v, ns: %v, s1: %v, s2: %v\n", id, ns, s1, s2) | ||||
| 	for _, socket := range []string{s2, "\x00" + s1} { | ||||
| 		conn, err := net.Dial("unix", socket) | ||||
| 		if err == nil { | ||||
|   | ||||
| @@ -138,6 +138,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	log.G(ctx).Infof("mzq-test, binary.go Start, ns: %v, cmd: %+v, response: %v, opts: %+v, conn: %+v, params: %v\n", ns, cmd, string(response), opts, conn, params) | ||||
|  | ||||
| 	// Save bootstrap configuration (so containerd can restore shims after restart). | ||||
| 	if err := writeBootstrapParams(filepath.Join(b.bundle.Path, "bootstrap.json"), params); err != nil { | ||||
|   | ||||
| @@ -111,6 +111,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("unable to make connection: %w", err) | ||||
| 	} | ||||
| 	a := log.Fields { | ||||
| 		"mzq-test": true, | ||||
| 		"conn": fmt.Sprintf("%+v", conn), | ||||
| 		"params": fmt.Sprintf("%+v", params), | ||||
| 		"bundle": fmt.Sprintf("%+v", bundle), | ||||
| 	} | ||||
| 	log.G(ctx).WithFields(a).Info("shim.go loadShim") | ||||
|  | ||||
| 	defer func() { | ||||
| 		if retErr != nil { | ||||
| @@ -277,11 +284,13 @@ func makeConnection(ctx context.Context, id string, params client.BootstrapParam | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		return ttrpc.NewClient( | ||||
| 			conn, | ||||
| 			ttrpc.WithOnClose(onClose), | ||||
| 			ttrpc.WithUnaryClientInterceptor(otelttrpc.UnaryClientInterceptor()), | ||||
| 		), nil | ||||
| 		a := log.Fields{ | ||||
| 			"mzq-test": true, | ||||
| 			"conn":     fmt.Sprintf("%+v", conn), | ||||
| 			"params":   fmt.Sprintf("%+v", params), | ||||
| 		} | ||||
| 		log.G(ctx).WithFields(a).Info("shim.go makeConnection makeConnection") | ||||
| 		return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil | ||||
| 	case "grpc": | ||||
| 		gopts := []grpc.DialOption{ | ||||
| 			grpc.WithTransportCredentials(insecure.NewCredentials()), | ||||
|   | ||||
| @@ -161,8 +161,23 @@ func (m *ShimManager) ID() string { | ||||
| 	return plugins.ShimPlugin.String() + ".manager" | ||||
| } | ||||
|  | ||||
| <<<<<<< HEAD:core/runtime/v2/shim_manager.go | ||||
| // Start launches a new shim instance | ||||
| func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { | ||||
| ======= | ||||
| func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { | ||||
| // Start launches a new shim instance | ||||
| 	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if retErr != nil { | ||||
| 			bundle.Delete() | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| >>>>>>> 816b01e62 (mzq-test):runtime/v2/manager.go | ||||
| 	// This container belongs to sandbox which supposed to be already started via sandbox API. | ||||
| 	if opts.SandboxID != "" { | ||||
| 		var params shimbinary.BootstrapParams | ||||
| @@ -207,6 +222,14 @@ func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err) | ||||
| 		} | ||||
| 		a := log.Fields { | ||||
| 			"mzq-test": true, | ||||
| 			"id": id, | ||||
| 			"opts": fmt.Sprintf("%+v", opts), | ||||
| 			"process": fmt.Sprintf("%+v", process), | ||||
| 			"address": fmt.Sprintf("%+v", address), | ||||
| 		} | ||||
| 		log.G(ctx).WithFields(a).Info("manager.go Start") | ||||
|  | ||||
| 		if err := m.shims.Add(ctx, shim); err != nil { | ||||
| 			return nil, err | ||||
| @@ -255,8 +278,18 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, | ||||
| 		ttrpcAddress: m.containerdTTRPCAddress, | ||||
| 		env:          m.env, | ||||
| 	}) | ||||
| <<<<<<< HEAD:core/runtime/v2/shim_manager.go | ||||
| 	shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() { | ||||
| 		log.G(ctx).WithField("id", id).Info("shim disconnected") | ||||
| ======= | ||||
| 	shim, err := b.Start(ctx, protobuf.FromAny(topts), func() { | ||||
| 		a := log.Fields { | ||||
| 			"mzq-test": true, | ||||
| 			"id": id, | ||||
| 			"b": fmt.Sprintf("%+v", b), | ||||
| 		} | ||||
| 		log.G(ctx).WithFields(a).Info("shim disconnected") | ||||
| >>>>>>> 816b01e62 (mzq-test):runtime/v2/manager.go | ||||
|  | ||||
| 		cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b) | ||||
| 		// Remove self from the runtime task list. Even though the cleanupAfterDeadShim() | ||||
|   | ||||
| @@ -56,7 +56,8 @@ func WithPublishTTRPCOpts(opts ...ttrpc.ClientOpts) PublisherOpts { | ||||
| } | ||||
|  | ||||
| // NewPublisher creates a new remote events publisher | ||||
| func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) { | ||||
| func NewPublisher(address string) (*RemoteEventsPublisher, error) { | ||||
| 	log.L.Infof("mzq-test, publisher.go NewPublisher, address: %v\n", address) | ||||
| 	client, err := ttrpcutil.NewClient(address) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
|   | ||||
| @@ -247,6 +247,20 @@ func run(ctx context.Context, manager Manager, config Config) error { | ||||
| 	} | ||||
|  | ||||
| 	ttrpcAddress := os.Getenv(ttrpcAddressEnv) | ||||
| 	a := log.Fields{ | ||||
| 		"mzq-test":     true, | ||||
| 		"id":           id, | ||||
| 		"name":         name, | ||||
| 		"config":       fmt.Sprintf("%+v", config), | ||||
| 		"signals":      fmt.Sprintf("%+v", signals), | ||||
| 		"ttrpcAddress": ttrpcAddress, | ||||
| 	} | ||||
| 	log.G(ctx).WithFields(a).Info("shim.go run") | ||||
| 	publisher, err := NewPublisher(ttrpcAddress) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	defer publisher.Close() | ||||
|  | ||||
| 	ctx = namespaces.WithNamespace(ctx, namespaceFlag) | ||||
| 	ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) | ||||
|   | ||||
							
								
								
									
										13
									
								
								vendor/github.com/containerd/ttrpc/client.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										13
									
								
								vendor/github.com/containerd/ttrpc/client.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -109,6 +109,12 @@ func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker | ||||
|  | ||||
| // NewClient creates a new ttrpc client using the given connection | ||||
| func NewClient(conn net.Conn, opts ...ClientOpts) *Client { | ||||
| 	a := logrus.Fields{ | ||||
| 		"mzq-test": true, | ||||
| 		"conn":     fmt.Sprintf("%+v", conn), | ||||
| 		"opts":     fmt.Sprintf("%+v", opts), | ||||
| 	} | ||||
| 	logrus.WithFields(a).Info("client.go NewClient") | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	channel := newChannel(conn) | ||||
| 	c := &Client{ | ||||
| @@ -368,7 +374,12 @@ func (c *Client) receiveLoop() error { | ||||
| 			sid := streamID(msg.header.StreamID) | ||||
| 			s := c.getStream(sid) | ||||
| 			if s == nil { | ||||
| 				log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream") | ||||
| 				a := logrus.Fields{ | ||||
| 					"mzq-test": true, | ||||
| 					"stream":   sid, | ||||
| 					"c":        fmt.Sprintf("%+v", c), | ||||
| 				} | ||||
| 				logrus.WithFields(a).Errorf("ttrpc: received message on inactive stream") | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
|   | ||||
							
								
								
									
										7
									
								
								vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										7
									
								
								vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -215,10 +215,7 @@ func (conn *Conn) SetWriteDeadline(duration time.Duration) { | ||||
| // Open the connection and create channels for reading and writing. It returns | ||||
| // the selected subprotocol, a slice of channels and an error. | ||||
| func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) { | ||||
| 	// serveHTTPComplete is channel that is closed/selected when "websocket#ServeHTTP" finishes. | ||||
| 	serveHTTPComplete := make(chan struct{}) | ||||
| 	// Ensure panic in spawned goroutine is propagated into the parent goroutine. | ||||
| 	panicChan := make(chan any, 1) | ||||
| 	klog.Infof("mzq-test, conn.go Open, req: %+v, body: %+v, conn: %+v, timeout: %+v\n", req, req.Body, conn, conn.timeout.String()) | ||||
| 	go func() { | ||||
| 		// If websocket server returns, propagate panic if necessary. Otherwise, | ||||
| 		// signal HTTPServe finished by closing "serveHTTPComplete". | ||||
| @@ -336,7 +333,7 @@ func (conn *Conn) handle(ws *websocket.Conn) { | ||||
| 		var data []byte | ||||
| 		if err := websocket.Message.Receive(ws, &data); err != nil { | ||||
| 			if err != io.EOF { | ||||
| 				klog.Errorf("Error on socket receive: %v", err) | ||||
| 				klog.Errorf("mzq-test, Error on socket receive: %v, ws: %+v, data: %v", err, ws, string(data)) | ||||
| 			} | ||||
| 			break | ||||
| 		} | ||||
|   | ||||
							
								
								
									
										3
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -114,7 +114,8 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar | ||||
| 		}, | ||||
| 	}) | ||||
| 	conn.SetIdleTimeout(idleTimeout) | ||||
| 	_, streams, err := conn.Open(responsewriter.GetOriginal(w), req) | ||||
| 	klog.Infof("mzq-test, portforward/websocket.go, idleTimeout: %+v, req: %+v, req.body: %+v, podName: %v, uid: %v, opts: %+v, portForwarder: %+v, supportedPortForwardProtocols: %+v, streamCreationTimeout: %+v\n", idleTimeout.String(), req.Body, podName, uid, portForwarder, supportedPortForwardProtocols, streamCreationTimeout.String()) | ||||
| 	_, streams, err := conn.Open(httplog.Unlogged(req, w), req) | ||||
| 	if err != nil { | ||||
| 		err = fmt.Errorf("unable to upgrade websocket connection: %v", err) | ||||
| 		return err | ||||
|   | ||||
							
								
								
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -40,6 +40,7 @@ type Attacher interface { | ||||
| // ServeAttach handles requests to attach to a container. After creating/receiving the required | ||||
| // streams, it delegates the actual attaching to attacher. | ||||
| func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { | ||||
| 	fmt.Println("mzq-test, remotecommand/attach.go ServeAttach") | ||||
| 	ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) | ||||
| 	if !ok { | ||||
| 		// error is handled by createStreams | ||||
|   | ||||
							
								
								
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -43,6 +43,7 @@ type Executor interface { | ||||
| // creating/receiving the required streams, it delegates the actual execution | ||||
| // to the executor. | ||||
| func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { | ||||
| 	fmt.Println("mzq-test, remotecommand/exec.go ServeExec") | ||||
| 	ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) | ||||
| 	if !ok { | ||||
| 		// error is handled by createStreams | ||||
|   | ||||
							
								
								
									
										7
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										7
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -23,7 +23,9 @@ import ( | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/util/httpstream/wsstream" | ||||
| 	"k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	"k8s.io/apiserver/pkg/endpoints/responsewriter" | ||||
| 	"k8s.io/apiserver/pkg/server/httplog" | ||||
| 	"k8s.io/apiserver/pkg/util/wsstream" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| @@ -95,7 +97,8 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Opti | ||||
| 		}, | ||||
| 	}) | ||||
| 	conn.SetIdleTimeout(idleTimeout) | ||||
| 	negotiatedProtocol, streams, err := conn.Open(responsewriter.GetOriginal(w), req) | ||||
| 	klog.Infof("mzq-test, remotecommand/websocket.go createWebSocketStreams, req: %+v, req.Body: %+v, opts: %+v, idleTime: %+v, conn:%+v\n", req, req.Body, opts, idleTimeout.String(), conn) | ||||
| 	negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(req, w), req) | ||||
| 	if err != nil { | ||||
| 		runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err)) | ||||
| 		return nil, false | ||||
|   | ||||
							
								
								
									
										2
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/server.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/server.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -316,8 +316,8 @@ func (s *server) serveAttach(req *restful.Request, resp *restful.Response) { | ||||
| 		Stderr: attach.Stderr, | ||||
| 		TTY:    attach.Tty, | ||||
| 	} | ||||
| 	resp.ResponseWriter, | ||||
| 	remotecommandserver.ServeAttach( | ||||
| 		resp.ResponseWriter, | ||||
| 		req.Request, | ||||
| 		s.runtime, | ||||
| 		"", // unused: podName | ||||
|   | ||||
		Reference in New Issue
	
	Block a user