Update containerd to v1.0.0-beta.0
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
@@ -17,8 +17,6 @@ limitations under the License.
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/api/services/events/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
@@ -32,26 +30,25 @@ import (
|
||||
// eventMonitor monitors containerd event and updates internal state correspondingly.
|
||||
// TODO(random-liu): [P1] Is it possible to drop event during containerd is running?
|
||||
type eventMonitor struct {
|
||||
c *criContainerdService
|
||||
eventstream events.Events_SubscribeClient
|
||||
cancel context.CancelFunc
|
||||
closeCh chan struct{}
|
||||
c *criContainerdService
|
||||
ch <-chan *events.Envelope
|
||||
errCh <-chan error
|
||||
closeCh chan struct{}
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// Create new event monitor. New event monitor will start subscribing containerd event. All events
|
||||
// happen after it should be monitored.
|
||||
func newEventMonitor(c *criContainerdService) (*eventMonitor, error) {
|
||||
func newEventMonitor(c *criContainerdService) *eventMonitor {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
e, err := c.client.Events(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to subscribe containerd event: %v", err)
|
||||
}
|
||||
ch, errCh := c.client.Subscribe(ctx)
|
||||
return &eventMonitor{
|
||||
c: c,
|
||||
eventstream: e,
|
||||
cancel: cancel,
|
||||
closeCh: make(chan struct{}),
|
||||
}, nil
|
||||
c: c,
|
||||
ch: ch,
|
||||
errCh: errCh,
|
||||
closeCh: make(chan struct{}),
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// start starts the event monitor which monitors and handles all container events. It returns
|
||||
@@ -59,14 +56,15 @@ func newEventMonitor(c *criContainerdService) (*eventMonitor, error) {
|
||||
func (em *eventMonitor) start() <-chan struct{} {
|
||||
go func() {
|
||||
for {
|
||||
e, err := em.eventstream.Recv()
|
||||
if err != nil {
|
||||
select {
|
||||
case e := <-em.ch:
|
||||
glog.V(4).Infof("Received container event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
|
||||
em.handleEvent(e)
|
||||
case err := <-em.errCh:
|
||||
glog.Errorf("Failed to handle event stream: %v", err)
|
||||
close(em.closeCh)
|
||||
return
|
||||
}
|
||||
glog.V(4).Infof("Received container event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
|
||||
em.handleEvent(e)
|
||||
}
|
||||
}()
|
||||
return em.closeCh
|
||||
|
||||
@@ -155,10 +155,7 @@ func NewCRIContainerdService(
|
||||
return nil, fmt.Errorf("failed to create stream server: %v", err)
|
||||
}
|
||||
|
||||
c.eventMonitor, err = newEventMonitor(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create event monitor: %v", err)
|
||||
}
|
||||
c.eventMonitor = newEventMonitor(c)
|
||||
|
||||
// Create the grpc server and register runtime and image services.
|
||||
c.server = grpc.NewServer()
|
||||
|
||||
Reference in New Issue
Block a user