Merge pull request #8602 from mxpv/sbevents

Publish sandbox events
This commit is contained in:
Phil Estes 2023-05-31 09:14:08 -04:00 committed by GitHub
commit 80eb76332e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 743 additions and 194 deletions

316
api/events/sandbox.pb.go Normal file
View File

@ -0,0 +1,316 @@
//
//Copyright The containerd Authors.
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.20.1
// source: github.com/containerd/containerd/api/events/sandbox.proto
package events
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type SandboxCreate struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
SandboxID string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"`
}
func (x *SandboxCreate) Reset() {
*x = SandboxCreate{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SandboxCreate) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SandboxCreate) ProtoMessage() {}
func (x *SandboxCreate) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SandboxCreate.ProtoReflect.Descriptor instead.
func (*SandboxCreate) Descriptor() ([]byte, []int) {
return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP(), []int{0}
}
func (x *SandboxCreate) GetSandboxID() string {
if x != nil {
return x.SandboxID
}
return ""
}
type SandboxStart struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
SandboxID string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"`
}
func (x *SandboxStart) Reset() {
*x = SandboxStart{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SandboxStart) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SandboxStart) ProtoMessage() {}
func (x *SandboxStart) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SandboxStart.ProtoReflect.Descriptor instead.
func (*SandboxStart) Descriptor() ([]byte, []int) {
return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP(), []int{1}
}
func (x *SandboxStart) GetSandboxID() string {
if x != nil {
return x.SandboxID
}
return ""
}
type SandboxExit struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
SandboxID string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"`
ExitStatus uint32 `protobuf:"varint,2,opt,name=exit_status,json=exitStatus,proto3" json:"exit_status,omitempty"`
ExitedAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=exited_at,json=exitedAt,proto3" json:"exited_at,omitempty"`
}
func (x *SandboxExit) Reset() {
*x = SandboxExit{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SandboxExit) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SandboxExit) ProtoMessage() {}
func (x *SandboxExit) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SandboxExit.ProtoReflect.Descriptor instead.
func (*SandboxExit) Descriptor() ([]byte, []int) {
return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP(), []int{2}
}
func (x *SandboxExit) GetSandboxID() string {
if x != nil {
return x.SandboxID
}
return ""
}
func (x *SandboxExit) GetExitStatus() uint32 {
if x != nil {
return x.ExitStatus
}
return 0
}
func (x *SandboxExit) GetExitedAt() *timestamppb.Timestamp {
if x != nil {
return x.ExitedAt
}
return nil
}
var File_github_com_containerd_containerd_api_events_sandbox_proto protoreflect.FileDescriptor
var file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc = []byte{
0x0a, 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e,
0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65,
0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x73, 0x61,
0x6e, 0x64, 0x62, 0x6f, 0x78, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x63, 0x6f, 0x6e,
0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x1a, 0x1f,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f,
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22,
0x2e, 0x0a, 0x0d, 0x53, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65,
0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x49, 0x64, 0x22,
0x2d, 0x0a, 0x0c, 0x53, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12,
0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x49, 0x64, 0x22, 0x86,
0x01, 0x0a, 0x0b, 0x53, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x45, 0x78, 0x69, 0x74, 0x12, 0x1d,
0x0a, 0x0a, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x09, 0x73, 0x61, 0x6e, 0x64, 0x62, 0x6f, 0x78, 0x49, 0x64, 0x12, 0x1f, 0x0a,
0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0d, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x37,
0x0a, 0x09, 0x65, 0x78, 0x69, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x65,
0x78, 0x69, 0x74, 0x65, 0x64, 0x41, 0x74, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64,
0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x3b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescOnce sync.Once
file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData = file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc
)
func file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescGZIP() []byte {
file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescOnce.Do(func() {
file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData = protoimpl.X.CompressGZIP(file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData)
})
return file_github_com_containerd_containerd_api_events_sandbox_proto_rawDescData
}
var file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_github_com_containerd_containerd_api_events_sandbox_proto_goTypes = []interface{}{
(*SandboxCreate)(nil), // 0: containerd.events.SandboxCreate
(*SandboxStart)(nil), // 1: containerd.events.SandboxStart
(*SandboxExit)(nil), // 2: containerd.events.SandboxExit
(*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
}
var file_github_com_containerd_containerd_api_events_sandbox_proto_depIdxs = []int32{
3, // 0: containerd.events.SandboxExit.exited_at:type_name -> google.protobuf.Timestamp
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_github_com_containerd_containerd_api_events_sandbox_proto_init() }
func file_github_com_containerd_containerd_api_events_sandbox_proto_init() {
if File_github_com_containerd_containerd_api_events_sandbox_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SandboxCreate); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SandboxStart); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SandboxExit); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_github_com_containerd_containerd_api_events_sandbox_proto_goTypes,
DependencyIndexes: file_github_com_containerd_containerd_api_events_sandbox_proto_depIdxs,
MessageInfos: file_github_com_containerd_containerd_api_events_sandbox_proto_msgTypes,
}.Build()
File_github_com_containerd_containerd_api_events_sandbox_proto = out.File
file_github_com_containerd_containerd_api_events_sandbox_proto_rawDesc = nil
file_github_com_containerd_containerd_api_events_sandbox_proto_goTypes = nil
file_github_com_containerd_containerd_api_events_sandbox_proto_depIdxs = nil
}

