Compare commits
	
		
			1 Commits
		
	
	
		
			v2.0.4-k3s
			...
			v2.0.4-mzq
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 730a7fb8c0 | 
@@ -302,6 +302,7 @@ func getTTRPCClient(cliContext *cli.Context) (*ttrpc.Client, error) {
 | 
				
			|||||||
	s2, _ := shim.SocketAddress(ctx, cliContext.String("address"), id, false)
 | 
						s2, _ := shim.SocketAddress(ctx, cliContext.String("address"), id, false)
 | 
				
			||||||
	s2 = strings.TrimPrefix(s2, "unix://")
 | 
						s2 = strings.TrimPrefix(s2, "unix://")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						log.L.Infof("mzq-test, shim.go getTaskService, id: %v, ns: %v, s1: %v, s2: %v\n", id, ns, s1, s2)
 | 
				
			||||||
	for _, socket := range []string{s2, "\x00" + s1} {
 | 
						for _, socket := range []string{s2, "\x00" + s1} {
 | 
				
			||||||
		conn, err := net.Dial("unix", socket)
 | 
							conn, err := net.Dial("unix", socket)
 | 
				
			||||||
		if err == nil {
 | 
							if err == nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -138,6 +138,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						log.G(ctx).Infof("mzq-test, binary.go Start, ns: %v, cmd: %+v, response: %v, opts: %+v, conn: %+v, params: %v\n", ns, cmd, string(response), opts, conn, params)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Save bootstrap configuration (so containerd can restore shims after restart).
 | 
						// Save bootstrap configuration (so containerd can restore shims after restart).
 | 
				
			||||||
	if err := writeBootstrapParams(filepath.Join(b.bundle.Path, "bootstrap.json"), params); err != nil {
 | 
						if err := writeBootstrapParams(filepath.Join(b.bundle.Path, "bootstrap.json"), params); err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -111,6 +111,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, fmt.Errorf("unable to make connection: %w", err)
 | 
							return nil, fmt.Errorf("unable to make connection: %w", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						a := log.Fields{
 | 
				
			||||||
 | 
							"mzq-test": true,
 | 
				
			||||||
 | 
							"conn":     fmt.Sprintf("%+v", conn),
 | 
				
			||||||
 | 
							"params":   fmt.Sprintf("%+v", params),
 | 
				
			||||||
 | 
							"bundle":   fmt.Sprintf("%+v", bundle),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						log.G(ctx).WithFields(a).Info("shim.go loadShim")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
		if retErr != nil {
 | 
							if retErr != nil {
 | 
				
			||||||
@@ -277,6 +284,12 @@ func makeConnection(ctx context.Context, id string, params client.BootstrapParam
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							a := log.Fields{
 | 
				
			||||||
 | 
								"mzq-test": true,
 | 
				
			||||||
 | 
								"conn":     fmt.Sprintf("%+v", conn),
 | 
				
			||||||
 | 
								"params":   fmt.Sprintf("%+v", params),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							log.G(ctx).WithFields(a).Info("shim.go makeConnection makeConnection")
 | 
				
			||||||
		return ttrpc.NewClient(
 | 
							return ttrpc.NewClient(
 | 
				
			||||||
			conn,
 | 
								conn,
 | 
				
			||||||
			ttrpc.WithOnClose(onClose),
 | 
								ttrpc.WithOnClose(onClose),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -207,6 +207,14 @@ func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
 | 
								return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							a := log.Fields{
 | 
				
			||||||
 | 
								"mzq-test": true,
 | 
				
			||||||
 | 
								"id":       id,
 | 
				
			||||||
 | 
								"opts":     fmt.Sprintf("%+v", opts),
 | 
				
			||||||
 | 
								"params":   fmt.Sprintf("%+v", params),
 | 
				
			||||||
 | 
								"bundle":   fmt.Sprintf("%+v", bundle),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							log.G(ctx).WithFields(a).Info("manager.go Start")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err := m.shims.Add(ctx, shim); err != nil {
 | 
							if err := m.shims.Add(ctx, shim); err != nil {
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
@@ -256,7 +264,13 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
 | 
				
			|||||||
		env:          m.env,
 | 
							env:          m.env,
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() {
 | 
						shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() {
 | 
				
			||||||
		log.G(ctx).WithField("id", id).Info("shim disconnected")
 | 
							a := log.Fields{
 | 
				
			||||||
 | 
								"mzq-test": true,
 | 
				
			||||||
 | 
								"ns":       ns,
 | 
				
			||||||
 | 
								"topts":    topts,
 | 
				
			||||||
 | 
								"b":        fmt.Sprintf("%+v", b),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							log.G(ctx).WithFields(a).Info("shim disconnected")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
 | 
							cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
 | 
				
			||||||
		// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
 | 
							// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -57,6 +57,7 @@ func WithPublishTTRPCOpts(opts ...ttrpc.ClientOpts) PublisherOpts {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// NewPublisher creates a new remote events publisher
 | 
					// NewPublisher creates a new remote events publisher
 | 
				
			||||||
func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) {
 | 
					func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) {
 | 
				
			||||||
 | 
						log.L.Infof("mzq-test, publisher.go NewPublisher, address: %v, opts: %+v\n", address, opts)
 | 
				
			||||||
	client, err := ttrpcutil.NewClient(address)
 | 
						client, err := ttrpcutil.NewClient(address)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -247,6 +247,14 @@ func run(ctx context.Context, manager Manager, config Config) error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ttrpcAddress := os.Getenv(ttrpcAddressEnv)
 | 
						ttrpcAddress := os.Getenv(ttrpcAddressEnv)
 | 
				
			||||||
 | 
						a := log.Fields{
 | 
				
			||||||
 | 
							"mzq-test":     true,
 | 
				
			||||||
 | 
							"manager":      manager,
 | 
				
			||||||
 | 
							"config":       fmt.Sprintf("%+v", config),
 | 
				
			||||||
 | 
							"signals":      fmt.Sprintf("%+v", signals),
 | 
				
			||||||
 | 
							"ttrpcAddress": ttrpcAddress,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						log.G(ctx).WithFields(a).Info("shim.go run")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx = namespaces.WithNamespace(ctx, namespaceFlag)
 | 
						ctx = namespaces.WithNamespace(ctx, namespaceFlag)
 | 
				
			||||||
	ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
 | 
						ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										13
									
								
								vendor/github.com/containerd/ttrpc/client.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										13
									
								
								vendor/github.com/containerd/ttrpc/client.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -109,7 +109,13 @@ func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// NewClient creates a new ttrpc client using the given connection
 | 
					// NewClient creates a new ttrpc client using the given connection
 | 
				
			||||||
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
 | 
					func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
 | 
				
			||||||
 | 
						a := log.Fields{
 | 
				
			||||||
 | 
							"mzq-test": true,
 | 
				
			||||||
 | 
							"conn":     fmt.Sprintf("%+v", conn),
 | 
				
			||||||
 | 
							"opts":     fmt.Sprintf("%+v", opts),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
						log.G(ctx).WithFields(a).Info("client.go NewClient")
 | 
				
			||||||
	channel := newChannel(conn)
 | 
						channel := newChannel(conn)
 | 
				
			||||||
	c := &Client{
 | 
						c := &Client{
 | 
				
			||||||
		codec:           codec{},
 | 
							codec:           codec{},
 | 
				
			||||||
@@ -368,7 +374,12 @@ func (c *Client) receiveLoop() error {
 | 
				
			|||||||
			sid := streamID(msg.header.StreamID)
 | 
								sid := streamID(msg.header.StreamID)
 | 
				
			||||||
			s := c.getStream(sid)
 | 
								s := c.getStream(sid)
 | 
				
			||||||
			if s == nil {
 | 
								if s == nil {
 | 
				
			||||||
				log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream")
 | 
									a := log.Fields{
 | 
				
			||||||
 | 
										"mzq-test": true,
 | 
				
			||||||
 | 
										"stream":   sid,
 | 
				
			||||||
 | 
										"c":        fmt.Sprintf("%+v", c),
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									log.G(c.ctx).WithFields(a).Errorf("ttrpc: received message on inactive stream")
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										3
									
								
								vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								vendor/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -219,6 +219,7 @@ func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.R
 | 
				
			|||||||
	serveHTTPComplete := make(chan struct{})
 | 
						serveHTTPComplete := make(chan struct{})
 | 
				
			||||||
	// Ensure panic in spawned goroutine is propagated into the parent goroutine.
 | 
						// Ensure panic in spawned goroutine is propagated into the parent goroutine.
 | 
				
			||||||
	panicChan := make(chan any, 1)
 | 
						panicChan := make(chan any, 1)
 | 
				
			||||||
 | 
						klog.Infof("mzq-test, conn.go Open, req: %+v, req.Body: %+v, conn: %+v, timeout: %+v\n", req, req.Body, conn, conn.timeout.String())
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		// If websocket server returns, propagate panic if necessary. Otherwise,
 | 
							// If websocket server returns, propagate panic if necessary. Otherwise,
 | 
				
			||||||
		// signal HTTPServe finished by closing "serveHTTPComplete".
 | 
							// signal HTTPServe finished by closing "serveHTTPComplete".
 | 
				
			||||||
@@ -336,7 +337,7 @@ func (conn *Conn) handle(ws *websocket.Conn) {
 | 
				
			|||||||
		var data []byte
 | 
							var data []byte
 | 
				
			||||||
		if err := websocket.Message.Receive(ws, &data); err != nil {
 | 
							if err := websocket.Message.Receive(ws, &data); err != nil {
 | 
				
			||||||
			if err != io.EOF {
 | 
								if err != io.EOF {
 | 
				
			||||||
				klog.Errorf("Error on socket receive: %v", err)
 | 
									klog.Errorf("mzq-test, Error on socket receive: %v, ws: %+v, data: %v", err, ws, string(data))
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			break
 | 
								break
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -114,6 +114,7 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	conn.SetIdleTimeout(idleTimeout)
 | 
						conn.SetIdleTimeout(idleTimeout)
 | 
				
			||||||
 | 
						klog.Infof("mzq-test, portforward/websocket.go, idleTimeout: %+v, req: %+v, req.body: %+v, podName: %v, uid: %v, opts: %+v, portForwarder: %+v, supportedPortForwardProtocols: %+v, streamCreationTimeout: %+v\n", idleTimeout.String(), req.Body, podName, uid, portForwarder, supportedPortForwardProtocols, streamCreationTimeout.String())
 | 
				
			||||||
	_, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
 | 
						_, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		err = fmt.Errorf("unable to upgrade websocket connection: %v", err)
 | 
							err = fmt.Errorf("unable to upgrade websocket connection: %v", err)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/attach.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -40,6 +40,7 @@ type Attacher interface {
 | 
				
			|||||||
// ServeAttach handles requests to attach to a container. After creating/receiving the required
 | 
					// ServeAttach handles requests to attach to a container. After creating/receiving the required
 | 
				
			||||||
// streams, it delegates the actual attaching to attacher.
 | 
					// streams, it delegates the actual attaching to attacher.
 | 
				
			||||||
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
 | 
					func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
 | 
				
			||||||
 | 
						fmt.Println("mzq-test, remotecommand/attach.go ServeAttach")
 | 
				
			||||||
	ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
 | 
						ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
 | 
				
			||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
		// error is handled by createStreams
 | 
							// error is handled by createStreams
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/exec.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -43,6 +43,7 @@ type Executor interface {
 | 
				
			|||||||
// creating/receiving the required streams, it delegates the actual execution
 | 
					// creating/receiving the required streams, it delegates the actual execution
 | 
				
			||||||
// to the executor.
 | 
					// to the executor.
 | 
				
			||||||
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
 | 
					func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
 | 
				
			||||||
 | 
						fmt.Println("mzq-test, remotecommand/exec.go ServeExec")
 | 
				
			||||||
	ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
 | 
						ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
 | 
				
			||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
		// error is handled by createStreams
 | 
							// error is handled by createStreams
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/k8s.io/kubelet/pkg/cri/streaming/remotecommand/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -95,6 +95,7 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Opti
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	conn.SetIdleTimeout(idleTimeout)
 | 
						conn.SetIdleTimeout(idleTimeout)
 | 
				
			||||||
 | 
						fmt.Printf("mzq-test, remotecommand/websocket.go createWebSocketStreams, req: %+v, req.Body: %+v, opts: %+v, idleTime: %+v, conn:%+v\n", req, req.Body, opts, idleTimeout.String(), conn)
 | 
				
			||||||
	negotiatedProtocol, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
 | 
						negotiatedProtocol, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))
 | 
							runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user