diff --git a/process.go b/process.go index 9d9c5ee22..7825f36ff 100644 --- a/process.go +++ b/process.go @@ -66,7 +66,6 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error { } func (p *process) Wait(ctx context.Context) (uint32, error) { - // TODO (ehazlett): add filtering for specific event eventstream, err := p.task.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{}) if err != nil { return UnknownExitStatus, err @@ -78,15 +77,15 @@ evloop: if err != nil { return UnknownExitStatus, err } - v, err := typeurl.UnmarshalAny(evt.Event) - if err != nil { - return UnknownExitStatus, err - } - if e, ok := v.(*eventsapi.RuntimeEvent); ok { + if typeurl.Is(evt.Event, eventsapi.RuntimeEvent{}) { + v, err := typeurl.UnmarshalAny(evt.Event) + if err != nil { + return UnknownExitStatus, err + } + e := v.(*eventsapi.RuntimeEvent) if e.Type != eventsapi.RuntimeEvent_EXIT { continue evloop } - if e.ID == p.task.containerID && e.Pid == p.pid { return e.ExitStatus, nil } diff --git a/task.go b/task.go index d64cf6916..19a63b1e9 100644 --- a/task.go +++ b/task.go @@ -160,8 +160,12 @@ func (t *task) Wait(ctx context.Context) (uint32, error) { if err != nil { return UnknownExitStatus, err } - v, err := typeurl.UnmarshalAny(evt.Event) - if e, ok := v.(*eventsapi.RuntimeEvent); ok { + if typeurl.Is(evt.Event, eventsapi.RuntimeEvent{}) { + v, err := typeurl.UnmarshalAny(evt.Event) + if err != nil { + return UnknownExitStatus, err + } + e := v.(*eventsapi.RuntimeEvent) if e.Type != eventsapi.RuntimeEvent_EXIT { continue } diff --git a/typeurl/types.go b/typeurl/types.go index 19fee0208..6184d2fb1 100644 --- a/typeurl/types.go +++ b/typeurl/types.go @@ -2,12 +2,12 @@ package typeurl import ( "encoding/json" - "errors" "path" "reflect" "strings" "sync" + "github.com/containerd/containerd/errdefs" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" ) @@ -15,10 +15,8 @@ import ( const Prefix = "types.containerd.io" var ( - mu sync.Mutex - registry = make(map[reflect.Type]string) - ErrRegistered = errors.New("typeurl: type already registred") - ErrNotExists = errors.New("typeurl: type is not registered") + mu sync.Mutex + registry = make(map[reflect.Type]string) ) // Register a type with the base url of the type @@ -27,7 +25,7 @@ func Register(v interface{}, args ...string) { mu.Lock() defer mu.Unlock() if _, ok := registry[t]; ok { - panic(ErrRegistered) + panic(errdefs.ErrAlreadyExists) } registry[t] = path.Join(append([]string{Prefix}, args...)...) } @@ -41,13 +39,21 @@ func TypeURL(v interface{}) (string, error) { // fallback to the proto registry if it is a proto message pb, ok := v.(proto.Message) if !ok { - return "", ErrNotExists + return "", errdefs.ErrNotFound } return path.Join(Prefix, proto.MessageName(pb)), nil } return u, nil } +func Is(any *types.Any, v interface{}) bool { + url, err := TypeURL(v) + if err != nil { + return false + } + return any.TypeUrl == url +} + func MarshalAny(v interface{}) (*types.Any, error) { var ( err error @@ -108,7 +114,7 @@ func getTypeByUrl(url string) (urlType, error) { isProto: true, }, nil } - return urlType{}, ErrNotExists + return urlType{}, errdefs.ErrNotFound } func tryDereference(v interface{}) reflect.Type { diff --git a/typeurl/types_test.go b/typeurl/types_test.go index 8056c17ab..9f9bd3e2f 100644 --- a/typeurl/types_test.go +++ b/typeurl/types_test.go @@ -7,7 +7,6 @@ import ( "testing" eventsapi "github.com/containerd/containerd/api/services/events/v1" - events "github.com/containerd/containerd/events" ) type test struct { @@ -121,9 +120,26 @@ func TestMarshalUnmarshal(t *testing.T) { } } +func TestIs(t *testing.T) { + clear() + Register(test{}, "test") + + v := &test{ + Name: "koye", + Age: 6, + } + any, err := MarshalAny(v) + if err != nil { + t.Fatal(err) + } + if !Is(any, test{}) { + t.Fatal("Is(any, test{}) should be true") + } +} + func TestMarshalEvent(t *testing.T) { for _, testcase := range []struct { - event events.Event + event interface{} url string }{ {