diff --git a/api/events/sandbox.pb.go b/api/events/sandbox.pb.go new file mode 100644 index 000000000..08f5e70e4 --- /dev/null +++ b/api/events/sandbox.pb.go @@ -0,0 +1,316 @@ +// +//Copyright The containerd Authors. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.20.1 +// source: github.com/containerd/containerd/api/events/sandbox.proto + +package events + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type SandboxCreate struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SandboxID string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` +} + +func (x *SandboxCreate) Reset() { + *x = SandboxCreate{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SandboxCreate) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SandboxCreate) ProtoMessage() {} + +func (x *SandboxCreate) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SandboxCreate.ProtoReflect.Descriptor instead. 
+func (*SandboxCreate) Descriptor() ([]byte, []int) { + return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP(), []int{0} +} + +func (x *SandboxCreate) GetSandboxID() string { + if x != nil { + return x.SandboxID + } + return "" +} + +type SandboxStart struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SandboxID string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` +} + +func (x *SandboxStart) Reset() { + *x = SandboxStart{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SandboxStart) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SandboxStart) ProtoMessage() {} + +func (x *SandboxStart) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SandboxStart.ProtoReflect.Descriptor instead. 
+func (*SandboxStart) Descriptor() ([]byte, []int) { + return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP(), []int{1} +} + +func (x *SandboxStart) GetSandboxID() string { + if x != nil { + return x.SandboxID + } + return "" +} + +type SandboxExit struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SandboxID string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` + ExitStatus uint32 `protobuf:"varint,2,opt,name=exit_status,json=exitStatus,proto3" json:"exit_status,omitempty"` + ExitedAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=exited_at,json=exitedAt,proto3" json:"exited_at,omitempty"` +} + +func (x *SandboxExit) Reset() { + *x = SandboxExit{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SandboxExit) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SandboxExit) ProtoMessage() {} + +func (x *SandboxExit) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SandboxExit.ProtoReflect.Descriptor instead. 
+func (*SandboxExit) Descriptor() ([]byte, []int) { + return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP(), []int{2} +} + +func (x *SandboxExit) GetSandboxID() string { + if x != nil { + return x.SandboxID + } + return "" +} + +func (x *SandboxExit) GetExitStatus() uint32 { + if x != nil { + return x.ExitStatus + } + return 0 +} + +func (x *SandboxExit) GetExitedAt() *timestamppb.Timestamp { + if x != nil { + return x.ExitedAt + } + return nil +} + +var File_github_com_containerd_containerd_api_events_sandbox_proto protoreflect.FileDescriptor + +var file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc = []byte{ + 0x0a, 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, + 0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x73, 0x61, + 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x63, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x1a, 0x1f, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x2e, 0x0a, 0x0d, 0x53, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x49, 0x64, 0x22, + 0x2d, 0x0a, 0x0c, 0x53, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, + 0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x49, 0x64, 0x22, 0x86, + 0x01, 0x0a, 0x0b, 0x53, 0x61, 0x6e, 0x64, 0x62, 0x6f, 
0x78, 0x45, 0x78, 0x69, 0x74, 0x12, 0x1d, + 0x0a, 0x0a, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x49, 0x64, 0x12, 0x1f, 0x0a, + 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x37, + 0x0a, 0x09, 0x65, 0x78, 0x69, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x65, + 0x78, 0x69, 0x74, 0x65, 0x64, 0x41, 0x74, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, + 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x3b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescOnce sync.Once + file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData = file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc +) + +func file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP() []byte { + file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescOnce.Do(func() { + file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData = protoimpl.X.CompressGZIP(file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData) + }) + return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData +} + +var file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var 
file_github_com_containerd_containerd_api_events_sandbox_proto_goTypes = []interface{}{ + (*SandboxCreate)(nil), // 0: containerd.events.SandboxCreate + (*SandboxStart)(nil), // 1: containerd.events.SandboxStart + (*SandboxExit)(nil), // 2: containerd.events.SandboxExit + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp +} +var file_github_com_containerd_containerd_api_events_sandbox_proto_depIdxs = []int32{ + 3, // 0: containerd.events.SandboxExit.exited_at:type_name -> google.protobuf.Timestamp + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_github_com_containerd_containerd_api_events_sandbox_proto_init() } +func file_github_com_containerd_containerd_api_events_sandbox_proto_init() { + if File_github_com_containerd_containerd_api_events_sandbox_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SandboxCreate); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SandboxStart); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SandboxExit); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := 
protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_github_com_containerd_containerd_api_events_sandbox_proto_goTypes, + DependencyIndexes: file_github_com_containerd_containerd_api_events_sandbox_proto_depIdxs, + MessageInfos: file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes, + }.Build() + File_github_com_containerd_containerd_api_events_sandbox_proto = out.File + file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc = nil + file_github_com_containerd_containerd_api_events_sandbox_proto_goTypes = nil + file_github_com_containerd_containerd_api_events_sandbox_proto_depIdxs = nil +} diff --git a/api/events/sandbox.proto b/api/events/sandbox.proto new file mode 100644 index 000000000..f1c5195e5 --- /dev/null +++ b/api/events/sandbox.proto @@ -0,0 +1,37 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +syntax = "proto3"; + +package containerd.events; + +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/containerd/containerd/api/events;events"; + +message SandboxCreate { + string sandbox_id = 1; +} + +message SandboxStart { + string sandbox_id = 1; +} + +message SandboxExit { + string sandbox_id = 1; + uint32 exit_status = 2; + google.protobuf.Timestamp exited_at = 3; +} diff --git a/api/events/sandbox_fieldpath.pb.go b/api/events/sandbox_fieldpath.pb.go new file mode 100644 index 000000000..5afb99457 --- /dev/null +++ b/api/events/sandbox_fieldpath.pb.go @@ -0,0 +1,44 @@ +// Code generated by protoc-gen-go-fieldpath. DO NOT EDIT. +// source: github.com/containerd/containerd/api/events/sandbox.proto +package events + +// Field returns the value for the given fieldpath as a string, if defined. +// If the value is not defined, the second value will be false. +func (m *SandboxCreate) Field(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + switch fieldpath[0] { + case "sandbox_id": + return string(m.SandboxID), len(m.SandboxID) > 0 + } + return "", false +} + +// Field returns the value for the given fieldpath as a string, if defined. +// If the value is not defined, the second value will be false. +func (m *SandboxStart) Field(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + switch fieldpath[0] { + case "sandbox_id": + return string(m.SandboxID), len(m.SandboxID) > 0 + } + return "", false +} + +// Field returns the value for the given fieldpath as a string, if defined. +// If the value is not defined, the second value will be false. 
+func (m *SandboxExit) Field(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + switch fieldpath[0] { + // unhandled: exit_status + // unhandled: exited_at + case "sandbox_id": + return string(m.SandboxID), len(m.SandboxID) > 0 + } + return "", false +} diff --git a/api/next.pb.txt b/api/next.pb.txt index 536afbf24..234e075de 100644 --- a/api/next.pb.txt +++ b/api/next.pb.txt @@ -380,6 +380,91 @@ file { } syntax: "proto3" } +file { + name: "google/protobuf/timestamp.proto" + package: "google.protobuf" + message_type { + name: "Timestamp" + field { + name: "seconds" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_INT64 + json_name: "seconds" + } + field { + name: "nanos" + number: 2 + label: LABEL_OPTIONAL + type: TYPE_INT32 + json_name: "nanos" + } + } + options { + java_package: "com.google.protobuf" + java_outer_classname: "TimestampProto" + java_multiple_files: true + go_package: "google.golang.org/protobuf/types/known/timestamppb" + cc_enable_arenas: true + objc_class_prefix: "GPB" + csharp_namespace: "Google.Protobuf.WellKnownTypes" + } + syntax: "proto3" +} +file { + name: "github.com/containerd/containerd/api/events/sandbox.proto" + package: "containerd.events" + dependency: "google/protobuf/timestamp.proto" + message_type { + name: "SandboxCreate" + field { + name: "sandbox_id" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "sandboxId" + } + } + message_type { + name: "SandboxStart" + field { + name: "sandbox_id" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "sandboxId" + } + } + message_type { + name: "SandboxExit" + field { + name: "sandbox_id" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "sandboxId" + } + field { + name: "exit_status" + number: 2 + label: LABEL_OPTIONAL + type: TYPE_UINT32 + json_name: "exitStatus" + } + field { + name: "exited_at" + number: 3 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".google.protobuf.Timestamp" + 
json_name: "exitedAt" + } + } + options { + go_package: "github.com/containerd/containerd/api/events;events" + } + syntax: "proto3" +} file { name: "github.com/containerd/containerd/api/events/snapshot.proto" package: "containerd.events" @@ -455,37 +540,6 @@ file { } syntax: "proto3" } -file { - name: "google/protobuf/timestamp.proto" - package: "google.protobuf" - message_type { - name: "Timestamp" - field { - name: "seconds" - number: 1 - label: LABEL_OPTIONAL - type: TYPE_INT64 - json_name: "seconds" - } - field { - name: "nanos" - number: 2 - label: LABEL_OPTIONAL - type: TYPE_INT32 - json_name: "nanos" - } - } - options { - java_package: "com.google.protobuf" - java_outer_classname: "TimestampProto" - java_multiple_files: true - go_package: "google.golang.org/protobuf/types/known/timestamppb" - cc_enable_arenas: true - objc_class_prefix: "GPB" - csharp_namespace: "Google.Protobuf.WellKnownTypes" - } - syntax: "proto3" -} file { name: "github.com/containerd/containerd/api/types/mount.proto" package: "containerd.types" diff --git a/pkg/cri/sbserver/events.go b/pkg/cri/sbserver/events.go index d5abe820b..a1291cfb1 100644 --- a/pkg/cri/sbserver/events.go +++ b/pkg/cri/sbserver/events.go @@ -120,12 +120,10 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, exitedAt = time.Now() } - e := &eventtypes.TaskExit{ - ContainerID: id, - ID: id, - Pid: pid, - ExitStatus: exitStatus, - ExitedAt: protobuf.ToTimestamp(exitedAt), + e := &eventtypes.SandboxExit{ + SandboxID: id, + ExitStatus: exitStatus, + ExitedAt: protobuf.ToTimestamp(exitedAt), } log.L.Debugf("received exit event %+v", e) @@ -135,14 +133,14 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) defer dcancel() - sb, err := em.c.sandboxStore.Get(e.ID) + sb, err := em.c.sandboxStore.Get(e.GetSandboxID()) if err == nil { - if err := handleSandboxExit(dctx, e, sb, em.c); err != nil { + if err 
:= handleSandboxExit(dctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil { return err } return nil } else if !errdefs.IsNotFound(err) { - return fmt.Errorf("failed to get sandbox %s: %w", e.ID, err) + return fmt.Errorf("failed to get sandbox %s: %w", e.SandboxID, err) } return nil }() @@ -218,6 +216,8 @@ func convertEvent(e typeurl.Any) (string, interface{}, error) { switch e := evt.(type) { case *eventtypes.TaskOOM: id = e.ContainerID + case *eventtypes.SandboxExit: + id = e.SandboxID case *eventtypes.ImageCreate: id = e.Name case *eventtypes.ImageUpdate: @@ -323,7 +323,19 @@ func (em *eventMonitor) handleEvent(any interface{}) error { } sb, err := em.c.sandboxStore.Get(e.ID) if err == nil { - if err := handleSandboxExit(ctx, e, sb, em.c); err != nil { + if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil { + return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) + } + return nil + } else if !errdefs.IsNotFound(err) { + return fmt.Errorf("can't find sandbox for TaskExit event: %w", err) + } + return nil + case *eventtypes.SandboxExit: + log.L.Infof("SandboxExit event %+v", e) + sb, err := em.c.sandboxStore.Get(e.GetSandboxID()) + if err == nil { + if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil { return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) } return nil @@ -416,30 +428,13 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta return nil } -// handleSandboxExit handles TaskExit event for sandbox. -func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error { - // TODO: Move pause container cleanup to podsandbox/ package. - if sb.Container != nil { - // No stream attached to sandbox container. 
- task, err := sb.Container.Task(ctx, nil) - if err != nil { - if !errdefs.IsNotFound(err) { - return fmt.Errorf("failed to load task for sandbox: %w", err) - } - } else { - // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil { - if !errdefs.IsNotFound(err) { - return fmt.Errorf("failed to stop sandbox: %w", err) - } - // Move on to make sure container status is updated. - } - } - } - +// handleSandboxExit handles sandbox exit event. +func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, exitStatus uint32, exitTime time.Time, c *criService) error { if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { status.State = sandboxstore.StateNotReady status.Pid = 0 + status.ExitStatus = exitStatus + status.ExitedAt = exitTime return status, nil }); err != nil { return fmt.Errorf("failed to update sandbox state: %w", err) diff --git a/pkg/cri/sbserver/helpers.go b/pkg/cri/sbserver/helpers.go index fefb1646c..37cdb019f 100644 --- a/pkg/cri/sbserver/helpers.go +++ b/pkg/cri/sbserver/helpers.go @@ -41,7 +41,6 @@ import ( criconfig "github.com/containerd/containerd/pkg/cri/config" containerstore "github.com/containerd/containerd/pkg/cri/store/container" imagestore "github.com/containerd/containerd/pkg/cri/store/image" - sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" runtimeoptions "github.com/containerd/containerd/pkg/runtimeoptions/v1" "github.com/containerd/containerd/plugin" runcoptions "github.com/containerd/containerd/runtime/v2/runc/options" @@ -81,8 +80,6 @@ const ( // containerKindContainer is a label value indicating container is application container containerKindContainer = "container" - // sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest - sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata" // containerMetadataExtension 
is an extension name that identify metadata of container in CreateContainerRequest containerMetadataExtension = criContainerdPrefix + ".container.metadata" @@ -334,13 +331,6 @@ func unknownContainerStatus() containerstore.Status { } } -// unknownSandboxStatus returns the default sandbox status when its status is unknown. -func unknownSandboxStatus() sandboxstore.Status { - return sandboxstore.Status{ - State: sandboxstore.StateUnknown, - } -} - // getPassthroughAnnotations filters requested pod annotations by comparing // against permitted annotations for the given runtime. func getPassthroughAnnotations(podAnnotations map[string]string, diff --git a/pkg/cri/sbserver/podsandbox/controller.go b/pkg/cri/sbserver/podsandbox/controller.go index 0be73ffe5..68ac86029 100644 --- a/pkg/cri/sbserver/podsandbox/controller.go +++ b/pkg/cri/sbserver/podsandbox/controller.go @@ -43,10 +43,6 @@ import ( type CRIService interface { // TODO: we should implement Event backoff in Controller. BackOffEvent(id string, event interface{}) - - // TODO: refactor event generator for PLEG. - // GenerateAndSendContainerEvent is called by controller for sandbox container events. - GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) } // ImageService specifies dependencies to CRI image service. diff --git a/pkg/cri/sbserver/podsandbox/recover.go b/pkg/cri/sbserver/podsandbox/recover.go new file mode 100644 index 000000000..eb12cf665 --- /dev/null +++ b/pkg/cri/sbserver/podsandbox/recover.go @@ -0,0 +1,174 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package podsandbox + +import ( + "context" + "fmt" + goruntime "runtime" + "time" + + "github.com/containerd/containerd/pkg/netns" + "github.com/containerd/typeurl/v2" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" + ctrdutil "github.com/containerd/containerd/pkg/cri/util" +) + +// loadContainerTimeout is the default timeout for loading a container/sandbox. +// One container/sandbox hangs (e.g. containerd#2438) should not affect other +// containers/sandboxes. +// Most CRI container/sandbox related operations are per container, the ones +// which handle multiple containers at a time are: +// * ListPodSandboxes: Don't talk with containerd services. +// * ListContainers: Don't talk with containerd services. +// * ListContainerStats: Not in critical code path, a default timeout will +// be applied at CRI level. +// * Recovery logic: We should set a time for each container/sandbox recovery. +// * Event monitor: We should set a timeout for each container/sandbox event handling. +const loadContainerTimeout = 10 * time.Second + +func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { + ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) + defer cancel() + var sandbox sandboxstore.Sandbox + // Load sandbox metadata. 
+ exts, err := cntr.Extensions(ctx) + if err != nil { + return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err) + } + ext, ok := exts[sandboxMetadataExtension] + if !ok { + return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension) + } + data, err := typeurl.UnmarshalAny(ext) + if err != nil { + return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err) + } + meta := data.(*sandboxstore.Metadata) + + s, err := func() (sandboxstore.Status, error) { + status := sandboxstore.Status{ + State: sandboxstore.StateUnknown, + } + // Load sandbox created timestamp. + info, err := cntr.Info(ctx) + if err != nil { + return status, fmt.Errorf("failed to get sandbox container info: %w", err) + } + status.CreatedAt = info.CreatedAt + + // Load sandbox state. + t, err := cntr.Task(ctx, nil) + if err != nil && !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to load task: %w", err) + } + var taskStatus containerd.Status + var notFound bool + if errdefs.IsNotFound(err) { + // Task is not found. + notFound = true + } else { + // Task is found. Get task status. + taskStatus, err = t.Status(ctx) + if err != nil { + // It's still possible that task is deleted during this window. + if !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to get task status: %w", err) + } + notFound = true + } + } + if notFound { + // Task does not exist, set sandbox state as NOTREADY. + status.State = sandboxstore.StateNotReady + } else { + if taskStatus.Status == containerd.Running { + // Wait for the task for sandbox monitor. + // wait is a long running background request, no timeout needed. + exitCh, err := t.Wait(ctrdutil.NamespacedContext()) + if err != nil { + if !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to wait for task: %w", err) + } + status.State = sandboxstore.StateNotReady + } else { + // Task is running, set sandbox state as READY. 
+ status.State = sandboxstore.StateReady + status.Pid = t.Pid() + + go func() { + c.waitSandboxExit(context.Background(), meta.ID, exitCh) + }() + } + } else { + // Task is not running. Delete the task and set sandbox state as NOTREADY. + if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + return status, fmt.Errorf("failed to delete task: %w", err) + } + status.State = sandboxstore.StateNotReady + } + } + return status, nil + }() + if err != nil { + log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID()) + } + + sandbox = sandboxstore.NewSandbox(*meta, s) + sandbox.Container = cntr + + // Load network namespace. + sandbox.NetNS = getNetNS(meta) + + // It doesn't matter whether task is running or not. If it is running, sandbox + // status will be `READY`; if it is not running, sandbox status will be `NOT_READY`, + // kubelet will stop the sandbox which will properly cleanup everything. + return sandbox, nil +} + +func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS { + // Don't need to load netns for host network sandbox. + if hostNetwork(meta.Config) { + return nil + } + return netns.LoadNetNS(meta.NetNSPath) +} + +// hostNetwork handles checking if host networking was requested. +// TODO: Copy pasted from sbserver to handle container sandbox events in podsandbox/ package, needs refactoring. +func hostNetwork(config *runtime.PodSandboxConfig) bool { + var hostNet bool + switch goruntime.GOOS { + case "windows": + // Windows HostProcess pods can only run on the host network + hostNet = config.GetWindows().GetSecurityContext().GetHostProcess() + case "darwin": + // No CNI on Darwin yet. + hostNet = true + default: + // Even on other platforms, the logic containerd uses is to check if NamespaceMode == NODE. + // So this handles Linux, as well as any other platforms not governed by the cases above + // that have special quirks. 
+ hostNet = config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE + } + return hostNet +} diff --git a/pkg/cri/sbserver/podsandbox/sandbox_delete.go b/pkg/cri/sbserver/podsandbox/sandbox_delete.go index 678362053..4e388cd01 100644 --- a/pkg/cri/sbserver/podsandbox/sandbox_delete.go +++ b/pkg/cri/sbserver/podsandbox/sandbox_delete.go @@ -20,8 +20,6 @@ import ( "context" "fmt" - runtime "k8s.io/cri-api/pkg/apis/runtime/v1" - "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/log" @@ -59,10 +57,5 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error { } } - // Send CONTAINER_DELETED event with ContainerId equal to SandboxId. - c.cri.GenerateAndSendContainerEvent(ctx, sandboxID, sandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT) - - c.sandboxStore.Delete(sandboxID) - return nil } diff --git a/pkg/cri/sbserver/podsandbox/sandbox_stop.go b/pkg/cri/sbserver/podsandbox/sandbox_stop.go index 7aab32b32..51d4eb228 100644 --- a/pkg/cri/sbserver/podsandbox/sandbox_stop.go +++ b/pkg/cri/sbserver/podsandbox/sandbox_stop.go @@ -93,12 +93,10 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst defer close(stopCh) exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, id, exitCh) if err != nil && err != context.Canceled && err != context.DeadlineExceeded { - e := &eventtypes.TaskExit{ - ContainerID: id, - ID: id, - Pid: task.Pid(), - ExitStatus: exitStatus, - ExitedAt: protobuf.ToTimestamp(exitedAt), + e := &eventtypes.SandboxExit{ + SandboxID: id, + ExitStatus: exitStatus, + ExitedAt: protobuf.ToTimestamp(exitedAt), } logrus.WithError(err).Errorf("Failed to wait sandbox exit %+v", e) // TODO: how to backoff diff --git a/pkg/cri/sbserver/restart.go b/pkg/cri/sbserver/restart.go index 7c88818ba..070e405be 100644 --- a/pkg/cri/sbserver/restart.go +++ b/pkg/cri/sbserver/restart.go @@ -31,6 +31,7 @@ import ( 
"github.com/containerd/containerd/log" criconfig "github.com/containerd/containerd/pkg/cri/config" "github.com/containerd/containerd/pkg/cri/sbserver/podsandbox" + "github.com/containerd/containerd/pkg/netns" "github.com/containerd/containerd/platforms" "github.com/containerd/typeurl/v2" "golang.org/x/sync/errgroup" @@ -40,7 +41,6 @@ import ( containerstore "github.com/containerd/containerd/pkg/cri/store/container" sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/pkg/cri/util" - "github.com/containerd/containerd/pkg/netns" ) // NOTE: The recovery logic has following assumption: when the cri plugin is down: @@ -60,11 +60,21 @@ func (c *criService) recover(ctx context.Context) error { return fmt.Errorf("failed to list sandbox containers: %w", err) } + podSandboxController, ok := c.sandboxControllers[criconfig.ModePodSandbox] + if !ok { + log.G(ctx).Fatal("unable to restore pod sandboxes, no controller found") + } + + podSandboxLoader, ok := podSandboxController.(podSandboxRecover) + if !ok { + log.G(ctx).Fatal("pod sandbox controller doesn't support recovery") + } + eg, ctx2 := errgroup.WithContext(ctx) for _, sandbox := range sandboxes { sandbox := sandbox eg.Go(func() error { - sb, err := c.loadSandbox(ctx2, sandbox) + sb, err := podSandboxLoader.RecoverContainer(ctx2, sandbox) if err != nil { log.G(ctx2).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID()) return nil @@ -388,99 +398,10 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe return containerstore.NewContainer(*meta, opts...) } -// loadSandbox loads sandbox from containerd. -func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { - ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout) - defer cancel() - var sandbox sandboxstore.Sandbox - // Load sandbox metadata. 
- exts, err := cntr.Extensions(ctx) - if err != nil { - return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err) - } - ext, ok := exts[sandboxMetadataExtension] - if !ok { - return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension) - } - data, err := typeurl.UnmarshalAny(ext) - if err != nil { - return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err) - } - meta := data.(*sandboxstore.Metadata) - - s, err := func() (sandboxstore.Status, error) { - status := unknownSandboxStatus() - // Load sandbox created timestamp. - info, err := cntr.Info(ctx) - if err != nil { - return status, fmt.Errorf("failed to get sandbox container info: %w", err) - } - status.CreatedAt = info.CreatedAt - - // Load sandbox state. - t, err := cntr.Task(ctx, nil) - if err != nil && !errdefs.IsNotFound(err) { - return status, fmt.Errorf("failed to load task: %w", err) - } - var taskStatus containerd.Status - var notFound bool - if errdefs.IsNotFound(err) { - // Task is not found. - notFound = true - } else { - // Task is found. Get task status. - taskStatus, err = t.Status(ctx) - if err != nil { - // It's still possible that task is deleted during this window. - if !errdefs.IsNotFound(err) { - return status, fmt.Errorf("failed to get task status: %w", err) - } - notFound = true - } - } - if notFound { - // Task does not exist, set sandbox state as NOTREADY. - status.State = sandboxstore.StateNotReady - } else { - if taskStatus.Status == containerd.Running { - // Wait for the task for sandbox monitor. - // wait is a long running background request, no timeout needed. - exitCh, err := t.Wait(ctrdutil.NamespacedContext()) - if err != nil { - if !errdefs.IsNotFound(err) { - return status, fmt.Errorf("failed to wait for task: %w", err) - } - status.State = sandboxstore.StateNotReady - } else { - // Task is running, set sandbox state as READY. 
// podSandboxRecover is an additional interface implemented by the podsandbox/ controller to handle
// recovery of pod sandbox containers.
//
// criService.recover type-asserts the controller registered under
// criconfig.ModePodSandbox to this interface and calls RecoverContainer once
// for every sandbox container it lists.
type podSandboxRecover interface {
	// RecoverContainer loads the sandbox store entry for the given sandbox
	// container. On error, recover logs the failure and skips that sandbox.
	RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error)
}
+ c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT) + err = c.nri.RemovePodSandbox(ctx, &sandbox) if err != nil { log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed") diff --git a/pkg/cri/sbserver/service.go b/pkg/cri/sbserver/service.go index 09f7243dc..27bf6ac45 100644 --- a/pkg/cri/sbserver/service.go +++ b/pkg/cri/sbserver/service.go @@ -215,12 +215,6 @@ func (c *criService) BackOffEvent(id string, event interface{}) { c.eventMonitor.backOff.enBackOff(id, event) } -// GenerateAndSendContainerEvent is a temporary workaround to send PLEG events from podsandbopx/ controller -// TODO: refactor PLEG event generator so both podsandbox/ controller and CRI service can access it. -func (c *criService) GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) { - c.generateAndSendContainerEvent(ctx, containerID, sandboxID, eventType) -} - // Register registers all required services onto a specific grpc server. // This is used by containerd cri plugin. 
// controllerService is the gRPC sandbox controller service. It forwards
// requests to a local sandbox.Controller and publishes sandbox lifecycle
// events.
type controllerService struct {
	// local is the sandbox controller plugin registered under the ID "local".
	local sandbox.Controller
	// publisher is obtained from the event plugin and is used to publish the
	// "sandboxes/create", "sandboxes/start" and "sandboxes/exit" events.
	publisher events.Publisher
	api.UnimplementedControllerServer
}
SandboxID: req.GetSandboxID(), }, nil @@ -79,6 +97,13 @@ func (s *controllerService) Start(ctx context.Context, req *api.ControllerStartR if err != nil { return &api.ControllerStartResponse{}, errdefs.ToGRPC(err) } + + if err := s.publisher.Publish(ctx, "sandboxes/start", &eventtypes.SandboxStart{ + SandboxID: req.GetSandboxID(), + }); err != nil { + return &api.ControllerStartResponse{}, errdefs.ToGRPC(err) + } + return &api.ControllerStartResponse{ SandboxID: inst.SandboxID, Pid: inst.Pid, @@ -98,6 +123,15 @@ func (s *controllerService) Wait(ctx context.Context, req *api.ControllerWaitReq if err != nil { return &api.ControllerWaitResponse{}, errdefs.ToGRPC(err) } + + if err := s.publisher.Publish(ctx, "sandboxes/exit", &eventtypes.SandboxExit{ + SandboxID: req.GetSandboxID(), + ExitStatus: exitStatus.ExitStatus, + ExitedAt: protobuf.ToTimestamp(exitStatus.ExitedAt), + }); err != nil { + return &api.ControllerWaitResponse{}, errdefs.ToGRPC(err) + } + return &api.ControllerWaitResponse{ ExitStatus: exitStatus.ExitStatus, ExitedAt: protobuf.ToTimestamp(exitStatus.ExitedAt),