diff --git a/api/types/event/task.pb.go b/api/types/event/task.pb.go index d9c8aa061..e1d57c947 100644 --- a/api/types/event/task.pb.go +++ b/api/types/event/task.pb.go @@ -36,6 +36,8 @@ func (*TaskStart) Descriptor() ([]byte, []int) { return fileDescriptorTask, []in type TaskDelete struct { ContainerID string `protobuf:"bytes,1,opt,name=container_id,json=containerId,proto3" json:"container_id,omitempty"` + Pid uint32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"` + ExitStatus uint32 `protobuf:"varint,3,opt,name=exit_status,json=exitStatus,proto3" json:"exit_status,omitempty"` } func (m *TaskDelete) Reset() { *m = TaskDelete{} } @@ -116,6 +118,16 @@ func (m *TaskDelete) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintTask(dAtA, i, uint64(len(m.ContainerID))) i += copy(dAtA[i:], m.ContainerID) } + if m.Pid != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintTask(dAtA, i, uint64(m.Pid)) + } + if m.ExitStatus != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintTask(dAtA, i, uint64(m.ExitStatus)) + } return i, nil } @@ -173,6 +185,12 @@ func (m *TaskDelete) Size() (n int) { if l > 0 { n += 1 + l + sovTask(uint64(l)) } + if m.Pid != 0 { + n += 1 + sovTask(uint64(m.Pid)) + } + if m.ExitStatus != 0 { + n += 1 + sovTask(uint64(m.ExitStatus)) + } return n } @@ -215,6 +233,8 @@ func (this *TaskDelete) String() string { } s := strings.Join([]string{`&TaskDelete{`, `ContainerID:` + fmt.Sprintf("%v", this.ContainerID) + `,`, + `Pid:` + fmt.Sprintf("%v", this.Pid) + `,`, + `ExitStatus:` + fmt.Sprintf("%v", this.ExitStatus) + `,`, `}`, }, "") return s @@ -443,6 +463,44 @@ func (m *TaskDelete) Unmarshal(dAtA []byte) error { } m.ContainerID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Pid", wireType) + } + m.Pid = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTask + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Pid |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ExitStatus", wireType) + } + m.ExitStatus = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTask + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ExitStatus |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipTask(dAtA[iNdEx:]) @@ -574,7 +632,7 @@ func init() { } var fileDescriptorTask = []byte{ - // 186 bytes of a gzipped FileDescriptorProto + // 228 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xb2, 0x4e, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xce, 0xcf, 0x2b, 0x49, 0xcc, 0xcc, 0x4b, 0x2d, 0x4a, 0x41, 0x66, 0x26, 0x16, 0x64, 0xea, 0x97, 0x54, 0x16, 0xa4, 0x16, 0xeb, 0xa7, 0x96, 0xa5, @@ -583,8 +641,11 @@ var fileDescriptorTask = []byte{ 0x17, 0xa5, 0x26, 0x96, 0xa4, 0x0a, 0x19, 0x71, 0xf1, 0xc0, 0x15, 0xc5, 0x67, 0xa6, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x3a, 0xf1, 0x3f, 0xba, 0x27, 0xcf, 0xed, 0x0c, 0x13, 0xf7, 0x74, 0x09, 0xe2, 0x86, 0x2b, 0xf2, 0x4c, 0x51, 0xb2, 0xe7, 0xe2, 0x04, 0x99, 0x10, 0x5c, 0x92, 0x58, 0x54, - 0x42, 0x96, 0x01, 0x50, 0x27, 0xb8, 0xa4, 0xe6, 0xa4, 0x92, 0xe7, 0x04, 0x27, 0x89, 0x13, 0x0f, - 0xe5, 0x18, 0x6e, 0x3c, 0x94, 0x63, 0x68, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, - 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x26, 0xb1, 0x81, 0xbd, 0x6e, 0x0c, 0x08, 0x00, 0x00, - 0xff, 0xff, 0x3a, 0x4d, 0xa1, 0xee, 0x39, 0x01, 0x00, 0x00, + 0x42, 0x96, 0x01, 0xc5, 0x10, 0x27, 0xb8, 0xa4, 0xe6, 0xa4, 0x92, 0xe7, 0x04, 0x21, 0x01, 0x2e, + 0xe6, 0x82, 0xcc, 0x14, 0x09, 0x26, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x10, 0x53, 0x48, 0x9e, 0x8b, + 0x3b, 0xb5, 0x22, 0xb3, 0x24, 0xbe, 0xb8, 0x24, 0xb1, 0xa4, 0xb4, 0x58, 0x82, 0x19, 0x2c, 0xc3, + 0x05, 0x12, 0x0a, 0x06, 0x8b, 0x38, 0x49, 0x9c, 0x78, 0x28, 0xc7, 0x70, 0xe3, 0xa1, 0x1c, 0x43, + 0xc3, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, + 0x31, 0x89, 0x0d, 0x1c, 0x5a, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0e, 0x43, 0xc1, 0x0e, + 0x6c, 0x01, 0x00, 0x00, } diff --git a/api/types/event/task.proto b/api/types/event/task.proto index 4a62e8153..e79085448 100644 --- a/api/types/event/task.proto +++ b/api/types/event/task.proto @@ -12,4 +12,6 @@ message TaskStart { message TaskDelete { string container_id = 1; + uint32 pid = 2; + uint32 exit_status = 3; } diff --git a/cmd/ctr/events.go b/cmd/ctr/events.go index 814381083..28991c2bd 100644 --- a/cmd/ctr/events.go +++ b/cmd/ctr/events.go @@ -83,7 +83,7 @@ func getEventOutput(evt *event.Envelope) (string, error) { if err := proto.Unmarshal(evt.Event.Value, e); err != nil { return out, err } - out = "id=" + e.ContainerID + out = fmt.Sprintf("id=%s pid=%d status=%d", e.ContainerID, e.Pid, e.ExitStatus) case "types.containerd.io/containerd.v1.types.event.ContainerUpdate": e := &event.ContainerUpdate{} if err := proto.Unmarshal(evt.Event.Value, e); err != nil { diff --git a/process.go b/process.go index fdf11cb6b..fc043e26e 100644 --- a/process.go +++ b/process.go @@ -3,10 +3,13 @@ package containerd import ( "context" "encoding/json" - "fmt" "syscall" + eventsapi "github.com/containerd/containerd/api/services/events" "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/event" + tasktypes "github.com/containerd/containerd/api/types/task" + "github.com/gogo/protobuf/proto" protobuf "github.com/gogo/protobuf/types" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -70,7 +73,31 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error { } func (p *process) Wait(ctx context.Context) (uint32, error) { - return 255, fmt.Errorf("not implemented") + // TODO (ehazlett): add filtering for specific event + events, err := p.task.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{}) + if err != nil { + return UnknownExitStatus, err + } + for { + evt, err := events.Recv() + if err != nil { + return UnknownExitStatus, err + } + if evt.Event.TypeUrl == "types.containerd.io/containerd.v1.types.event.RuntimeEvent" { + e := &event.RuntimeEvent{} + if err := proto.Unmarshal(evt.Event.Value, e); err != nil { + return UnknownExitStatus, err + } + + if e.Type != tasktypes.Event_EXIT { + continue + } + + if e.ID == p.task.containerID && e.Pid == p.pid { + return e.ExitStatus, nil + } + } + } } func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error { diff --git a/services/execution/service.go b/services/execution/service.go index 8dda6c6d1..aa5609d46 100644 --- a/services/execution/service.go +++ b/services/execution/service.go @@ -198,6 +198,10 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.De if err != nil { return nil, err } + state, err := t.State(ctx) + if err != nil { + return nil, err + } exit, err := runtime.Delete(ctx, t) if err != nil { return nil, err @@ -205,6 +209,8 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.De if err := s.emit(ctx, "/tasks/delete", event.TaskDelete{ ContainerID: r.ContainerID, + Pid: state.Pid, + ExitStatus: exit.Status, }); err != nil { return nil, err } diff --git a/task.go b/task.go index e15c35c9e..4389aa964 100644 --- a/task.go +++ b/task.go @@ -11,9 +11,13 @@ import ( "syscall" "github.com/containerd/containerd/api/services/containers" + eventsapi "github.com/containerd/containerd/api/services/events" "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/event" + tasktypes "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/content" "github.com/containerd/containerd/rootfs" + "github.com/gogo/protobuf/proto" "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -138,7 +142,31 @@ func (t *task) Status(ctx context.Context) (TaskStatus, error) { // Wait is a blocking call that will wait for the task to exit and return the exit status func (t *task) Wait(ctx context.Context) (uint32, error) { - return 255, fmt.Errorf("not implemented") + // TODO (ehazlett): add filtering for specific event + events, err := t.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{}) + if err != nil { + return UnknownExitStatus, err + } + for { + evt, err := events.Recv() + if err != nil { + return UnknownExitStatus, err + } + if evt.Event.TypeUrl == "types.containerd.io/containerd.v1.types.event.RuntimeEvent" { + e := &event.RuntimeEvent{} + if err := proto.Unmarshal(evt.Event.Value, e); err != nil { + return UnknownExitStatus, err + } + + if e.Type != tasktypes.Event_EXIT { + continue + } + + if e.ID == t.containerID && e.Pid == t.pid { + return e.ExitStatus, nil + } + } + } } // Delete deletes the task and its runtime state