add testcase for event.exchange and refactor
Signed-off-by: yason <yan.xuean@zte.com.cn>
This commit is contained in:
parent
89623f28b8
commit
224417aab7
@ -100,6 +100,168 @@ func TestExchangeBasic(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExchangeFilters(t *testing.T) {
|
||||||
|
var (
|
||||||
|
ctx = namespaces.WithNamespace(context.Background(), t.Name())
|
||||||
|
exchange = NewExchange()
|
||||||
|
|
||||||
|
// config events, All events will be published
|
||||||
|
containerCreateEvents = []events.Event{
|
||||||
|
&eventstypes.ContainerCreate{ID: "asdf"},
|
||||||
|
&eventstypes.ContainerCreate{ID: "qwer"},
|
||||||
|
&eventstypes.ContainerCreate{ID: "zxcv"},
|
||||||
|
}
|
||||||
|
taskExitEvents = []events.Event{
|
||||||
|
&eventstypes.TaskExit{ContainerID: "abcdef"},
|
||||||
|
}
|
||||||
|
testEventSets = []struct {
|
||||||
|
topic string
|
||||||
|
events []events.Event
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
topic: "/containers/create",
|
||||||
|
events: containerCreateEvents,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
topic: "/tasks/exit",
|
||||||
|
events: taskExitEvents,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
allTestEvents = func(eventSets []struct {
|
||||||
|
topic string
|
||||||
|
events []events.Event
|
||||||
|
}) (events []events.Event) {
|
||||||
|
for _, v := range eventSets {
|
||||||
|
events = append(events, v.events...)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}(testEventSets)
|
||||||
|
|
||||||
|
// config test cases
|
||||||
|
testCases = []struct {
|
||||||
|
testName string
|
||||||
|
filters []string
|
||||||
|
|
||||||
|
// The fields as below are for store data. Don't config them.
|
||||||
|
expectEvents []events.Event
|
||||||
|
eventq <-chan *events.Envelope
|
||||||
|
errq <-chan error
|
||||||
|
cancel func()
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
testName: "No Filter",
|
||||||
|
expectEvents: allTestEvents,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testName: "Filter events by one topic",
|
||||||
|
filters: []string{
|
||||||
|
`topic=="/containers/create"`,
|
||||||
|
},
|
||||||
|
expectEvents: containerCreateEvents,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testName: "Filter events by field",
|
||||||
|
filters: []string{
|
||||||
|
"event.id",
|
||||||
|
},
|
||||||
|
expectEvents: containerCreateEvents,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testName: "Filter events by field OR topic",
|
||||||
|
filters: []string{
|
||||||
|
`topic=="/containers/create"`,
|
||||||
|
"event.id",
|
||||||
|
},
|
||||||
|
expectEvents: containerCreateEvents,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testName: "Filter events by regex ",
|
||||||
|
filters: []string{
|
||||||
|
`topic~="/containers/*"`,
|
||||||
|
},
|
||||||
|
expectEvents: containerCreateEvents,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testName: "Filter events for by anyone of two topics",
|
||||||
|
filters: []string{
|
||||||
|
`topic=="/tasks/exit"`,
|
||||||
|
`topic=="/containers/create"`,
|
||||||
|
},
|
||||||
|
expectEvents: append(containerCreateEvents, taskExitEvents...),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testName: "Filter events for by one topic AND id",
|
||||||
|
filters: []string{
|
||||||
|
`topic=="/containers/create",event.id=="qwer"`,
|
||||||
|
},
|
||||||
|
expectEvents: []events.Event{
|
||||||
|
&eventstypes.ContainerCreate{ID: "qwer"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
t.Log("subscribe")
|
||||||
|
for i := range testCases {
|
||||||
|
var ctx1 context.Context
|
||||||
|
ctx1, testCases[i].cancel = context.WithCancel(ctx)
|
||||||
|
testCases[i].eventq, testCases[i].errq = exchange.Subscribe(ctx1, testCases[i].filters...)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("publish")
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
errChan := make(chan error)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer close(errChan)
|
||||||
|
for _, es := range testEventSets {
|
||||||
|
for _, e := range es.events {
|
||||||
|
if err := exchange.Publish(ctx, es.topic, e); err != nil {
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("finished publishing")
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.Log("waiting")
|
||||||
|
wg.Wait()
|
||||||
|
if err := <-errChan; err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("receive event")
|
||||||
|
for _, subscriber := range testCases {
|
||||||
|
t.Logf("test case: %q", subscriber.testName)
|
||||||
|
var received []events.Event
|
||||||
|
subscribercheck:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case env := <-subscriber.eventq:
|
||||||
|
ev, err := typeurl.UnmarshalAny(env.Event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
received = append(received, ev)
|
||||||
|
case err := <-subscriber.errq:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
break subscribercheck
|
||||||
|
}
|
||||||
|
|
||||||
|
if reflect.DeepEqual(received, subscriber.expectEvents) {
|
||||||
|
// when we do this, we expect the errs channel to be closed and
|
||||||
|
// this will return.
|
||||||
|
subscriber.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestExchangeValidateTopic(t *testing.T) {
|
func TestExchangeValidateTopic(t *testing.T) {
|
||||||
namespace := t.Name()
|
namespace := t.Name()
|
||||||
ctx := namespaces.WithNamespace(context.Background(), namespace)
|
ctx := namespaces.WithNamespace(context.Background(), namespace)
|
||||||
|
Loading…
Reference in New Issue
Block a user