mzq-test
This commit is contained in:
parent
57c526b000
commit
816b01e621
@ -244,6 +244,7 @@ func getTaskService(context *cli.Context) (task.TaskService, error) {
|
|||||||
s2, _ := shim.SocketAddress(ctx, context.GlobalString("address"), id)
|
s2, _ := shim.SocketAddress(ctx, context.GlobalString("address"), id)
|
||||||
s2 = strings.TrimPrefix(s2, "unix://")
|
s2 = strings.TrimPrefix(s2, "unix://")
|
||||||
|
|
||||||
|
log.L.Infof("mzq-test, shim.go getTaskService, id: %v, ns: %v, s1: %v, s2: %v\n", id, ns, s1, s2)
|
||||||
for _, socket := range []string{s2, "\x00" + s1} {
|
for _, socket := range []string{s2, "\x00" + s1} {
|
||||||
conn, err := net.Dial("unix", socket)
|
conn, err := net.Dial("unix", socket)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -129,6 +129,7 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
conn.SetIdleTimeout(idleTimeout)
|
conn.SetIdleTimeout(idleTimeout)
|
||||||
|
klog.Infof("mzq-test, portforward/websocket.go, idleTimeout: %+v, req: %+v, req.body: %+v, podName: %v, uid: %v, opts: %+v, portForwarder: %+v, supportedPortForwardProtocols: %+v, streamCreationTimeout: %+v\n", idleTimeout.String(), req.Body, podName, uid, portForwarder, supportedPortForwardProtocols, streamCreationTimeout.String())
|
||||||
_, streams, err := conn.Open(httplog.Unlogged(req, w), req)
|
_, streams, err := conn.Open(httplog.Unlogged(req, w), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("unable to upgrade websocket connection: %v", err)
|
err = fmt.Errorf("unable to upgrade websocket connection: %v", err)
|
||||||
|
@ -55,6 +55,7 @@ type Attacher interface {
|
|||||||
// ServeAttach handles requests to attach to a container. After creating/receiving the required
|
// ServeAttach handles requests to attach to a container. After creating/receiving the required
|
||||||
// streams, it delegates the actual attaching to attacher.
|
// streams, it delegates the actual attaching to attacher.
|
||||||
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
||||||
|
fmt.Println("mzq-test, remotecommand/attach.go ServeAttach")
|
||||||
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
|
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
// error is handled by createStreams
|
// error is handled by createStreams
|
||||||
|
@ -58,6 +58,7 @@ type Executor interface {
|
|||||||
// creating/receiving the required streams, it delegates the actual execution
|
// creating/receiving the required streams, it delegates the actual execution
|
||||||
// to the executor.
|
// to the executor.
|
||||||
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
||||||
|
fmt.Println("mzq-test, remotecommand/exec.go ServeExec")
|
||||||
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
|
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
// error is handled by createStreams
|
// error is handled by createStreams
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apiserver/pkg/server/httplog"
|
"k8s.io/apiserver/pkg/server/httplog"
|
||||||
"k8s.io/apiserver/pkg/util/wsstream"
|
"k8s.io/apiserver/pkg/util/wsstream"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -111,6 +112,7 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Opti
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
conn.SetIdleTimeout(idleTimeout)
|
conn.SetIdleTimeout(idleTimeout)
|
||||||
|
klog.Infof("mzq-test, remotecommand/websocket.go createWebSocketStreams, req: %+v, req.Body: %+v, opts: %+v, idleTime: %+v, conn:%+v\n", req, req.Body, opts, idleTimeout.String(), conn)
|
||||||
negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(req, w), req)
|
negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(req, w), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))
|
runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))
|
||||||
|
@ -334,8 +334,8 @@ func (s *server) serveAttach(req *restful.Request, resp *restful.Response) {
|
|||||||
Stderr: attach.Stderr,
|
Stderr: attach.Stderr,
|
||||||
TTY: attach.Tty,
|
TTY: attach.Tty,
|
||||||
}
|
}
|
||||||
|
resp.ResponseWriter,
|
||||||
remotecommandserver.ServeAttach(
|
remotecommandserver.ServeAttach(
|
||||||
resp.ResponseWriter,
|
|
||||||
req.Request,
|
req.Request,
|
||||||
s.runtime,
|
s.runtime,
|
||||||
"", // unused: podName
|
"", // unused: podName
|
||||||
|
@ -138,6 +138,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.G(ctx).Infof("mzq-test, binary.go Start, ns: %v, cmd: %+v, response: %v, opts: %+v, conn: %+v, params: %v\n", ns, cmd, string(response), opts, conn, params)
|
||||||
|
|
||||||
return &shim{
|
return &shim{
|
||||||
bundle: b.bundle,
|
bundle: b.bundle,
|
||||||
|
@ -180,8 +180,8 @@ func (m *ShimManager) ID() string {
|
|||||||
return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim")
|
return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start launches a new shim instance
|
|
||||||
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
|
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
|
||||||
|
// Start launches a new shim instance
|
||||||
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
|
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -218,6 +218,14 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
|
return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
|
||||||
}
|
}
|
||||||
|
a := log.Fields {
|
||||||
|
"mzq-test": true,
|
||||||
|
"id": id,
|
||||||
|
"opts": fmt.Sprintf("%+v", opts),
|
||||||
|
"process": fmt.Sprintf("%+v", process),
|
||||||
|
"address": fmt.Sprintf("%+v", address),
|
||||||
|
}
|
||||||
|
log.G(ctx).WithFields(a).Info("manager.go Start")
|
||||||
|
|
||||||
if err := m.shims.Add(ctx, shim); err != nil {
|
if err := m.shims.Add(ctx, shim); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -267,7 +275,12 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
|
|||||||
schedCore: m.schedCore,
|
schedCore: m.schedCore,
|
||||||
})
|
})
|
||||||
shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
|
shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
|
||||||
log.G(ctx).WithField("id", id).Info("shim disconnected")
|
a := log.Fields {
|
||||||
|
"mzq-test": true,
|
||||||
|
"id": id,
|
||||||
|
"b": fmt.Sprintf("%+v", b),
|
||||||
|
}
|
||||||
|
log.G(ctx).WithFields(a).Info("shim disconnected")
|
||||||
|
|
||||||
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
|
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
|
||||||
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
|
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
|
||||||
|
@ -118,6 +118,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
a := log.Fields {
|
||||||
|
"mzq-test": true,
|
||||||
|
"conn": fmt.Sprintf("%+v", conn),
|
||||||
|
"params": fmt.Sprintf("%+v", params),
|
||||||
|
"bundle": fmt.Sprintf("%+v", bundle),
|
||||||
|
}
|
||||||
|
log.G(ctx).WithFields(a).Info("shim.go loadShim")
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if retErr != nil {
|
if retErr != nil {
|
||||||
@ -244,6 +251,12 @@ func makeConnection(ctx context.Context, params client.BootstrapParams, onClose
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
a := log.Fields{
|
||||||
|
"mzq-test": true,
|
||||||
|
"conn": fmt.Sprintf("%+v", conn),
|
||||||
|
"params": fmt.Sprintf("%+v", params),
|
||||||
|
}
|
||||||
|
log.G(ctx).WithFields(a).Info("shim.go makeConnection makeConnection")
|
||||||
return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil
|
return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil
|
||||||
case "grpc":
|
case "grpc":
|
||||||
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
|
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
|
||||||
|
@ -43,6 +43,7 @@ type item struct {
|
|||||||
|
|
||||||
// NewPublisher creates a new remote events publisher
|
// NewPublisher creates a new remote events publisher
|
||||||
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
|
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
|
||||||
|
log.L.Infof("mzq-test, publisher.go NewPublisher, address: %v\n", address)
|
||||||
client, err := ttrpcutil.NewClient(address)
|
client, err := ttrpcutil.NewClient(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -290,6 +290,15 @@ func run(ctx context.Context, manager Manager, initFunc Init, name string, confi
|
|||||||
}
|
}
|
||||||
|
|
||||||
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
||||||
|
a := log.Fields{
|
||||||
|
"mzq-test": true,
|
||||||
|
"id": id,
|
||||||
|
"name": name,
|
||||||
|
"config": fmt.Sprintf("%+v", config),
|
||||||
|
"signals": fmt.Sprintf("%+v", signals),
|
||||||
|
"ttrpcAddress": ttrpcAddress,
|
||||||
|
}
|
||||||
|
log.G(ctx).WithFields(a).Info("shim.go run")
|
||||||
publisher, err := NewPublisher(ttrpcAddress)
|
publisher, err := NewPublisher(ttrpcAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
13
vendor/github.com/containerd/ttrpc/client.go
generated
vendored
13
vendor/github.com/containerd/ttrpc/client.go
generated
vendored
@ -73,6 +73,12 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
|
|||||||
|
|
||||||
// NewClient creates a new ttrpc client using the given connection
|
// NewClient creates a new ttrpc client using the given connection
|
||||||
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
|
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
|
||||||
|
a := logrus.Fields{
|
||||||
|
"mzq-test": true,
|
||||||
|
"conn": fmt.Sprintf("%+v", conn),
|
||||||
|
"opts": fmt.Sprintf("%+v", opts),
|
||||||
|
}
|
||||||
|
logrus.WithFields(a).Info("client.go NewClient")
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
channel := newChannel(conn)
|
channel := newChannel(conn)
|
||||||
c := &Client{
|
c := &Client{
|
||||||
@ -323,7 +329,12 @@ func (c *Client) receiveLoop() error {
|
|||||||
sid := streamID(msg.header.StreamID)
|
sid := streamID(msg.header.StreamID)
|
||||||
s := c.getStream(sid)
|
s := c.getStream(sid)
|
||||||
if s == nil {
|
if s == nil {
|
||||||
logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
|
a := logrus.Fields{
|
||||||
|
"mzq-test": true,
|
||||||
|
"stream": sid,
|
||||||
|
"c": fmt.Sprintf("%+v", c),
|
||||||
|
}
|
||||||
|
logrus.WithFields(a).Errorf("ttrpc: received message on inactive stream")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
3
vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go
generated
vendored
3
vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go
generated
vendored
@ -175,6 +175,7 @@ func (conn *Conn) SetIdleTimeout(duration time.Duration) {
|
|||||||
// Open the connection and create channels for reading and writing. It returns
|
// Open the connection and create channels for reading and writing. It returns
|
||||||
// the selected subprotocol, a slice of channels and an error.
|
// the selected subprotocol, a slice of channels and an error.
|
||||||
func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
|
func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
|
||||||
|
klog.Infof("mzq-test, conn.go Open, req: %+v, body: %+v, conn: %+v, timeout: %+v\n", req, req.Body, conn, conn.timeout.String())
|
||||||
go func() {
|
go func() {
|
||||||
defer runtime.HandleCrash()
|
defer runtime.HandleCrash()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
@ -249,7 +250,7 @@ func (conn *Conn) handle(ws *websocket.Conn) {
|
|||||||
var data []byte
|
var data []byte
|
||||||
if err := websocket.Message.Receive(ws, &data); err != nil {
|
if err := websocket.Message.Receive(ws, &data); err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
klog.Errorf("Error on socket receive: %v", err)
|
klog.Errorf("mzq-test, Error on socket receive: %v, ws: %+v, data: %v", err, ws, string(data))
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user