From a5d246404cd9748271f704770107d65701029f23 Mon Sep 17 00:00:00 2001 From: Ian Campbell Date: Wed, 31 May 2017 12:15:18 +0100 Subject: [PATCH] Reconnect to shim event stream after containerd restart There are three aspects which need to be covered: - the runtime needs to restart its event pump when it reconnects (in loadContainer). - on the server side, the shim needs to monitor the stream context so it knows when the connection goes away. - if the shim's stream.Send() fails (because the stream died between taking the event off the channel and calling stream.Send()) then, to avoid losing that event, the shim should remember it and send it out first on the next stream. The shim's event production machinery only handles producing a single event stream, so add an interlock to ensure there is only one reader of the `s.events` channel at a time. Subsequent attempts to use Events will block until the existing owner is done. Fixes #921. Signed-off-by: Ian Campbell --- linux/runtime.go | 4 ++++ linux/shim/service.go | 39 ++++++++++++++++++++++++++++----------- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/linux/runtime.go b/linux/runtime.go index cf1094ab4..6bd835c48 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -257,6 +257,10 @@ func (r *Runtime) loadContainer(path string) (*Task, error) { return nil, err } + if err = r.handleEvents(s); err != nil { + return nil, err + } + data, err := ioutil.ReadFile(filepath.Join(path, configFilename)) if err != nil { return nil, err diff --git a/linux/shim/service.go b/linux/shim/service.go index fb460cb3b..4e4877748 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -30,14 +30,16 @@ func New(path string) *Service { } type Service struct { - initProcess *initProcess - path string - id string - bundle string - mu sync.Mutex - processes map[int]process - events chan *task.Event - execID int + initProcess *initProcess + path string + id string + bundle string + mu sync.Mutex + processes 
map[int]process + events chan *task.Event + eventsMu sync.Mutex + deferredEvent *task.Event + execID int } func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimapi.CreateResponse, error) { @@ -146,12 +148,27 @@ func (s *Service) Pty(ctx context.Context, r *shimapi.PtyRequest) (*google_proto } func (s *Service) Events(r *shimapi.EventsRequest, stream shimapi.Shim_EventsServer) error { - for e := range s.events { - if err := stream.Send(e); err != nil { + s.eventsMu.Lock() + defer s.eventsMu.Unlock() + + if s.deferredEvent != nil { + if err := stream.Send(s.deferredEvent); err != nil { return err } + s.deferredEvent = nil + } + + for { + select { + case e := <-s.events: + if err := stream.Send(e); err != nil { + s.deferredEvent = e + return err + } + case <-stream.Context().Done(): + return stream.Context().Err() + } } - return nil } func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {