
This change further plumbs the components required for implementing event filters. Specifically, we now have the ability to filter on the `topic` and `namespace`. In the course of implementing this functionality, it was found that there were mismatches in the events API that created extra serialization round trips. A modification to `typeurl.MarshalAny` and a clear separation between publishing and forwarding allow us to avoid these serialization issues. Unfortunately, this has required a few tweaks to the GRPC API, so this is a breaking change. `Publish` and `Forward` have been clearly separated in the GRPC API. `Publish` honors the contextual namespace and performs timestamping while `Forward` simply validates and forwards. The behavior of `Subscribe` is to propagate events for all namespaces unless specifically filtered (and hence the relation to this particular change. The following is an example of using filters to monitor the task events generated while running the [bucketbench tool](https://github.com/estesp/bucketbench): ``` $ ctr events 'topic~=/tasks/.+,namespace==bb' ... 2017-07-28 22:19:51.78944874 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-6-8","pid":25889} 2017-07-28 22:19:51.791893688 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-4-8","pid":25882} 2017-07-28 22:19:51.792608389 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-2-9","pid":25860} 2017-07-28 22:19:51.793035217 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-5-6","pid":25869} 2017-07-28 22:19:51.802659622 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-0-7","pid":25877} 2017-07-28 22:19:51.805192898 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-3-6","pid":25856} 2017-07-28 22:19:51.832374931 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-8-6","id":"bb-ctr-8-6","pid":25864,"exited_at":"2017-07-28T22:19:51.832013043Z"} 2017-07-28 22:19:51.84001249 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-2-9","id":"bb-ctr-2-9","pid":25860,"exited_at":"2017-07-28T22:19:51.839717714Z"} 2017-07-28 22:19:51.840272635 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-7-6","id":"bb-ctr-7-6","pid":25855,"exited_at":"2017-07-28T22:19:51.839796335Z"} ... ``` In addition to the events changes, we now display the namespace origin of the event in the cli tool. This will be followed by a PR to add individual field filtering for the events API for each event type. Signed-off-by: Stephen J Day <stephen.day@docker.com>
153 lines
3.3 KiB
Go
153 lines
3.3 KiB
Go
package events
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
events "github.com/containerd/containerd/api/services/events/v1"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/namespaces"
|
|
"github.com/containerd/containerd/typeurl"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
func TestExchangeBasic(t *testing.T) {
|
|
ctx := namespaces.WithNamespace(context.Background(), t.Name())
|
|
testevents := []Event{
|
|
&events.ContainerCreate{ID: "asdf"},
|
|
&events.ContainerCreate{ID: "qwer"},
|
|
&events.ContainerCreate{ID: "zxcv"},
|
|
}
|
|
exchange := NewExchange()
|
|
|
|
t.Log("subscribe")
|
|
var cancel1, cancel2 func()
|
|
|
|
// Create two subscribers for same set of events and make sure they
|
|
// traverse the exchange.
|
|
ctx1, cancel1 := context.WithCancel(ctx)
|
|
eventq1, errq1 := exchange.Subscribe(ctx1)
|
|
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
eventq2, errq2 := exchange.Subscribe(ctx2)
|
|
|
|
t.Log("publish")
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for _, event := range testevents {
|
|
fmt.Println("publish", event)
|
|
if err := exchange.Publish(ctx, "/test", event); err != nil {
|
|
fmt.Println("publish error", err)
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
t.Log("finished publishing")
|
|
}()
|
|
|
|
t.Log("waiting")
|
|
wg.Wait()
|
|
|
|
for _, subscriber := range []struct {
|
|
eventq <-chan *events.Envelope
|
|
errq <-chan error
|
|
cancel func()
|
|
}{
|
|
{
|
|
eventq: eventq1,
|
|
errq: errq1,
|
|
cancel: cancel1,
|
|
},
|
|
{
|
|
eventq: eventq2,
|
|
errq: errq2,
|
|
cancel: cancel2,
|
|
},
|
|
} {
|
|
var received []Event
|
|
subscribercheck:
|
|
for {
|
|
select {
|
|
case env := <-subscriber.eventq:
|
|
ev, err := typeurl.UnmarshalAny(env.Event)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
received = append(received, ev.(*events.ContainerCreate))
|
|
case err := <-subscriber.errq:
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
break subscribercheck
|
|
}
|
|
|
|
if reflect.DeepEqual(received, testevents) {
|
|
// when we do this, we expect the errs channel to be closed and
|
|
// this will return.
|
|
subscriber.cancel()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestExchangeValidateTopic(t *testing.T) {
|
|
namespace := t.Name()
|
|
ctx := namespaces.WithNamespace(context.Background(), namespace)
|
|
exchange := NewExchange()
|
|
|
|
for _, testcase := range []struct {
|
|
input string
|
|
err error
|
|
}{
|
|
{
|
|
input: "/test",
|
|
},
|
|
{
|
|
input: "/test/test",
|
|
},
|
|
{
|
|
input: "test",
|
|
err: errdefs.ErrInvalidArgument,
|
|
},
|
|
} {
|
|
t.Run(testcase.input, func(t *testing.T) {
|
|
event := &events.ContainerCreate{ID: t.Name()}
|
|
if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err {
|
|
if err == nil {
|
|
t.Fatalf("expected error %v, received nil", testcase.err)
|
|
} else {
|
|
t.Fatalf("expected error %v, received %v", testcase.err, err)
|
|
}
|
|
}
|
|
|
|
evany, err := typeurl.MarshalAny(event)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
envelope := events.Envelope{
|
|
Timestamp: time.Now().UTC(),
|
|
Namespace: namespace,
|
|
Topic: testcase.input,
|
|
Event: evany,
|
|
}
|
|
|
|
// make sure we get same errors with forward.
|
|
if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err {
|
|
if err == nil {
|
|
t.Fatalf("expected error %v, received nil", testcase.err)
|
|
} else {
|
|
t.Fatalf("expected error %v, received %v", testcase.err, err)
|
|
}
|
|
}
|
|
|
|
})
|
|
}
|
|
}
|