use event service for task wait

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Evan Hazlett 2017-06-21 11:26:14 -04:00 committed by Michael Crosby
parent 440fc06041
commit 0b06fa8518
6 changed files with 133 additions and 9 deletions

View File

@ -36,6 +36,8 @@ func (*TaskStart) Descriptor() ([]byte, []int) { return fileDescriptorTask, []in
type TaskDelete struct { type TaskDelete struct {
ContainerID string `protobuf:"bytes,1,opt,name=container_id,json=containerId,proto3" json:"container_id,omitempty"` ContainerID string `protobuf:"bytes,1,opt,name=container_id,json=containerId,proto3" json:"container_id,omitempty"`
Pid uint32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"`
ExitStatus uint32 `protobuf:"varint,3,opt,name=exit_status,json=exitStatus,proto3" json:"exit_status,omitempty"`
} }
func (m *TaskDelete) Reset() { *m = TaskDelete{} } func (m *TaskDelete) Reset() { *m = TaskDelete{} }
@ -116,6 +118,16 @@ func (m *TaskDelete) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintTask(dAtA, i, uint64(len(m.ContainerID))) i = encodeVarintTask(dAtA, i, uint64(len(m.ContainerID)))
i += copy(dAtA[i:], m.ContainerID) i += copy(dAtA[i:], m.ContainerID)
} }
if m.Pid != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintTask(dAtA, i, uint64(m.Pid))
}
if m.ExitStatus != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintTask(dAtA, i, uint64(m.ExitStatus))
}
return i, nil return i, nil
} }
@ -173,6 +185,12 @@ func (m *TaskDelete) Size() (n int) {
if l > 0 { if l > 0 {
n += 1 + l + sovTask(uint64(l)) n += 1 + l + sovTask(uint64(l))
} }
if m.Pid != 0 {
n += 1 + sovTask(uint64(m.Pid))
}
if m.ExitStatus != 0 {
n += 1 + sovTask(uint64(m.ExitStatus))
}
return n return n
} }
@ -215,6 +233,8 @@ func (this *TaskDelete) String() string {
} }
s := strings.Join([]string{`&TaskDelete{`, s := strings.Join([]string{`&TaskDelete{`,
`ContainerID:` + fmt.Sprintf("%v", this.ContainerID) + `,`, `ContainerID:` + fmt.Sprintf("%v", this.ContainerID) + `,`,
`Pid:` + fmt.Sprintf("%v", this.Pid) + `,`,
`ExitStatus:` + fmt.Sprintf("%v", this.ExitStatus) + `,`,
`}`, `}`,
}, "") }, "")
return s return s
@ -443,6 +463,44 @@ func (m *TaskDelete) Unmarshal(dAtA []byte) error {
} }
m.ContainerID = string(dAtA[iNdEx:postIndex]) m.ContainerID = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Pid", wireType)
}
m.Pid = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTask
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Pid |= (uint32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ExitStatus", wireType)
}
m.ExitStatus = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTask
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.ExitStatus |= (uint32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipTask(dAtA[iNdEx:]) skippy, err := skipTask(dAtA[iNdEx:])
@ -574,7 +632,7 @@ func init() {
} }
var fileDescriptorTask = []byte{ var fileDescriptorTask = []byte{
// 186 bytes of a gzipped FileDescriptorProto // 228 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xb2, 0x4e, 0xcf, 0x2c, 0xc9, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xb2, 0x4e, 0xcf, 0x2c, 0xc9,
0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xce, 0xcf, 0x2b, 0x49, 0xcc, 0xcc, 0x4b, 0x2d, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xce, 0xcf, 0x2b, 0x49, 0xcc, 0xcc, 0x4b, 0x2d,
0x4a, 0x41, 0x66, 0x26, 0x16, 0x64, 0xea, 0x97, 0x54, 0x16, 0xa4, 0x16, 0xeb, 0xa7, 0x96, 0xa5, 0x4a, 0x41, 0x66, 0x26, 0x16, 0x64, 0xea, 0x97, 0x54, 0x16, 0xa4, 0x16, 0xeb, 0xa7, 0x96, 0xa5,
@ -583,8 +641,11 @@ var fileDescriptorTask = []byte{
0x17, 0xa5, 0x26, 0x96, 0xa4, 0x0a, 0x19, 0x71, 0xf1, 0xc0, 0x15, 0xc5, 0x67, 0xa6, 0x48, 0x30, 0x17, 0xa5, 0x26, 0x96, 0xa4, 0x0a, 0x19, 0x71, 0xf1, 0xc0, 0x15, 0xc5, 0x67, 0xa6, 0x48, 0x30,
0x2a, 0x30, 0x6a, 0x70, 0x3a, 0xf1, 0x3f, 0xba, 0x27, 0xcf, 0xed, 0x0c, 0x13, 0xf7, 0x74, 0x09, 0x2a, 0x30, 0x6a, 0x70, 0x3a, 0xf1, 0x3f, 0xba, 0x27, 0xcf, 0xed, 0x0c, 0x13, 0xf7, 0x74, 0x09,
0xe2, 0x86, 0x2b, 0xf2, 0x4c, 0x51, 0xb2, 0xe7, 0xe2, 0x04, 0x99, 0x10, 0x5c, 0x92, 0x58, 0x54, 0xe2, 0x86, 0x2b, 0xf2, 0x4c, 0x51, 0xb2, 0xe7, 0xe2, 0x04, 0x99, 0x10, 0x5c, 0x92, 0x58, 0x54,
0x42, 0x96, 0x01, 0x50, 0x27, 0xb8, 0xa4, 0xe6, 0xa4, 0x92, 0xe7, 0x04, 0x27, 0x89, 0x13, 0x0f, 0x42, 0x96, 0x01, 0xc5, 0x10, 0x27, 0xb8, 0xa4, 0xe6, 0xa4, 0x92, 0xe7, 0x04, 0x21, 0x01, 0x2e,
0xe5, 0x18, 0x6e, 0x3c, 0x94, 0x63, 0x68, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0xe6, 0x82, 0xcc, 0x14, 0x09, 0x26, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x10, 0x53, 0x48, 0x9e, 0x8b,
0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x26, 0xb1, 0x81, 0xbd, 0x6e, 0x0c, 0x08, 0x00, 0x00, 0x3b, 0xb5, 0x22, 0xb3, 0x24, 0xbe, 0xb8, 0x24, 0xb1, 0xa4, 0xb4, 0x58, 0x82, 0x19, 0x2c, 0xc3,
0xff, 0xff, 0x3a, 0x4d, 0xa1, 0xee, 0x39, 0x01, 0x00, 0x00, 0x05, 0x12, 0x0a, 0x06, 0x8b, 0x38, 0x49, 0x9c, 0x78, 0x28, 0xc7, 0x70, 0xe3, 0xa1, 0x1c, 0x43,
0xc3, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e,
0x31, 0x89, 0x0d, 0x1c, 0x5a, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0e, 0x43, 0xc1, 0x0e,
0x6c, 0x01, 0x00, 0x00,
} }

View File

@ -12,4 +12,6 @@ message TaskStart {
message TaskDelete { message TaskDelete {
string container_id = 1; string container_id = 1;
uint32 pid = 2;
uint32 exit_status = 3;
} }

View File

@ -83,7 +83,7 @@ func getEventOutput(evt *event.Envelope) (string, error) {
if err := proto.Unmarshal(evt.Event.Value, e); err != nil { if err := proto.Unmarshal(evt.Event.Value, e); err != nil {
return out, err return out, err
} }
out = "id=" + e.ContainerID out = fmt.Sprintf("id=%s pid=%d status=%d", e.ContainerID, e.Pid, e.ExitStatus)
case "types.containerd.io/containerd.v1.types.event.ContainerUpdate": case "types.containerd.io/containerd.v1.types.event.ContainerUpdate":
e := &event.ContainerUpdate{} e := &event.ContainerUpdate{}
if err := proto.Unmarshal(evt.Event.Value, e); err != nil { if err := proto.Unmarshal(evt.Event.Value, e); err != nil {

View File

@ -3,10 +3,13 @@ package containerd
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"syscall" "syscall"
eventsapi "github.com/containerd/containerd/api/services/events"
"github.com/containerd/containerd/api/services/execution" "github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/event"
tasktypes "github.com/containerd/containerd/api/types/task"
"github.com/gogo/protobuf/proto"
protobuf "github.com/gogo/protobuf/types" protobuf "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
) )
@ -70,7 +73,31 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error {
} }
func (p *process) Wait(ctx context.Context) (uint32, error) { func (p *process) Wait(ctx context.Context) (uint32, error) {
return 255, fmt.Errorf("not implemented") // TODO (ehazlett): add filtering for specific event
events, err := p.task.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{})
if err != nil {
return UnknownExitStatus, err
}
for {
evt, err := events.Recv()
if err != nil {
return UnknownExitStatus, err
}
if evt.Event.TypeUrl == "types.containerd.io/containerd.v1.types.event.RuntimeEvent" {
e := &event.RuntimeEvent{}
if err := proto.Unmarshal(evt.Event.Value, e); err != nil {
return UnknownExitStatus, err
}
if e.Type != tasktypes.Event_EXIT {
continue
}
if e.ID == p.task.containerID && e.Pid == p.pid {
return e.ExitStatus, nil
}
}
}
} }
func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error { func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {

View File

@ -198,6 +198,10 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.De
if err != nil { if err != nil {
return nil, err return nil, err
} }
state, err := t.State(ctx)
if err != nil {
return nil, err
}
exit, err := runtime.Delete(ctx, t) exit, err := runtime.Delete(ctx, t)
if err != nil { if err != nil {
return nil, err return nil, err
@ -205,6 +209,8 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.De
if err := s.emit(ctx, "/tasks/delete", event.TaskDelete{ if err := s.emit(ctx, "/tasks/delete", event.TaskDelete{
ContainerID: r.ContainerID, ContainerID: r.ContainerID,
Pid: state.Pid,
ExitStatus: exit.Status,
}); err != nil { }); err != nil {
return nil, err return nil, err
} }

30
task.go
View File

@ -11,9 +11,13 @@ import (
"syscall" "syscall"
"github.com/containerd/containerd/api/services/containers" "github.com/containerd/containerd/api/services/containers"
eventsapi "github.com/containerd/containerd/api/services/events"
"github.com/containerd/containerd/api/services/execution" "github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/event"
tasktypes "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/rootfs" "github.com/containerd/containerd/rootfs"
"github.com/gogo/protobuf/proto"
"github.com/opencontainers/image-spec/specs-go/v1" "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
) )
@ -138,7 +142,31 @@ func (t *task) Status(ctx context.Context) (TaskStatus, error) {
// Wait is a blocking call that will wait for the task to exit and return the exit status // Wait is a blocking call that will wait for the task to exit and return the exit status
func (t *task) Wait(ctx context.Context) (uint32, error) { func (t *task) Wait(ctx context.Context) (uint32, error) {
return 255, fmt.Errorf("not implemented") // TODO (ehazlett): add filtering for specific event
events, err := t.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{})
if err != nil {
return UnknownExitStatus, err
}
for {
evt, err := events.Recv()
if err != nil {
return UnknownExitStatus, err
}
if evt.Event.TypeUrl == "types.containerd.io/containerd.v1.types.event.RuntimeEvent" {
e := &event.RuntimeEvent{}
if err := proto.Unmarshal(evt.Event.Value, e); err != nil {
return UnknownExitStatus, err
}
if e.Type != tasktypes.Event_EXIT {
continue
}
if e.ID == t.containerID && e.Pid == t.pid {
return e.ExitStatus, nil
}
}
}
} }
// Delete deletes the task and its runtime state // Delete deletes the task and its runtime state