Merge pull request #1145 from crosbymichael/event-push

Implement Events Push via Service
This commit is contained in:
Kenfe-Mickaël Laventure 2017-07-11 08:53:26 +02:00 committed by GitHub
commit 13f90e95a7
18 changed files with 383 additions and 142 deletions

View File

@ -21,6 +21,7 @@
ContainerDelete
ContentDelete
StreamEventsRequest
PostEventRequest
Envelope
ImageUpdate
ImageDelete

View File

@ -8,8 +8,9 @@ import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import _ "github.com/gogo/protobuf/types"
import google_protobuf1 "github.com/gogo/protobuf/types"
import google_protobuf2 "github.com/golang/protobuf/ptypes/empty"
import _ "github.com/gogo/protobuf/types"
import time "time"
@ -38,6 +39,14 @@ func (m *StreamEventsRequest) Reset() { *m = StreamEventsRequ
func (*StreamEventsRequest) ProtoMessage() {}
func (*StreamEventsRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} }
type PostEventRequest struct {
Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"`
}
func (m *PostEventRequest) Reset() { *m = PostEventRequest{} }
func (*PostEventRequest) ProtoMessage() {}
func (*PostEventRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} }
type Envelope struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,stdtime" json:"timestamp"`
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
@ -46,10 +55,11 @@ type Envelope struct {
func (m *Envelope) Reset() { *m = Envelope{} }
func (*Envelope) ProtoMessage() {}
func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} }
func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} }
func init() {
proto.RegisterType((*StreamEventsRequest)(nil), "containerd.services.events.v1.StreamEventsRequest")
proto.RegisterType((*PostEventRequest)(nil), "containerd.services.events.v1.PostEventRequest")
proto.RegisterType((*Envelope)(nil), "containerd.services.events.v1.Envelope")
}
@ -65,6 +75,7 @@ const _ = grpc.SupportPackageIsVersion4
type EventsClient interface {
Stream(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (Events_StreamClient, error)
Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error)
}
type eventsClient struct {
@ -107,10 +118,20 @@ func (x *eventsStreamClient) Recv() (*Envelope, error) {
return m, nil
}
func (c *eventsClient) Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) {
out := new(google_protobuf2.Empty)
err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Post", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Events service
type EventsServer interface {
Stream(*StreamEventsRequest, Events_StreamServer) error
Post(context.Context, *PostEventRequest) (*google_protobuf2.Empty, error)
}
func RegisterEventsServer(s *grpc.Server, srv EventsServer) {
@ -138,10 +159,33 @@ func (x *eventsStreamServer) Send(m *Envelope) error {
return x.ServerStream.SendMsg(m)
}
func _Events_Post_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PostEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(EventsServer).Post(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/containerd.services.events.v1.Events/Post",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(EventsServer).Post(ctx, req.(*PostEventRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Events_serviceDesc = grpc.ServiceDesc{
ServiceName: "containerd.services.events.v1.Events",
HandlerType: (*EventsServer)(nil),
Methods: []grpc.MethodDesc{},
Methods: []grpc.MethodDesc{
{
MethodName: "Post",
Handler: _Events_Post_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Stream",
@ -170,6 +214,34 @@ func (m *StreamEventsRequest) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
func (m *PostEventRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PostEventRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Envelope != nil {
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size()))
n1, err := m.Envelope.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
return i, nil
}
func (m *Envelope) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -188,11 +260,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)))
n1, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err != nil {
return 0, err
}
i += n1
i += n2
if len(m.Topic) > 0 {
dAtA[i] = 0x12
i++
@ -203,11 +275,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x1a
i++
i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size()))
n2, err := m.Event.MarshalTo(dAtA[i:])
n3, err := m.Event.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n2
i += n3
}
return i, nil
}
@ -245,6 +317,16 @@ func (m *StreamEventsRequest) Size() (n int) {
return n
}
func (m *PostEventRequest) Size() (n int) {
var l int
_ = l
if m.Envelope != nil {
l = m.Envelope.Size()
n += 1 + l + sovEvents(uint64(l))
}
return n
}
func (m *Envelope) Size() (n int) {
var l int
_ = l
@ -283,12 +365,22 @@ func (this *StreamEventsRequest) String() string {
}, "")
return s
}
func (this *PostEventRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&PostEventRequest{`,
`Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`,
`}`,
}, "")
return s
}
func (this *Envelope) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&Envelope{`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "google_protobuf1.Any", 1) + `,`,
`}`,
@ -353,6 +445,89 @@ func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *PostEventRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PostEventRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PostEventRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Envelope == nil {
m.Envelope = &Envelope{}
}
if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEvents(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Envelope) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -605,25 +780,28 @@ func init() {
}
var fileDescriptorEvents = []byte{
// 313 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0xc1, 0x4a, 0xc3, 0x30,
0x1c, 0xc6, 0x17, 0x65, 0x63, 0x8b, 0xb7, 0x38, 0x61, 0x16, 0xcc, 0xc6, 0x2e, 0x0e, 0x0f, 0x89,
0x9b, 0x47, 0x41, 0x70, 0xb8, 0x17, 0xa8, 0x1e, 0xc4, 0x5b, 0x57, 0xff, 0xc6, 0xc0, 0x9a, 0x74,
0x6d, 0x56, 0xd8, 0xcd, 0x47, 0xd8, 0x63, 0xf5, 0xe8, 0xd1, 0x93, 0xba, 0x3e, 0x89, 0x98, 0xb4,
0x4e, 0x54, 0x14, 0xbc, 0x7d, 0x7f, 0xf2, 0xfb, 0xbe, 0xfc, 0xbf, 0x04, 0x5f, 0x08, 0x69, 0xee,
0x17, 0x53, 0x16, 0xea, 0x88, 0x87, 0x5a, 0x99, 0x40, 0x2a, 0x48, 0x6e, 0x3f, 0xcb, 0x20, 0x96,
0x3c, 0x85, 0x24, 0x93, 0x21, 0xa4, 0x1c, 0x32, 0x50, 0x26, 0xe5, 0xd9, 0xb0, 0x54, 0x2c, 0x4e,
0xb4, 0xd1, 0xe4, 0x60, 0xc3, 0xb3, 0x8a, 0x65, 0x25, 0x91, 0x0d, 0xbd, 0xb6, 0xd0, 0x42, 0x5b,
0x92, 0xbf, 0x2b, 0x67, 0xf2, 0xba, 0x42, 0x6b, 0x31, 0x03, 0x6e, 0xa7, 0xe9, 0xe2, 0x8e, 0x1b,
0x19, 0x41, 0x6a, 0x82, 0x28, 0x2e, 0x81, 0xfd, 0xaf, 0x40, 0xa0, 0x96, 0xee, 0xa8, 0xbf, 0x87,
0x77, 0x2f, 0x4d, 0x02, 0x41, 0x34, 0xb1, 0x97, 0xf8, 0x30, 0x5f, 0x40, 0x6a, 0xfa, 0x2b, 0x84,
0x9b, 0x13, 0x95, 0xc1, 0x4c, 0xc7, 0x40, 0xc6, 0xb8, 0xf5, 0x91, 0xd8, 0x41, 0x3d, 0x34, 0xd8,
0x19, 0x79, 0xcc, 0x45, 0xb2, 0x2a, 0x92, 0x5d, 0x55, 0xc4, 0xb8, 0x99, 0x3f, 0x77, 0x6b, 0xab,
0x97, 0x2e, 0xf2, 0x37, 0x36, 0xd2, 0xc6, 0x75, 0xa3, 0x63, 0x19, 0x76, 0xb6, 0x7a, 0x68, 0xd0,
0xf2, 0xdd, 0x40, 0x8e, 0x70, 0xdd, 0x96, 0xeb, 0x6c, 0xdb, 0xd4, 0xf6, 0xb7, 0xd4, 0x73, 0xb5,
0xf4, 0x1d, 0x32, 0x9a, 0xe3, 0x86, 0xdb, 0x91, 0x08, 0xdc, 0x70, 0x3b, 0x93, 0x11, 0xfb, 0xf5,
0xbd, 0xd8, 0x0f, 0xd5, 0xbc, 0xc3, 0x3f, 0x3c, 0x55, 0xed, 0x63, 0x34, 0xbe, 0xce, 0xd7, 0xb4,
0xf6, 0xb4, 0xa6, 0xb5, 0x87, 0x82, 0xa2, 0xbc, 0xa0, 0xe8, 0xb1, 0xa0, 0xe8, 0xb5, 0xa0, 0xe8,
0xe6, 0xec, 0x9f, 0xbf, 0x7d, 0xea, 0xd4, 0xb4, 0x61, 0x1b, 0x9e, 0xbc, 0x05, 0x00, 0x00, 0xff,
0xff, 0xc3, 0x54, 0xa2, 0xf3, 0x36, 0x02, 0x00, 0x00,
// 367 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xc1, 0x4e, 0xc2, 0x40,
0x10, 0x86, 0x59, 0x15, 0x02, 0xeb, 0xc5, 0xac, 0x68, 0xb0, 0xc6, 0x42, 0xb8, 0x48, 0x3c, 0xec,
0x0a, 0x1e, 0x4d, 0x4c, 0x44, 0x39, 0x6b, 0xaa, 0x89, 0xc6, 0x5b, 0xa9, 0x63, 0x6d, 0x42, 0xbb,
0xb5, 0x5d, 0x9a, 0x70, 0xf3, 0x11, 0x78, 0x26, 0x4f, 0x1c, 0x3d, 0x7a, 0x52, 0xe9, 0x93, 0x18,
0x76, 0xb7, 0x60, 0xc0, 0x88, 0xf1, 0x36, 0x3b, 0xf3, 0xcd, 0xf4, 0x9f, 0x7f, 0x8a, 0xcf, 0x5d,
0x4f, 0x3c, 0xf6, 0xbb, 0xd4, 0xe1, 0x3e, 0x73, 0x78, 0x20, 0x6c, 0x2f, 0x80, 0xe8, 0xfe, 0x7b,
0x68, 0x87, 0x1e, 0x8b, 0x21, 0x4a, 0x3c, 0x07, 0x62, 0x06, 0x09, 0x04, 0x22, 0x66, 0x49, 0x53,
0x47, 0x34, 0x8c, 0xb8, 0xe0, 0x64, 0x6f, 0xc6, 0xd3, 0x8c, 0xa5, 0x9a, 0x48, 0x9a, 0x46, 0xd9,
0xe5, 0x2e, 0x97, 0x24, 0x9b, 0x44, 0xaa, 0xc9, 0xd8, 0x71, 0x39, 0x77, 0x7b, 0xc0, 0xe4, 0xab,
0xdb, 0x7f, 0x60, 0x76, 0x30, 0xd0, 0xa5, 0xdd, 0xf9, 0x12, 0xf8, 0xa1, 0xc8, 0x8a, 0xd5, 0xf9,
0xa2, 0xf0, 0x7c, 0x88, 0x85, 0xed, 0x87, 0x0a, 0xa8, 0x6f, 0xe1, 0xcd, 0x2b, 0x11, 0x81, 0xed,
0x77, 0xa4, 0x02, 0x0b, 0x9e, 0xfa, 0x10, 0x8b, 0xfa, 0x0d, 0xde, 0xb8, 0xe4, 0xb1, 0x90, 0x49,
0x9d, 0x23, 0x67, 0xb8, 0x08, 0x41, 0x02, 0x3d, 0x1e, 0x42, 0x05, 0xd5, 0x50, 0x63, 0xbd, 0xb5,
0x4f, 0x7f, 0xdd, 0x85, 0x76, 0x34, 0x6e, 0x4d, 0x1b, 0xeb, 0x43, 0x84, 0x8b, 0x59, 0x9a, 0xb4,
0x71, 0x69, 0xaa, 0x47, 0x8f, 0x34, 0xa8, 0x52, 0x4c, 0x33, 0xc5, 0xf4, 0x3a, 0x23, 0xda, 0xc5,
0xd1, 0x7b, 0x35, 0x37, 0xfc, 0xa8, 0x22, 0x6b, 0xd6, 0x46, 0xca, 0x38, 0x2f, 0x78, 0xe8, 0x39,
0x95, 0x95, 0x1a, 0x6a, 0x94, 0x2c, 0xf5, 0x20, 0x07, 0x38, 0x2f, 0x65, 0x54, 0x56, 0xe5, 0xd4,
0xf2, 0xc2, 0xd4, 0xd3, 0x60, 0x60, 0x29, 0xa4, 0xf5, 0x82, 0x70, 0x41, 0x6d, 0x4f, 0x5c, 0x5c,
0x50, 0x6e, 0x90, 0xd6, 0x92, 0xd5, 0x7e, 0x30, 0xcd, 0xf8, 0xab, 0x1d, 0x87, 0x88, 0x5c, 0xe0,
0xb5, 0x89, 0xbf, 0x84, 0x2d, 0x69, 0x99, 0x3f, 0x82, 0xb1, 0xbd, 0xb0, 0x49, 0x67, 0x72, 0xee,
0xf6, 0xed, 0x68, 0x6c, 0xe6, 0xde, 0xc6, 0x66, 0xee, 0x39, 0x35, 0xd1, 0x28, 0x35, 0xd1, 0x6b,
0x6a, 0xa2, 0xcf, 0xd4, 0x44, 0x77, 0x27, 0xff, 0xfc, 0x6b, 0x8f, 0x55, 0xd4, 0x2d, 0xc8, 0x2f,
0x1d, 0x7d, 0x05, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x37, 0xcb, 0x0e, 0xfe, 0x02, 0x00, 0x00,
}

