From 224417aab7124423a24c3dbdf081cc40b0cd34d8 Mon Sep 17 00:00:00 2001 From: yason Date: Fri, 8 Dec 2017 00:03:01 +0800 Subject: [PATCH] add testcase for event.exchange and refactor Signed-off-by: yason --- events/exchange/exchange_test.go | 162 +++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) diff --git a/events/exchange/exchange_test.go b/events/exchange/exchange_test.go index bb004e810..f03809e59 100644 --- a/events/exchange/exchange_test.go +++ b/events/exchange/exchange_test.go @@ -100,6 +100,168 @@ func TestExchangeBasic(t *testing.T) { } } +func TestExchangeFilters(t *testing.T) { + var ( + ctx = namespaces.WithNamespace(context.Background(), t.Name()) + exchange = NewExchange() + + // config events, All events will be published + containerCreateEvents = []events.Event{ + &eventstypes.ContainerCreate{ID: "asdf"}, + &eventstypes.ContainerCreate{ID: "qwer"}, + &eventstypes.ContainerCreate{ID: "zxcv"}, + } + taskExitEvents = []events.Event{ + &eventstypes.TaskExit{ContainerID: "abcdef"}, + } + testEventSets = []struct { + topic string + events []events.Event + }{ + { + topic: "/containers/create", + events: containerCreateEvents, + }, + { + topic: "/tasks/exit", + events: taskExitEvents, + }, + } + allTestEvents = func(eventSets []struct { + topic string + events []events.Event + }) (events []events.Event) { + for _, v := range eventSets { + events = append(events, v.events...) + } + return + }(testEventSets) + + // config test cases + testCases = []struct { + testName string + filters []string + + // The fields as below are for store data. Don't config them. + expectEvents []events.Event + eventq <-chan *events.Envelope + errq <-chan error + cancel func() + }{ + { + testName: "No Filter", + expectEvents: allTestEvents, + }, + { + testName: "Filter events by one topic", + filters: []string{ + `topic=="/containers/create"`, + }, + expectEvents: containerCreateEvents, + }, + { + testName: "Filter events by field", + filters: []string{ + "event.id", + }, + expectEvents: containerCreateEvents, + }, + { + testName: "Filter events by field OR topic", + filters: []string{ + `topic=="/containers/create"`, + "event.id", + }, + expectEvents: containerCreateEvents, + }, + { + testName: "Filter events by regex ", + filters: []string{ + `topic~="/containers/*"`, + }, + expectEvents: containerCreateEvents, + }, + { + testName: "Filter events for by anyone of two topics", + filters: []string{ + `topic=="/tasks/exit"`, + `topic=="/containers/create"`, + }, + expectEvents: append(containerCreateEvents, taskExitEvents...), + }, + { + testName: "Filter events for by one topic AND id", + filters: []string{ + `topic=="/containers/create",event.id=="qwer"`, + }, + expectEvents: []events.Event{ + &eventstypes.ContainerCreate{ID: "qwer"}, + }, + }, + } + ) + + t.Log("subscribe") + for i := range testCases { + var ctx1 context.Context + ctx1, testCases[i].cancel = context.WithCancel(ctx) + testCases[i].eventq, testCases[i].errq = exchange.Subscribe(ctx1, testCases[i].filters...) + } + + t.Log("publish") + var wg sync.WaitGroup + wg.Add(1) + errChan := make(chan error) + go func() { + defer wg.Done() + defer close(errChan) + for _, es := range testEventSets { + for _, e := range es.events { + if err := exchange.Publish(ctx, es.topic, e); err != nil { + errChan <- err + return + } + } + } + + t.Log("finished publishing") + }() + + t.Log("waiting") + wg.Wait() + if err := <-errChan; err != nil { + t.Fatal(err) + } + + t.Log("receive event") + for _, subscriber := range testCases { + t.Logf("test case: %q", subscriber.testName) + var received []events.Event + subscribercheck: + for { + select { + case env := <-subscriber.eventq: + ev, err := typeurl.UnmarshalAny(env.Event) + if err != nil { + t.Fatal(err) + } + received = append(received, ev) + case err := <-subscriber.errq: + if err != nil { + t.Fatal(err) + } + break subscribercheck + } + + if reflect.DeepEqual(received, subscriber.expectEvents) { + // when we do this, we expect the errs channel to be closed and + // this will return. + subscriber.cancel() + } + } + } +} + func TestExchangeValidateTopic(t *testing.T) { namespace := t.Name() ctx := namespaces.WithNamespace(context.Background(), namespace)