Merge pull request #9891 from dmcgowan/move-events-plugin

Move events to plugins and core
This commit is contained in:
Fu Wei
2024-02-29 12:45:25 +00:00
committed by GitHub
30 changed files with 35 additions and 34 deletions

80
core/events/events.go Normal file
View File

@@ -0,0 +1,80 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
"time"
"github.com/containerd/typeurl/v2"
)
// Envelope provides the packaging for an event.
type Envelope struct {
Timestamp time.Time
Namespace string
Topic string
Event typeurl.Any
}
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (e *Envelope) Field(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
// unhandled: timestamp
case "namespace":
return e.Namespace, len(e.Namespace) > 0
case "topic":
return e.Topic, len(e.Topic) > 0
case "event":
decoded, err := typeurl.UnmarshalAny(e.Event)
if err != nil {
return "", false
}
adaptor, ok := decoded.(interface {
Field([]string) (string, bool)
})
if !ok {
return "", false
}
return adaptor.Field(fieldpath[1:])
}
return "", false
}
// Event is a generic interface for any type of event
type Event interface{}
// Publisher posts the event.
type Publisher interface {
Publish(ctx context.Context, topic string, event Event) error
}
// Forwarder forwards an event to the underlying event bus
type Forwarder interface {
Forward(ctx context.Context, envelope *Envelope) error
}
// Subscriber allows callers to subscribe to events
type Subscriber interface {
Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error)
}

View File

@@ -0,0 +1,248 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exchange
import (
"context"
"fmt"
"strings"
"time"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/pkg/filters"
"github.com/containerd/containerd/v2/pkg/identifiers"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
goevents "github.com/docker/go-events"
)
// Exchange broadcasts events
type Exchange struct {
broadcaster *goevents.Broadcaster
}
// NewExchange returns a new event Exchange
func NewExchange() *Exchange {
return &Exchange{
broadcaster: goevents.NewBroadcaster(),
}
}
var _ events.Publisher = &Exchange{}
var _ events.Forwarder = &Exchange{}
var _ events.Subscriber = &Exchange{}
// Forward accepts an envelope to be directly distributed on the exchange.
//
// This is useful when an event is forwarded on behalf of another namespace or
// when the event is propagated on behalf of another publisher.
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
if err := validateEnvelope(envelope); err != nil {
return err
}
defer func() {
logger := log.G(ctx).WithFields(log.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.GetTypeUrl(),
})
if err != nil {
logger.WithError(err).Error("error forwarding event")
} else {
logger.Trace("event forwarded")
}
}()
return e.broadcaster.Write(envelope)
}
// Publish packages and sends an event. The caller will be considered the
// initial publisher of the event. This means the timestamp will be calculated
// at this point and this method may read from the calling context.
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
var (
namespace string
envelope events.Envelope
)
namespace, err = namespaces.NamespaceRequired(ctx)
if err != nil {
return fmt.Errorf("failed publishing event: %w", err)
}
if err := validateTopic(topic); err != nil {
return fmt.Errorf("envelope topic %q: %w", topic, err)
}
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
envelope.Timestamp = time.Now().UTC()
envelope.Namespace = namespace
envelope.Topic = topic
envelope.Event = encoded
defer func() {
logger := log.G(ctx).WithFields(log.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.GetTypeUrl(),
})
if err != nil {
logger.WithError(err).Error("error publishing event")
} else {
logger.Trace("event published")
}
}()
return e.broadcaster.Write(&envelope)
}
// Subscribe to events on the exchange. Events are sent through the returned
// channel ch. If an error is encountered, it will be sent on channel errs and
// errs will be closed. To end the subscription, cancel the provided context.
//
// Zero or more filters may be provided as strings. Only events that match
// *any* of the provided filters will be sent on the channel. The filters use
// the standard containerd filters package syntax.
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
var (
evch = make(chan *events.Envelope)
errq = make(chan error, 1)
channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel)
dst goevents.Sink = queue
)
closeAll := func() {
channel.Close()
queue.Close()
e.broadcaster.Remove(dst)
close(errq)
}
ch = evch
errs = errq
if len(fs) > 0 {
filter, err := filters.ParseAll(fs...)
if err != nil {
errq <- fmt.Errorf("failed parsing subscription filters: %w", err)
closeAll()
return
}
dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool {
return filter.Match(adapt(gev))
}))
}
e.broadcaster.Add(dst)
go func() {
defer closeAll()
var err error
loop:
for {
select {
case ev := <-channel.C:
env, ok := ev.(*events.Envelope)
if !ok {
// TODO(stevvooe): For the most part, we are well protected
// from this condition. Both Forward and Publish protect
// from this.
err = fmt.Errorf("invalid envelope encountered %#v; please file a bug", ev)
break
}
select {
case evch <- env:
case <-ctx.Done():
break loop
}
case <-ctx.Done():
break loop
}
}
if err == nil {
if cerr := ctx.Err(); cerr != context.Canceled {
err = cerr
}
}
errq <- err
}()
return
}
func validateTopic(topic string) error {
if topic == "" {
return fmt.Errorf("must not be empty: %w", errdefs.ErrInvalidArgument)
}
if topic[0] != '/' {
return fmt.Errorf("must start with '/': %w", errdefs.ErrInvalidArgument)
}
if len(topic) == 1 {
return fmt.Errorf("must have at least one component: %w", errdefs.ErrInvalidArgument)
}
components := strings.Split(topic[1:], "/")
for _, component := range components {
if err := identifiers.Validate(component); err != nil {
return fmt.Errorf("failed validation on component %q: %w", component, err)
}
}
return nil
}
func validateEnvelope(envelope *events.Envelope) error {
if err := identifiers.Validate(envelope.Namespace); err != nil {
return fmt.Errorf("event envelope has invalid namespace: %w", err)
}
if err := validateTopic(envelope.Topic); err != nil {
return fmt.Errorf("envelope topic %q: %w", envelope.Topic, err)
}
if envelope.Timestamp.IsZero() {
return fmt.Errorf("timestamp must be set on forwarded event: %w", errdefs.ErrInvalidArgument)
}
return nil
}
func adapt(ev interface{}) filters.Adaptor {
if adaptor, ok := ev.(filters.Adaptor); ok {
return adaptor
}
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
return "", false
})
}