37
api/events/sandbox.proto Normal file
View File

@ -0,0 +1,37 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package containerd.events;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/events;events";
message SandboxCreate {
string sandbox_id = 1;
}
message SandboxStart {
string sandbox_id = 1;
}
message SandboxExit {
string sandbox_id = 1;
uint32 exit_status = 2;
google.protobuf.Timestamp exited_at = 3;
}

View File

@ -0,0 +1,44 @@
// Code generated by protoc-gen-go-fieldpath. DO NOT EDIT.
// source: github.com/containerd/containerd/api/events/sandbox.proto
package events
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (m *SandboxCreate) Field(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "sandbox_id":
return string(m.SandboxID), len(m.SandboxID) > 0
}
return "", false
}
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (m *SandboxStart) Field(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "sandbox_id":
return string(m.SandboxID), len(m.SandboxID) > 0
}
return "", false
}
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (m *SandboxExit) Field(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
// unhandled: exit_status
// unhandled: exited_at
case "sandbox_id":
return string(m.SandboxID), len(m.SandboxID) > 0
}
return "", false
}

View File

@ -380,6 +380,91 @@ file {
}
syntax: "proto3"
}
file {
name: "google/protobuf/timestamp.proto"
package: "google.protobuf"
message_type {
name: "Timestamp"
field {
name: "seconds"
number: 1
label: LABEL_OPTIONAL
type: TYPE_INT64
json_name: "seconds"
}
field {
name: "nanos"
number: 2
label: LABEL_OPTIONAL
type: TYPE_INT32
json_name: "nanos"
}
}
options {
java_package: "com.google.protobuf"
java_outer_classname: "TimestampProto"
java_multiple_files: true
go_package: "google.golang.org/protobuf/types/known/timestamppb"
cc_enable_arenas: true
objc_class_prefix: "GPB"
csharp_namespace: "Google.Protobuf.WellKnownTypes"
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/events/sandbox.proto"
package: "containerd.events"
dependency: "google/protobuf/timestamp.proto"
message_type {
name: "SandboxCreate"
field {
name: "sandbox_id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "sandboxId"
}
}
message_type {
name: "SandboxStart"
field {
name: "sandbox_id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "sandboxId"
}
}
message_type {
name: "SandboxExit"
field {
name: "sandbox_id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "sandboxId"
}
field {
name: "exit_status"
number: 2
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "exitStatus"
}
field {
name: "exited_at"
number: 3
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Timestamp"
json_name: "exitedAt"
}
}
options {
go_package: "github.com/containerd/containerd/api/events;events"
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/events/snapshot.proto"
package: "containerd.events"
@ -455,37 +540,6 @@ file {
}
syntax: "proto3"
}
file {
name: "google/protobuf/timestamp.proto"
package: "google.protobuf"
message_type {
name: "Timestamp"
field {
name: "seconds"
number: 1
label: LABEL_OPTIONAL
type: TYPE_INT64
json_name: "seconds"
}
field {
name: "nanos"
number: 2
label: LABEL_OPTIONAL
type: TYPE_INT32
json_name: "nanos"
}
}
options {
java_package: "com.google.protobuf"
java_outer_classname: "TimestampProto"
java_multiple_files: true
go_package: "google.golang.org/protobuf/types/known/timestamppb"
cc_enable_arenas: true
objc_class_prefix: "GPB"
csharp_namespace: "Google.Protobuf.WellKnownTypes"
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/types/mount.proto"
package: "containerd.types"

View File

@ -120,12 +120,10 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
exitedAt = time.Now()
}
e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: pid,
ExitStatus: exitStatus,
ExitedAt: protobuf.ToTimestamp(exitedAt),
e := &eventtypes.SandboxExit{
SandboxID: id,
ExitStatus: exitStatus,
ExitedAt: protobuf.ToTimestamp(exitedAt),
}
log.L.Debugf("received exit event %+v", e)
@ -135,14 +133,14 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()
sb, err := em.c.sandboxStore.Get(e.ID)
sb, err := em.c.sandboxStore.Get(e.GetSandboxID())
if err == nil {
if err := handleSandboxExit(dctx, e, sb, em.c); err != nil {
if err := handleSandboxExit(dctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
return err
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to get sandbox %s: %w", e.ID, err)
return fmt.Errorf("failed to get sandbox %s: %w", e.SandboxID, err)
}
return nil
}()
@ -218,6 +216,8 @@ func convertEvent(e typeurl.Any) (string, interface{}, error) {
switch e := evt.(type) {
case *eventtypes.TaskOOM:
id = e.ContainerID
case *eventtypes.SandboxExit:
id = e.SandboxID
case *eventtypes.ImageCreate:
id = e.Name
case *eventtypes.ImageUpdate:
@ -323,7 +323,19 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
sb, err := em.c.sandboxStore.Get(e.ID)
if err == nil {
if err := handleSandboxExit(ctx, e, sb, em.c); err != nil {
if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find sandbox for TaskExit event: %w", err)
}
return nil
case *eventtypes.SandboxExit:
log.L.Infof("SandboxExit event %+v", e)
sb, err := em.c.sandboxStore.Get(e.GetSandboxID())
if err == nil {
if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
}
return nil
@ -416,30 +428,13 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
return nil
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox, c *criService) error {
// TODO: Move pause container cleanup to podsandbox/ package.
if sb.Container != nil {
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to load task for sandbox: %w", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop sandbox: %w", err)
}
// Move on to make sure container status is updated.
}
}
}
// handleSandboxExit handles sandbox exit event.
func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, exitStatus uint32, exitTime time.Time, c *criService) error {
if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.State = sandboxstore.StateNotReady
status.Pid = 0
status.ExitStatus = exitStatus
status.ExitedAt = exitTime
return status, nil
}); err != nil {
return fmt.Errorf("failed to update sandbox state: %w", err)

View File

@ -41,7 +41,6 @@ import (
criconfig "github.com/containerd/containerd/pkg/cri/config"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
imagestore "github.com/containerd/containerd/pkg/cri/store/image"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
runtimeoptions "github.com/containerd/containerd/pkg/runtimeoptions/v1"
"github.com/containerd/containerd/plugin"
runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
@ -81,8 +80,6 @@ const (
// containerKindContainer is a label value indicating container is application container
containerKindContainer = "container"
// sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest
sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata"
// containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest
containerMetadataExtension = criContainerdPrefix + ".container.metadata"
@ -334,13 +331,6 @@ func unknownContainerStatus() containerstore.Status {
}
}
// unknownSandboxStatus returns the default sandbox status when its status is unknown.
func unknownSandboxStatus() sandboxstore.Status {
return sandboxstore.Status{
State: sandboxstore.StateUnknown,
}
}
// getPassthroughAnnotations filters requested pod annotations by comparing
// against permitted annotations for the given runtime.
func getPassthroughAnnotations(podAnnotations map[string]string,

View File

@ -43,10 +43,6 @@ import (
type CRIService interface {
// TODO: we should implement Event backoff in Controller.
BackOffEvent(id string, event interface{})
// TODO: refactor event generator for PLEG.
// GenerateAndSendContainerEvent is called by controller for sandbox container events.
GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType)
}
// ImageService specifies dependencies to CRI image service.

View File

@ -0,0 +1,174 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podsandbox
import (
"context"
"fmt"
goruntime "runtime"
"time"
"github.com/containerd/containerd/pkg/netns"
"github.com/containerd/typeurl/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
)
// loadContainerTimeout is the default timeout for loading a container/sandbox.
// One container/sandbox hangs (e.g. containerd#2438) should not affect other
// containers/sandboxes.
// Most CRI container/sandbox related operations are per container, the ones
// which handle multiple containers at a time are:
// * ListPodSandboxes: Don't talk with containerd services.
// * ListContainers: Don't talk with containerd services.
// * ListContainerStats: Not in critical code path, a default timeout will
// be applied at CRI level.
// * Recovery logic: We should set a time for each container/sandbox recovery.
// * Event monitor: We should set a timeout for each container/sandbox event handling.
const loadContainerTimeout = 10 * time.Second
func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
defer cancel()
var sandbox sandboxstore.Sandbox
// Load sandbox metadata.
exts, err := cntr.Extensions(ctx)
if err != nil {
return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err)
}
ext, ok := exts[sandboxMetadataExtension]
if !ok {
return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension)
}
data, err := typeurl.UnmarshalAny(ext)
if err != nil {
return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err)
}
meta := data.(*sandboxstore.Metadata)
s, err := func() (sandboxstore.Status, error) {
status := sandboxstore.Status{
State: sandboxstore.StateUnknown,
}
// Load sandbox created timestamp.
info, err := cntr.Info(ctx)
if err != nil {
return status, fmt.Errorf("failed to get sandbox container info: %w", err)
}
status.CreatedAt = info.CreatedAt
// Load sandbox state.
t, err := cntr.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to load task: %w", err)
}
var taskStatus containerd.Status
var notFound bool
if errdefs.IsNotFound(err) {
// Task is not found.
notFound = true
} else {
// Task is found. Get task status.
taskStatus, err = t.Status(ctx)
if err != nil {
// It's still possible that task is deleted during this window.
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to get task status: %w", err)
}
notFound = true
}
}
if notFound {
// Task does not exist, set sandbox state as NOTREADY.
status.State = sandboxstore.StateNotReady
} else {
if taskStatus.Status == containerd.Running {
// Wait for the task for sandbox monitor.
// wait is a long running background request, no timeout needed.
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
if err != nil {
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to wait for task: %w", err)
}
status.State = sandboxstore.StateNotReady
} else {
// Task is running, set sandbox state as READY.
status.State = sandboxstore.StateReady
status.Pid = t.Pid()
go func() {
c.waitSandboxExit(context.Background(), meta.ID, exitCh)
}()
}
} else {
// Task is not running. Delete the task and set sandbox state as NOTREADY.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to delete task: %w", err)
}
status.State = sandboxstore.StateNotReady
}
}
return status, nil
}()
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID())
}
sandbox = sandboxstore.NewSandbox(*meta, s)
sandbox.Container = cntr
// Load network namespace.
sandbox.NetNS = getNetNS(meta)
// It doesn't matter whether task is running or not. If it is running, sandbox
// status will be `READY`; if it is not running, sandbox status will be `NOT_READY`,
// kubelet will stop the sandbox which will properly cleanup everything.
return sandbox, nil
}
func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS {
// Don't need to load netns for host network sandbox.
if hostNetwork(meta.Config) {
return nil
}
return netns.LoadNetNS(meta.NetNSPath)
}
// hostNetwork handles checking if host networking was requested.
// TODO: Copy pasted from sbserver to handle container sandbox events in podsandbox/ package, needs refactoring.
func hostNetwork(config *runtime.PodSandboxConfig) bool {
var hostNet bool
switch goruntime.GOOS {
case "windows":
// Windows HostProcess pods can only run on the host network
hostNet = config.GetWindows().GetSecurityContext().GetHostProcess()
case "darwin":
// No CNI on Darwin yet.
hostNet = true
default:
// Even on other platforms, the logic containerd uses is to check if NamespaceMode == NODE.
// So this handles Linux, as well as any other platforms not governed by the cases above
// that have special quirks.
hostNet = config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE
}
return hostNet
}