View File

@ -3,17 +3,23 @@ syntax = "proto3";
package containerd.services.events.v1;
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/events/v1;events";
service Events {
rpc Stream(StreamEventsRequest) returns (stream Envelope);
rpc Post(PostEventRequest) returns (google.protobuf.Empty);
}
message StreamEventsRequest {}
message PostEventRequest {
Envelope envelope = 1;
}
message Envelope {
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string topic = 2;

View File

@ -583,8 +583,8 @@ func (this *RuntimeEvent) String() string {
`Type:` + fmt.Sprintf("%v", this.Type) + `,`,
`Pid:` + fmt.Sprintf("%v", this.Pid) + `,`,
`ExitStatus:` + fmt.Sprintf("%v", this.ExitStatus) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s
@ -597,7 +597,7 @@ func (this *RuntimeDelete) String() string {
`ContainerID:` + fmt.Sprintf("%v", this.ContainerID) + `,`,
`Runtime:` + fmt.Sprintf("%v", this.Runtime) + `,`,
`ExitStatus:` + fmt.Sprintf("%v", this.ExitStatus) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s

View File

@ -49,6 +49,10 @@ func main() {
Name: "socket,s",
Usage: "abstract socket path to serve on",
},
cli.StringFlag{
Name: "address,a",
Usage: "grpc address back to containerd",
},
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
@ -74,6 +78,7 @@ func main() {
sv, err := shim.NewService(
path,
context.GlobalString("namespace"),
context.GlobalString("address"),
)
if err != nil {
return err

View File

@ -28,7 +28,6 @@ func (s *eventSink) Write(evt goevents.Event) error {
if !ok {
return errors.New("event is not a sink event")
}
topic := getTopic(e.ctx)
ns, _ := namespaces.Namespace(e.ctx)
if ns != "" && ns != s.ns {
@ -36,6 +35,12 @@ func (s *eventSink) Write(evt goevents.Event) error {
return nil
}
if ev, ok := e.event.(*events.Envelope); ok {
s.ch <- ev
return nil
}
topic := getTopic(e.ctx)
eventData, err := typeurl.MarshalAny(e.event)
if err != nil {
return err

View File

@ -56,8 +56,8 @@ type bundle struct {
}
// NewShim connects to the shim managing the bundle and tasks
func (b *bundle) NewShim(ctx context.Context, binary string, remote bool) (*client.Client, error) {
opt := client.WithStart(binary)
func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote bool) (*client.Client, error) {
opt := client.WithStart(binary, grpcAddress)
if !remote {
opt = client.WithLocal
}

View File

@ -8,7 +8,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"google.golang.org/grpc"
@ -84,22 +83,17 @@ func New(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
cfg := ic.Config.(*Config)
c, cancel := context.WithCancel(ic.Context)
r := &Runtime{
root: ic.Root,
remote: !cfg.NoShim,
shim: cfg.Shim,
runtime: cfg.Runtime,
events: make(chan *eventsapi.RuntimeEvent, 2048),
eventsContext: c,
eventsCancel: cancel,
monitor: monitor.(runtime.TaskMonitor),
tasks: newTaskList(),
emitter: events.GetPoster(ic.Context),
db: m.(*bolt.DB),
root: ic.Root,
remote: !cfg.NoShim,
shim: cfg.Shim,
runtime: cfg.Runtime,
monitor: monitor.(runtime.TaskMonitor),
tasks: newTaskList(),
emitter: events.GetPoster(ic.Context),
db: m.(*bolt.DB),
address: ic.Address,
}
// set the events output for a monitor if it generates events
r.monitor.Events(r.events)
tasks, err := r.restoreTasks(ic.Context)
if err != nil {
return nil, err
@ -108,9 +102,6 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
return nil, err
}
if err := r.handleEvents(ic.Context, t.shim); err != nil {
return nil, err
}
}
return r, nil
}
@ -120,14 +111,12 @@ type Runtime struct {
shim string
runtime string
remote bool
address string
events chan *eventsapi.RuntimeEvent
eventsContext context.Context
eventsCancel func()
monitor runtime.TaskMonitor
tasks *taskList
emitter events.Poster
db *bolt.DB
monitor runtime.TaskMonitor
tasks *taskList
emitter events.Poster
db *bolt.DB
}
func (r *Runtime) ID() string {
@ -148,7 +137,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
bundle.Delete()
}
}()
s, err := bundle.NewShim(ctx, r.shim, r.remote)
s, err := bundle.NewShim(ctx, r.shim, r.address, r.remote)
if err != nil {
return nil, err
}
@ -159,9 +148,6 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
}
}
}()
if err = r.handleEvents(ctx, s); err != nil {
return nil, err
}
sopts := &shim.CreateTaskRequest{
ID: id,
Bundle: bundle.path,
@ -332,48 +318,6 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
return o, nil
}
func (r *Runtime) handleEvents(ctx context.Context, s *client.Client) error {
events, err := s.Stream(r.eventsContext, &shim.StreamEventsRequest{})
if err != nil {
return err
}
go r.forward(ctx, events)
return nil
}
// forward forwards events from a shim to the events service and monitors
func (r *Runtime) forward(ctx context.Context, events shim.Shim_StreamClient) {
for {
e, err := events.Recv()
if err != nil {
if !strings.HasSuffix(err.Error(), "transport is closing") {
log.G(r.eventsContext).WithError(err).Error("get event from shim")
}
return
}
r.events <- e
if err := r.emit(ctx, "/runtime/"+getTopic(e), e); err != nil {
return
}
}
}
func getTopic(e *eventsapi.RuntimeEvent) string {
switch e.Type {
case eventsapi.RuntimeEvent_CREATE:
return "task-create"
case eventsapi.RuntimeEvent_START:
return "task-start"
case eventsapi.RuntimeEvent_EXEC_ADDED:
return "task-execadded"
case eventsapi.RuntimeEvent_OOM:
return "task-oom"
case eventsapi.RuntimeEvent_EXIT:
return "task-exit"
}
return ""
}
func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
ctx = namespaces.WithNamespace(ctx, ns)
rt, err := r.getRuntime(ctx, ns, id)

View File

@ -1,4 +1,4 @@
// +build linux
// +build !windows
package shim
@ -10,7 +10,6 @@ import (
"os"
"os/exec"
"strings"
"syscall"
"time"
"golang.org/x/sys/unix"
@ -28,7 +27,7 @@ import (
type ClientOpt func(context.Context, Config) (shim.ShimClient, io.Closer, error)
// WithStart executes a new shim process
func WithStart(binary string) ClientOpt {
func WithStart(binary, address string) ClientOpt {
return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
socket, err := newSocket(config)
if err != nil {
@ -41,7 +40,7 @@ func WithStart(binary string) ClientOpt {
}
defer f.Close()
cmd := newCommand(binary, config, f)
cmd := newCommand(binary, address, config, f)
if err := reaper.Default.Start(cmd); err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
}
@ -56,9 +55,10 @@ func WithStart(binary string) ClientOpt {
}
}
func newCommand(binary string, config Config, socket *os.File) *exec.Cmd {
func newCommand(binary, address string, config Config, socket *os.File) *exec.Cmd {
args := []string{
"--namespace", config.Namespace,
"--address", address,
}
if config.Debug {
args = append(args, "--debug")
@ -68,11 +68,12 @@ func newCommand(binary string, config Config, socket *os.File) *exec.Cmd {
// make sure the shim can be re-parented to system init
// and is cloned in a new mount namespace because the overlay/filesystems
// will be mounted by the shim
cmd.SysProcAttr = &syscall.SysProcAttr{
Cloneflags: syscall.CLONE_NEWNS,
Setpgid: true,
}
cmd.SysProcAttr = &atter
cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
if config.Debug {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
return cmd
}
@ -88,12 +89,12 @@ func newSocket(config Config) (*net.UnixListener, error) {
return l.(*net.UnixListener), nil
}
func connect(address string) (*grpc.ClientConn, error) {
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithTimeout(100 * time.Second),
grpc.WithDialer(dialer),
grpc.WithDialer(d),
grpc.FailOnNonTempDialError(true),
}
conn, err := grpc.Dial(dialAddress(address), gopts...)
@ -104,6 +105,11 @@ func connect(address string) (*grpc.ClientConn, error) {
}
func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}
func annonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", "\x00"+address, timeout)
}
@ -114,7 +120,7 @@ func dialAddress(address string) string {
// WithConnect connects to an existing shim
func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
conn, err := connect(config.Address)
conn, err := connect(config.Address, annonDialer)
if err != nil {
return nil, nil, err
}
@ -123,7 +129,7 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer
// WithLocal uses an in process shim
func WithLocal(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
service, err := NewService(config.Path, config.Namespace)
service, err := NewService(config.Path, config.Namespace, "")
if err != nil {
return nil, nil, err
}

View File

@ -0,0 +1,10 @@
// +build linux
package shim
import "syscall"
var atter = syscall.SysProcAttr{
Cloneflags: syscall.CLONE_NEWNS,
Setpgid: true,
}

View File

@ -0,0 +1,9 @@
// +build !linux,!windows
package shim
import "syscall"
var atter = syscall.SysProcAttr{
Setpgid: true,
}

View File

@ -6,6 +6,7 @@ import (
"path/filepath"
events "github.com/containerd/containerd/api/services/events/v1"
evt "github.com/containerd/containerd/events"
shimapi "github.com/containerd/containerd/linux/shim/v1"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
@ -129,3 +130,19 @@ func (e *streamEvents) SendMsg(m interface{}) error {
func (e *streamEvents) RecvMsg(m interface{}) error {
return nil
}
type poster interface {
Post(ctx context.Context, in *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error)
}
type localEventsClient struct {
emitter evt.Poster
}
func (l *localEventsClient) Post(ctx context.Context, r *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
ctx = evt.WithTopic(ctx, r.Envelope.Topic)
if err := l.emitter.Post(ctx, r.Envelope); err != nil {
return nil, err
}
return empty, nil
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -13,8 +14,12 @@ import (
"github.com/containerd/console"
events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types/task"
evt "github.com/containerd/containerd/events"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/typeurl"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"golang.org/x/net/context"
@ -29,16 +34,32 @@ var empty = &google_protobuf.Empty{}
const RuncRoot = "/run/containerd/runc"
// NewService returns a new shim service that can be used via GRPC
func NewService(path, namespace string) (*Service, error) {
func NewService(path, namespace, address string) (*Service, error) {
if namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty")
}
return &Service{
context := namespaces.WithNamespace(context.Background(), namespace)
var client poster
if address != "" {
conn, err := connect(address, dialer)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
client = events.NewEventsClient(conn)
} else {
client = &localEventsClient{
emitter: evt.GetPoster(context),
}
}
s := &Service{
path: path,
processes: make(map[string]process),
events: make(chan *events.RuntimeEvent, 4096),
namespace: namespace,
}, nil
context: context,
}
go s.forward(client)
return s, nil
}
type Service struct {
@ -52,6 +73,7 @@ type Service struct {
eventsMu sync.Mutex
deferredEvent *events.RuntimeEvent
namespace string
context context.Context
}
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
@ -367,3 +389,38 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
}
return pids, nil
}
func (s *Service) forward(client poster) {
for e := range s.events {
a, err := typeurl.MarshalAny(e)
if err != nil {
log.G(s.context).WithError(err).Error("marshal event")
continue
}
if _, err := client.Post(s.context, &events.PostEventRequest{
Envelope: &events.Envelope{
Timestamp: time.Now(),
Topic: "/runtime/" + getTopic(e),
Event: a,
},
}); err != nil {
log.G(s.context).WithError(err).Error("post event")
}
}
}
func getTopic(e *events.RuntimeEvent) string {
switch e.Type {
case events.RuntimeEvent_CREATE:
return "task-create"
case events.RuntimeEvent_START:
return "task-start"
case events.RuntimeEvent_EXEC_ADDED:
return "task-execadded"
case events.RuntimeEvent_OOM:
return "task-oom"
case events.RuntimeEvent_EXIT:
return "task-exit"
}
return "?"
}

View File

@ -7,6 +7,8 @@ import (
"github.com/containerd/cgroups"
events "github.com/containerd/containerd/api/services/events/v1"
evt "github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
metrics "github.com/docker/go-metrics"
@ -35,6 +37,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
collector: collector,
oom: oom,
context: ic.Context,
emitter: ic.Emitter,
}, nil
}
@ -42,7 +45,7 @@ type cgroupsMonitor struct {
collector *Collector
oom *OOMCollector
context context.Context
events chan<- *events.RuntimeEvent
emitter *evt.Emitter
}
func (m *cgroupsMonitor) Monitor(c runtime.Task) error {
@ -67,15 +70,13 @@ func (m *cgroupsMonitor) Stop(c runtime.Task) error {
return nil
}
func (m *cgroupsMonitor) Events(events chan<- *events.RuntimeEvent) {
m.events = events
}
func (m *cgroupsMonitor) trigger(id string, cg cgroups.Cgroup) {
m.events <- &events.RuntimeEvent{
if err := m.emitter.Post(m.context, &events.RuntimeEvent{
Timestamp: time.Now(),
Type: events.RuntimeEvent_OOM,
ID: id,
ContainerID: id,
}); err != nil {
log.G(m.context).WithError(err).Error("post OOM event")
}
}

View File

@ -19,6 +19,7 @@ func NewContext(ctx context.Context, plugins map[PluginType]map[string]interface
type InitContext struct {
Root string
Address string
Context context.Context
Config interface{}
Emitter *events.Emitter

View File

@ -1,15 +1,11 @@
package runtime
import events "github.com/containerd/containerd/api/services/events/v1"
// TaskMonitor provides an interface for monitoring of containers within containerd
type TaskMonitor interface {
// Monitor adds the provided container to the monitor
Monitor(Task) error
// Stop stops and removes the provided container from the monitor
Stop(Task) error
// Events emits events to the channel for the monitor
Events(chan<- *events.RuntimeEvent)
}
func NewMultiTaskMonitor(monitors ...TaskMonitor) TaskMonitor {
@ -33,9 +29,6 @@ func (mm *noopTaskMonitor) Stop(c Task) error {
return nil
}
func (mm *noopTaskMonitor) Events(events chan<- *events.RuntimeEvent) {
}
type multiTaskMonitor struct {
monitors []TaskMonitor
}
@ -57,9 +50,3 @@ func (mm *multiTaskMonitor) Stop(c Task) error {
}
return nil
}
func (mm *multiTaskMonitor) Events(events chan<- *events.RuntimeEvent) {
for _, m := range mm.monitors {
m.Events(events)
}
}

View File

@ -13,6 +13,7 @@ import (
containers "github.com/containerd/containerd/api/services/containers/v1"
content "github.com/containerd/containerd/api/services/content/v1"
diff "github.com/containerd/containerd/api/services/diff/v1"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
images "github.com/containerd/containerd/api/services/images/v1"
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
snapshot "github.com/containerd/containerd/api/services/snapshot/v1"
@ -68,6 +69,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
id,
)
initContext.Emitter = s.emitter
initContext.Address = config.GRPC.Address
// load the plugin specific configuration if it is provided
if p.Config != nil {
@ -203,6 +205,8 @@ func interceptor(
ctx = log.WithModule(ctx, "diff")
case namespaces.NamespacesServer:
ctx = log.WithModule(ctx, "namespaces")
case eventsapi.EventsServer:
ctx = log.WithModule(ctx, "events")
default:
log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server)
}

View File

@ -8,6 +8,8 @@ import (
api "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@ -52,3 +54,11 @@ func (s *Service) Stream(req *api.StreamEventsRequest, srv api.Events_StreamServ
}
}
}
func (s *Service) Post(ctx context.Context, r *api.PostEventRequest) (*empty.Empty, error) {
ctx = events.WithTopic(ctx, r.Envelope.Topic)
if err := s.emitter.Post(ctx, r.Envelope); err != nil {
return nil, err
}
return &empty.Empty{}, nil
}