View File

@@ -0,0 +1,326 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exchange
import (
"context"
"errors"
"testing"
"time"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/errdefs"
"github.com/containerd/typeurl/v2"
"github.com/google/go-cmp/cmp"
)
func TestExchangeBasic(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), t.Name())
testevents := []events.Event{
&eventstypes.ContainerCreate{ID: "asdf"},
&eventstypes.ContainerCreate{ID: "qwer"},
&eventstypes.ContainerCreate{ID: "zxcv"},
}
exchange := NewExchange()
t.Log("subscribe")
var cancel1, cancel2 func()
// Create two subscribers for same set of events and make sure they
// traverse the exchange.
ctx1, cancel1 := context.WithCancel(ctx)
eventq1, errq1 := exchange.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
eventq2, errq2 := exchange.Subscribe(ctx2)
t.Log("publish")
errChan := make(chan error)
go func() {
defer close(errChan)
for _, event := range testevents {
if err := exchange.Publish(ctx, "/test", event); err != nil {
errChan <- err
return
}
}
t.Log("finished publishing")
}()
t.Log("waiting")
if err := <-errChan; err != nil {
t.Fatal(err)
}
for _, subscriber := range []struct {
eventq <-chan *events.Envelope
errq <-chan error
cancel func()
}{
{
eventq: eventq1,
errq: errq1,
cancel: cancel1,
},
{
eventq: eventq2,
errq: errq2,
cancel: cancel2,
},
} {
var received []events.Event
subscribercheck:
for {
select {
case env := <-subscriber.eventq:
ev, err := typeurl.UnmarshalAny(env.Event)
if err != nil {
t.Fatal(err)
}
received = append(received, ev.(*eventstypes.ContainerCreate))
case err := <-subscriber.errq:
if err != nil {
t.Fatal(err)
}
break subscribercheck
}
if cmp.Equal(received, testevents, protobuf.Compare) {
// when we do this, we expect the errs channel to be closed and
// this will return.
subscriber.cancel()
}
}
}
}
func TestExchangeFilters(t *testing.T) {
var (
ctx = namespaces.WithNamespace(context.Background(), t.Name())
exchange = NewExchange()
// config events, All events will be published
containerCreateEvents = []events.Event{
&eventstypes.ContainerCreate{ID: "asdf"},
&eventstypes.ContainerCreate{ID: "qwer"},
&eventstypes.ContainerCreate{ID: "zxcv"},
}
taskExitEvents = []events.Event{
&eventstypes.TaskExit{ContainerID: "abcdef"},
}
testEventSets = []struct {
topic string
events []events.Event
}{
{
topic: "/containers/create",
events: containerCreateEvents,
},
{
topic: "/tasks/exit",
events: taskExitEvents,
},
}
allTestEvents = func(eventSets []struct {
topic string
events []events.Event
}) (events []events.Event) {
for _, v := range eventSets {
events = append(events, v.events...)
}
return
}(testEventSets)
// config test cases
testCases = []struct {
testName string
filters []string
// The fields as below are for store data. Don't config them.
expectEvents []events.Event
eventq <-chan *events.Envelope
errq <-chan error
cancel func()
}{
{
testName: "No Filter",
expectEvents: allTestEvents,
},
{
testName: "Filter events by one topic",
filters: []string{
`topic=="/containers/create"`,
},
expectEvents: containerCreateEvents,
},
{
testName: "Filter events by field",
filters: []string{
"event.id",
},
expectEvents: containerCreateEvents,
},
{
testName: "Filter events by field OR topic",
filters: []string{
`topic=="/containers/create"`,
"event.id",
},
expectEvents: containerCreateEvents,
},
{
testName: "Filter events by regex ",
filters: []string{
`topic~="/containers/*"`,
},
expectEvents: containerCreateEvents,
},
{
testName: "Filter events for by anyone of two topics",
filters: []string{
`topic=="/tasks/exit"`,
`topic=="/containers/create"`,
},
expectEvents: append(containerCreateEvents, taskExitEvents...),
},
{
testName: "Filter events for by one topic AND id",
filters: []string{
`topic=="/containers/create",event.id=="qwer"`,
},
expectEvents: []events.Event{
&eventstypes.ContainerCreate{ID: "qwer"},
},
},
}
)
t.Log("subscribe")
for i := range testCases {
var ctx1 context.Context
ctx1, testCases[i].cancel = context.WithCancel(ctx)
testCases[i].eventq, testCases[i].errq = exchange.Subscribe(ctx1, testCases[i].filters...)
}
t.Log("publish")
errChan := make(chan error)
go func() {
defer close(errChan)
for _, es := range testEventSets {
for _, e := range es.events {
if err := exchange.Publish(ctx, es.topic, e); err != nil {
errChan <- err
return
}
}
}
t.Log("finished publishing")
}()
t.Log("waiting")
if err := <-errChan; err != nil {
t.Fatal(err)
}
t.Log("receive event")
for _, subscriber := range testCases {
t.Logf("test case: %q", subscriber.testName)
var received []events.Event
subscribercheck:
for {
select {
case env := <-subscriber.eventq:
ev, err := typeurl.UnmarshalAny(env.Event)
if err != nil {
t.Fatal(err)
}
received = append(received, ev)
case err := <-subscriber.errq:
if err != nil {
t.Fatal(err)
}
break subscribercheck
}
if cmp.Equal(received, subscriber.expectEvents, protobuf.Compare) {
// when we do this, we expect the errs channel to be closed and
// this will return.
subscriber.cancel()
}
}
}
}
func TestExchangeValidateTopic(t *testing.T) {
namespace := t.Name()
ctx := namespaces.WithNamespace(context.Background(), namespace)
exchange := NewExchange()
for _, testcase := range []struct {
input string
err error
}{
{
input: "/test",
},
{
input: "/test/test",
},
{
input: "test",
err: errdefs.ErrInvalidArgument,
},
} {
t.Run(testcase.input, func(t *testing.T) {
event := &eventstypes.ContainerCreate{ID: t.Name()}
if err := exchange.Publish(ctx, testcase.input, event); !errors.Is(err, testcase.err) {
if err == nil {
t.Fatalf("expected error %v, received nil", testcase.err)
} else {
t.Fatalf("expected error %v, received %v", testcase.err, err)
}
}
evany, err := typeurl.MarshalAny(event)
if err != nil {
t.Fatal(err)
}
envelope := events.Envelope{
Timestamp: time.Now().UTC(),
Namespace: namespace,
Topic: testcase.input,
Event: evany,
}
// make sure we get same errors with forward.
if err := exchange.Forward(ctx, &envelope); !errors.Is(err, testcase.err) {
if err == nil {
t.Fatalf("expected error %v, received nil", testcase.err)
} else {
t.Fatalf("expected error %v, received %v", testcase.err, err)
}
}
})
}
}