View File

@ -20,8 +20,6 @@ import (
"context"
"fmt"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
@ -59,10 +57,5 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error {
}
}
// Send CONTAINER_DELETED event with ContainerId equal to SandboxId.
c.cri.GenerateAndSendContainerEvent(ctx, sandboxID, sandboxID, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
c.sandboxStore.Delete(sandboxID)
return nil
}

View File

@ -93,12 +93,10 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, sandbox sandboxst
defer close(stopCh)
exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, id, exitCh)
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: task.Pid(),
ExitStatus: exitStatus,
ExitedAt: protobuf.ToTimestamp(exitedAt),
e := &eventtypes.SandboxExit{
SandboxID: id,
ExitStatus: exitStatus,
ExitedAt: protobuf.ToTimestamp(exitedAt),
}
logrus.WithError(err).Errorf("Failed to wait sandbox exit %+v", e)
// TODO: how to backoff

View File

@ -31,6 +31,7 @@ import (
"github.com/containerd/containerd/log"
criconfig "github.com/containerd/containerd/pkg/cri/config"
"github.com/containerd/containerd/pkg/cri/sbserver/podsandbox"
"github.com/containerd/containerd/pkg/netns"
"github.com/containerd/containerd/platforms"
"github.com/containerd/typeurl/v2"
"golang.org/x/sync/errgroup"
@ -40,7 +41,6 @@ import (
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
"github.com/containerd/containerd/pkg/netns"
)
// NOTE: The recovery logic has following assumption: when the cri plugin is down:
@ -60,11 +60,21 @@ func (c *criService) recover(ctx context.Context) error {
return fmt.Errorf("failed to list sandbox containers: %w", err)
}
podSandboxController, ok := c.sandboxControllers[criconfig.ModePodSandbox]
if !ok {
log.G(ctx).Fatal("unable to restore pod sandboxes, no controller found")
}
podSandboxLoader, ok := podSandboxController.(podSandboxRecover)
if !ok {
log.G(ctx).Fatal("pod sandbox controller doesn't support recovery")
}
eg, ctx2 := errgroup.WithContext(ctx)
for _, sandbox := range sandboxes {
sandbox := sandbox
eg.Go(func() error {
sb, err := c.loadSandbox(ctx2, sandbox)
sb, err := podSandboxLoader.RecoverContainer(ctx2, sandbox)
if err != nil {
log.G(ctx2).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID())
return nil
@ -388,99 +398,10 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
return containerstore.NewContainer(*meta, opts...)
}
// loadSandbox loads sandbox from containerd.
func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
defer cancel()
var sandbox sandboxstore.Sandbox
// Load sandbox metadata.
exts, err := cntr.Extensions(ctx)
if err != nil {
return sandbox, fmt.Errorf("failed to get sandbox container extensions: %w", err)
}
ext, ok := exts[sandboxMetadataExtension]
if !ok {
return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension)
}
data, err := typeurl.UnmarshalAny(ext)
if err != nil {
return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %w", ext, err)
}
meta := data.(*sandboxstore.Metadata)
s, err := func() (sandboxstore.Status, error) {
status := unknownSandboxStatus()
// Load sandbox created timestamp.
info, err := cntr.Info(ctx)
if err != nil {
return status, fmt.Errorf("failed to get sandbox container info: %w", err)
}
status.CreatedAt = info.CreatedAt
// Load sandbox state.
t, err := cntr.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to load task: %w", err)
}
var taskStatus containerd.Status
var notFound bool
if errdefs.IsNotFound(err) {
// Task is not found.
notFound = true
} else {
// Task is found. Get task status.
taskStatus, err = t.Status(ctx)
if err != nil {
// It's still possible that task is deleted during this window.
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to get task status: %w", err)
}
notFound = true
}
}
if notFound {
// Task does not exist, set sandbox state as NOTREADY.
status.State = sandboxstore.StateNotReady
} else {
if taskStatus.Status == containerd.Running {
// Wait for the task for sandbox monitor.
// wait is a long running background request, no timeout needed.
exitCh, err := t.Wait(ctrdutil.NamespacedContext())
if err != nil {
if !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to wait for task: %w", err)
}
status.State = sandboxstore.StateNotReady
} else {
// Task is running, set sandbox state as READY.
status.State = sandboxstore.StateReady
status.Pid = t.Pid()
c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
}
} else {
// Task is not running. Delete the task and set sandbox state as NOTREADY.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
return status, fmt.Errorf("failed to delete task: %w", err)
}
status.State = sandboxstore.StateNotReady
}
}
return status, nil
}()
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID())
}
sandbox = sandboxstore.NewSandbox(*meta, s)
sandbox.Container = cntr
// Load network namespace.
sandbox.NetNS = getNetNS(meta)
// It doesn't matter whether task is running or not. If it is running, sandbox
// status will be `READY`; if it is not running, sandbox status will be `NOT_READY`,
// kubelet will stop the sandbox which will properly cleanup everything.
return sandbox, nil
// podSandboxRecover is an additional interface implemented by podsandbox/ controller to handle
// Pod sandbox containers recovery.
type podSandboxRecover interface {
RecoverContainer(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error)
}
func getNetNS(meta *sandboxstore.Metadata) *netns.NetNS {

View File

@ -89,6 +89,9 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
return nil, fmt.Errorf("failed to delete sandbox %q: %w", id, err)
}
// Send CONTAINER_DELETED event with ContainerId equal to SandboxId.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_DELETED_EVENT)
err = c.nri.RemovePodSandbox(ctx, &sandbox)
if err != nil {
log.G(ctx).WithError(err).Errorf("NRI pod removal notification failed")

View File

@ -215,12 +215,6 @@ func (c *criService) BackOffEvent(id string, event interface{}) {
c.eventMonitor.backOff.enBackOff(id, event)
}
// GenerateAndSendContainerEvent is a temporary workaround to send PLEG events from podsandbopx/ controller
// TODO: refactor PLEG event generator so both podsandbox/ controller and CRI service can access it.
func (c *criService) GenerateAndSendContainerEvent(ctx context.Context, containerID string, sandboxID string, eventType runtime.ContainerEventType) {
c.generateAndSendContainerEvent(ctx, containerID, sandboxID, eventType)
}
// Register registers all required services onto a specific grpc server.
// This is used by containerd cri plugin.
func (c *criService) Register(s *grpc.Server) error {

View File

@ -19,14 +19,17 @@ package sandbox
import (
"context"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
eventtypes "github.com/containerd/containerd/api/events"
api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
)
func init() {
@ -35,6 +38,7 @@ func init() {
ID: "sandbox-controllers",
Requires: []plugin.Type{
plugin.SandboxControllerPlugin,
plugin.EventPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
sc, err := ic.GetByID(plugin.SandboxControllerPlugin, "local")
@ -42,15 +46,22 @@ func init() {
return nil, err
}
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
return &controllerService{
local: sc.(sandbox.Controller),
local: sc.(sandbox.Controller),
publisher: ep.(events.Publisher),
}, nil
},
})
}
type controllerService struct {
local sandbox.Controller
local sandbox.Controller
publisher events.Publisher
api.UnimplementedControllerServer
}
@ -68,6 +79,13 @@ func (s *controllerService) Create(ctx context.Context, req *api.ControllerCreat
if err != nil {
return &api.ControllerCreateResponse{}, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "sandboxes/create", &eventtypes.SandboxCreate{
SandboxID: req.GetSandboxID(),
}); err != nil {
return &api.ControllerCreateResponse{}, errdefs.ToGRPC(err)
}
return &api.ControllerCreateResponse{
SandboxID: req.GetSandboxID(),
}, nil
@ -79,6 +97,13 @@ func (s *controllerService) Start(ctx context.Context, req *api.ControllerStartR
if err != nil {
return &api.ControllerStartResponse{}, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "sandboxes/start", &eventtypes.SandboxStart{
SandboxID: req.GetSandboxID(),
}); err != nil {
return &api.ControllerStartResponse{}, errdefs.ToGRPC(err)
}
return &api.ControllerStartResponse{
SandboxID: inst.SandboxID,
Pid: inst.Pid,
@ -98,6 +123,15 @@ func (s *controllerService) Wait(ctx context.Context, req *api.ControllerWaitReq
if err != nil {
return &api.ControllerWaitResponse{}, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "sandboxes/exit", &eventtypes.SandboxExit{
SandboxID: req.GetSandboxID(),
ExitStatus: exitStatus.ExitStatus,
ExitedAt: protobuf.ToTimestamp(exitStatus.ExitedAt),
}); err != nil {
return &api.ControllerWaitResponse{}, errdefs.ToGRPC(err)
}
return &api.ControllerWaitResponse{
ExitStatus: exitStatus.ExitStatus,
ExitedAt: protobuf.ToTimestamp(exitStatus.ExitedAt),