Files
containerd/events/exchange/exchange_test.go
2017-12-13 12:15:50 +08:00

319 lines
7.0 KiB
Go

package exchange
import (
"context"
"reflect"
"sync"
"testing"
"time"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
)
// TestExchangeBasic publishes a fixed list of ContainerCreate events on the
// "/test" topic and verifies that two unfiltered subscribers each receive the
// complete list, in publication order.
func TestExchangeBasic(t *testing.T) {
	ctx := namespaces.WithNamespace(context.Background(), t.Name())
	testevents := []events.Event{
		&eventstypes.ContainerCreate{ID: "asdf"},
		&eventstypes.ContainerCreate{ID: "qwer"},
		&eventstypes.ContainerCreate{ID: "zxcv"},
	}
	exchange := NewExchange()

	t.Log("subscribe")

	// Create two subscribers for same set of events and make sure they
	// traverse the exchange. The deferred cancels release the subscriptions
	// even when the test bails out early through t.Fatal; calling a
	// context.CancelFunc more than once is harmless.
	ctx1, cancel1 := context.WithCancel(ctx)
	defer cancel1()
	eventq1, errq1 := exchange.Subscribe(ctx1)

	ctx2, cancel2 := context.WithCancel(ctx)
	defer cancel2()
	eventq2, errq2 := exchange.Subscribe(ctx2)

	t.Log("publish")
	var wg sync.WaitGroup
	wg.Add(1)
	// Buffered with room for one error: the publisher must be able to hand
	// over a failure and return (running wg.Done) before anyone receives.
	// With an unbuffered channel the send would block forever, because the
	// receive below only happens after wg.Wait returns.
	errChan := make(chan error, 1)
	go func() {
		defer wg.Done()
		defer close(errChan)
		for _, event := range testevents {
			if err := exchange.Publish(ctx, "/test", event); err != nil {
				errChan <- err
				return
			}
		}

		t.Log("finished publishing")
	}()

	t.Log("waiting")
	wg.Wait()
	// A closed, empty channel yields nil, so this only fails when the
	// publisher reported an error.
	if err := <-errChan; err != nil {
		t.Fatal(err)
	}

	for _, subscriber := range []struct {
		eventq <-chan *events.Envelope
		errq   <-chan error
		cancel func()
	}{
		{
			eventq: eventq1,
			errq:   errq1,
			cancel: cancel1,
		},
		{
			eventq: eventq2,
			errq:   errq2,
			cancel: cancel2,
		},
	} {
		var received []events.Event
	subscribercheck:
		for {
			select {
			case env := <-subscriber.eventq:
				ev, err := typeurl.UnmarshalAny(env.Event)
				if err != nil {
					t.Fatal(err)
				}
				// Report an unexpected payload type as a test failure
				// instead of panicking on the assertion.
				created, ok := ev.(*eventstypes.ContainerCreate)
				if !ok {
					t.Fatalf("unexpected event type %T", ev)
				}
				received = append(received, created)
			case err := <-subscriber.errq:
				if err != nil {
					t.Fatal(err)
				}
				break subscribercheck
			}

			if reflect.DeepEqual(received, testevents) {
				// when we do this, we expect the errs channel to be closed and
				// this will return.
				subscriber.cancel()
			}
		}
	}
}
// TestExchangeFilters registers one subscription per test case, each with its
// own filter list, publishes every event in testEventSets on its topic, and
// then checks that each subscription received exactly the events listed in
// its expectEvents field, in publication order.
func TestExchangeFilters(t *testing.T) {
	var (
		ctx      = namespaces.WithNamespace(context.Background(), t.Name())
		exchange = NewExchange()

		// Event fixtures. Every event listed here is published.
		containerCreateEvents = []events.Event{
			&eventstypes.ContainerCreate{ID: "asdf"},
			&eventstypes.ContainerCreate{ID: "qwer"},
			&eventstypes.ContainerCreate{ID: "zxcv"},
		}
		taskExitEvents = []events.Event{
			&eventstypes.TaskExit{ContainerID: "abcdef"},
		}
		testEventSets = []struct {
			topic  string
			events []events.Event
		}{
			{
				topic:  "/containers/create",
				events: containerCreateEvents,
			},
			{
				topic:  "/tasks/exit",
				events: taskExitEvents,
			},
		}
		// allTestEvents flattens the event sets in publication order. The
		// accumulator is deliberately not called "events" so it does not
		// shadow the imported events package.
		allTestEvents = func(eventSets []struct {
			topic  string
			events []events.Event
		}) []events.Event {
			var all []events.Event
			for _, v := range eventSets {
				all = append(all, v.events...)
			}
			return all
		}(testEventSets)

		// Test case configuration.
		testCases = []struct {
			testName string
			filters  []string

			// The fields below hold per-subscription state that the test
			// fills in itself (eventq, errq, cancel); only expectEvents is
			// set in the table.
			expectEvents []events.Event
			eventq       <-chan *events.Envelope
			errq         <-chan error
			cancel       func()
		}{
			{
				testName:     "No Filter",
				expectEvents: allTestEvents,
			},
			{
				testName: "Filter events by one topic",
				filters: []string{
					`topic=="/containers/create"`,
				},
				expectEvents: containerCreateEvents,
			},
			{
				testName: "Filter events by field",
				filters: []string{
					"event.id",
				},
				expectEvents: containerCreateEvents,
			},
			{
				testName: "Filter events by field OR topic",
				filters: []string{
					`topic=="/containers/create"`,
					"event.id",
				},
				expectEvents: containerCreateEvents,
			},
			{
				testName: "Filter events by regex ",
				filters: []string{
					`topic~="/containers/*"`,
				},
				expectEvents: containerCreateEvents,
			},
			{
				testName: "Filter events for by anyone of two topics",
				filters: []string{
					`topic=="/tasks/exit"`,
					`topic=="/containers/create"`,
				},
				expectEvents: append(containerCreateEvents, taskExitEvents...),
			},
			{
				testName: "Filter events for by one topic AND id",
				filters: []string{
					`topic=="/containers/create",event.id=="qwer"`,
				},
				expectEvents: []events.Event{
					&eventstypes.ContainerCreate{ID: "qwer"},
				},
			},
		}
	)

	t.Log("subscribe")
	for i := range testCases {
		var ctx1 context.Context
		ctx1, testCases[i].cancel = context.WithCancel(ctx)
		// Intentionally deferred to function return: every subscription must
		// stay alive until its events have been checked, and must still be
		// released if the test aborts through t.Fatal. Cancelling twice is
		// harmless.
		defer testCases[i].cancel()
		testCases[i].eventq, testCases[i].errq = exchange.Subscribe(ctx1, testCases[i].filters...)
	}

	t.Log("publish")
	var wg sync.WaitGroup
	wg.Add(1)
	// Buffered with room for one error: the publisher must be able to hand
	// over a failure and return (running wg.Done) before anyone receives.
	// With an unbuffered channel the send would block forever, because the
	// receive below only happens after wg.Wait returns.
	errChan := make(chan error, 1)
	go func() {
		defer wg.Done()
		defer close(errChan)
		for _, es := range testEventSets {
			for _, e := range es.events {
				if err := exchange.Publish(ctx, es.topic, e); err != nil {
					errChan <- err
					return
				}
			}
		}

		t.Log("finished publishing")
	}()

	t.Log("waiting")
	wg.Wait()
	// A closed, empty channel yields nil, so this only fails when the
	// publisher reported an error.
	if err := <-errChan; err != nil {
		t.Fatal(err)
	}

	t.Log("receive event")
	for _, subscriber := range testCases {
		t.Logf("test case: %q", subscriber.testName)
		var received []events.Event
	subscribercheck:
		for {
			select {
			case env := <-subscriber.eventq:
				ev, err := typeurl.UnmarshalAny(env.Event)
				if err != nil {
					t.Fatal(err)
				}
				received = append(received, ev)
			case err := <-subscriber.errq:
				if err != nil {
					t.Fatal(err)
				}
				break subscribercheck
			}

			if reflect.DeepEqual(received, subscriber.expectEvents) {
				// when we do this, we expect the errs channel to be closed and
				// this will return.
				subscriber.cancel()
			}
		}
	}
}
// TestExchangeValidateTopic feeds a table of topic strings to both Publish
// and Forward and checks that the cause of the returned error matches the
// table's expectation (nil when the err field is left unset).
func TestExchangeValidateTopic(t *testing.T) {
	var (
		namespace = t.Name()
		ctx       = namespaces.WithNamespace(context.Background(), namespace)
		exchange  = NewExchange()
	)

	cases := []struct {
		input string
		err   error
	}{
		{input: "/test"},
		{input: "/test/test"},
		{input: "test", err: errdefs.ErrInvalidArgument},
	}

	for _, tc := range cases {
		t.Run(tc.input, func(t *testing.T) {
			event := &eventstypes.ContainerCreate{ID: t.Name()}

			// First path: publish the event directly on the topic under test.
			publishErr := exchange.Publish(ctx, tc.input, event)
			if errors.Cause(publishErr) != tc.err {
				if publishErr == nil {
					t.Fatalf("expected error %v, received nil", tc.err)
				}
				t.Fatalf("expected error %v, received %v", tc.err, publishErr)
			}

			// Second path: wrap the same event in a hand-built envelope.
			evany, err := typeurl.MarshalAny(event)
			if err != nil {
				t.Fatal(err)
			}
			envelope := &events.Envelope{
				Timestamp: time.Now().UTC(),
				Namespace: namespace,
				Topic:     tc.input,
				Event:     evany,
			}

			// make sure we get same errors with forward.
			forwardErr := exchange.Forward(ctx, envelope)
			if errors.Cause(forwardErr) != tc.err {
				if forwardErr == nil {
					t.Fatalf("expected error %v, received nil", tc.err)
				}
				t.Fatalf("expected error %v, received %v", tc.err, forwardErr)
			}
		})
	}
}