View File

@@ -28,9 +28,9 @@ import (
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/core/snapshots"
"github.com/containerd/containerd/v2/internal/cleanup"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/pkg/gc"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/log"

View File

@@ -22,10 +22,10 @@ import (
"context"
"github.com/containerd/cgroups/v3"
"github.com/containerd/containerd/v2/core/events"
v1 "github.com/containerd/containerd/v2/core/metrics/cgroups/v1"
v2 "github.com/containerd/containerd/v2/core/metrics/cgroups/v2"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/version"
"github.com/containerd/platforms"

View File

@@ -23,8 +23,8 @@ import (
cgroups "github.com/containerd/cgroups/v3/cgroup1"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/errdefs"
"github.com/containerd/log"

View File

@@ -21,8 +21,8 @@ package v2
import (
"context"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/docker/go-metrics"
)

View File

@@ -29,12 +29,12 @@ import (
apitypes "github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/core/events/exchange"
"github.com/containerd/containerd/v2/core/metadata"
"github.com/containerd/containerd/v2/core/runtime"
shimbinary "github.com/containerd/containerd/v2/core/runtime/v2/shim"
"github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/cleanup"
"github.com/containerd/containerd/v2/pkg/events/exchange"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/containerd/v2/plugins"

View File

@@ -38,9 +38,9 @@ import (
eventstypes "github.com/containerd/containerd/v2/api/events"
task "github.com/containerd/containerd/v2/api/runtime/task/v3"
"github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/core/events/exchange"
"github.com/containerd/containerd/v2/core/runtime"
client "github.com/containerd/containerd/v2/core/runtime/v2/shim"
"github.com/containerd/containerd/v2/pkg/events/exchange"
"github.com/containerd/containerd/v2/pkg/identifiers"
"github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/containerd/v2/protobuf"

View File

@@ -23,7 +23,7 @@ import (
v1 "github.com/containerd/containerd/v2/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/ttrpcutil"
"github.com/containerd/containerd/v2/protobuf"

View File

@@ -32,7 +32,7 @@ import (
shimapi "github.com/containerd/containerd/v2/api/runtime/task/v3"
"github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/shutdown"
"github.com/containerd/containerd/v2/plugins"