Merge pull request #939 from ijc25/reconnect-shim-event-stream

Reconnect to shim event stream after containerd restart
This commit is contained in:
Michael Crosby 2017-06-01 09:52:13 -07:00 committed by GitHub
commit 6ff220a116
2 changed files with 32 additions and 11 deletions

View File

@ -257,6 +257,10 @@ func (r *Runtime) loadContainer(path string) (*Task, error) {
return nil, err return nil, err
} }
if err = r.handleEvents(s); err != nil {
return nil, err
}
data, err := ioutil.ReadFile(filepath.Join(path, configFilename)) data, err := ioutil.ReadFile(filepath.Join(path, configFilename))
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -30,14 +30,16 @@ func New(path string) *Service {
} }
type Service struct { type Service struct {
initProcess *initProcess initProcess *initProcess
path string path string
id string id string
bundle string bundle string
mu sync.Mutex mu sync.Mutex
processes map[int]process processes map[int]process
events chan *task.Event events chan *task.Event
execID int eventsMu sync.Mutex
deferredEvent *task.Event
execID int
} }
func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimapi.CreateResponse, error) { func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimapi.CreateResponse, error) {
@ -145,12 +147,27 @@ func (s *Service) Pty(ctx context.Context, r *shimapi.PtyRequest) (*google_proto
} }
func (s *Service) Events(r *shimapi.EventsRequest, stream shimapi.Shim_EventsServer) error { func (s *Service) Events(r *shimapi.EventsRequest, stream shimapi.Shim_EventsServer) error {
for e := range s.events { s.eventsMu.Lock()
if err := stream.Send(e); err != nil { defer s.eventsMu.Unlock()
if s.deferredEvent != nil {
if err := stream.Send(s.deferredEvent); err != nil {
return err return err
} }
s.deferredEvent = nil
}
for {
select {
case e := <-s.events:
if err := stream.Send(e); err != nil {
s.deferredEvent = e
return err
}
case <-stream.Context().Done():
return stream.Context().Err()
}
} }
return nil
} }
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) { func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {