Fix concurrency issues
This commit is contained in:
		| @@ -69,7 +69,6 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { | ||||
| 	signals := make(chan os.Signal, bufferSize) | ||||
| 	signal.Notify(signals) | ||||
| 	for s := range signals { | ||||
| 		logrus.WithField("signal", s).Debug("containerd: received signal") | ||||
| 		switch s { | ||||
| 		case syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP: | ||||
| 			os.Exit(0) | ||||
| @@ -96,7 +95,7 @@ func reap() (exits []*containerd.ExitEvent, err error) { | ||||
| 			if err == syscall.ECHILD { | ||||
| 				return exits, nil | ||||
| 			} | ||||
| 			return nil, err | ||||
| 			return exits, err | ||||
| 		} | ||||
| 		if pid <= 0 { | ||||
| 			return exits, nil | ||||
|   | ||||
							
								
								
									
										8
									
								
								event.go
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								event.go
									
									
									
									
									
								
							| @@ -27,3 +27,11 @@ type StartContainerEvent struct { | ||||
| func (c *StartContainerEvent) String() string { | ||||
| 	return "create container" | ||||
| } | ||||
|  | ||||
| type ContainerStartErrorEvent struct { | ||||
| 	ID string | ||||
| } | ||||
|  | ||||
| func (c *ContainerStartErrorEvent) String() string { | ||||
| 	return "container start error" | ||||
| } | ||||
|   | ||||
| @@ -2,6 +2,7 @@ package containerd | ||||
|  | ||||
| import ( | ||||
| 	"os" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	"github.com/opencontainers/runc/libcontainer" | ||||
| @@ -21,6 +22,11 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { | ||||
| 		stateDir:   stateDir, | ||||
| 		containers: make(map[string]Container), | ||||
| 		runtime:    runtime, | ||||
| 		tasks:      make(chan *startTask, concurrency*100), | ||||
| 	} | ||||
| 	for i := 0; i < concurrency; i++ { | ||||
| 		s.workerGroup.Add(1) | ||||
| 		go s.startContainerWorker(s.tasks) | ||||
| 	} | ||||
| 	return s, nil | ||||
| } | ||||
| @@ -29,12 +35,13 @@ type Supervisor struct { | ||||
| 	// stateDir is the directory on the system to store container runtime state information. | ||||
| 	stateDir string | ||||
|  | ||||
| 	processes  []Process | ||||
| 	containers map[string]Container | ||||
|  | ||||
| 	runtime Runtime | ||||
|  | ||||
| 	events chan Event | ||||
| 	events      chan Event | ||||
| 	tasks       chan *startTask | ||||
| 	workerGroup sync.WaitGroup | ||||
| } | ||||
|  | ||||
| // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and | ||||
| @@ -48,7 +55,6 @@ func (s *Supervisor) Start(events chan Event) error { | ||||
| 	s.events = events | ||||
| 	go func() { | ||||
| 		for evt := range events { | ||||
| 			logrus.WithField("event", evt).Debug("containerd: processing event") | ||||
| 			switch e := evt.(type) { | ||||
| 			case *ExitEvent: | ||||
| 				logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). | ||||
| @@ -61,8 +67,7 @@ func (s *Supervisor) Start(events chan Event) error { | ||||
| 					continue | ||||
| 				} | ||||
| 				container.SetExited(e.Status) | ||||
| 				delete(s.containers, container.ID()) | ||||
| 				if err := container.Delete(); err != nil { | ||||
| 				if err := s.deleteContainer(container); err != nil { | ||||
| 					logrus.WithField("error", err).Error("containerd: deleting container") | ||||
| 				} | ||||
| 			case *StartContainerEvent: | ||||
| @@ -72,17 +77,27 @@ func (s *Supervisor) Start(events chan Event) error { | ||||
| 					continue | ||||
| 				} | ||||
| 				s.containers[e.ID] = container | ||||
| 				if err := container.Start(); err != nil { | ||||
| 					e.Err <- err | ||||
| 					continue | ||||
| 				s.tasks <- &startTask{ | ||||
| 					err:       e.Err, | ||||
| 					container: container, | ||||
| 				} | ||||
| 			case *ContainerStartErrorEvent: | ||||
| 				if container, ok := s.containers[e.ID]; ok { | ||||
| 					if err := s.deleteContainer(container); err != nil { | ||||
| 						logrus.WithField("error", err).Error("containerd: deleting container") | ||||
| 					} | ||||
| 				} | ||||
| 				e.Err <- nil | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *Supervisor) deleteContainer(container Container) error { | ||||
| 	delete(s.containers, container.ID()) | ||||
| 	return container.Delete() | ||||
| } | ||||
|  | ||||
| func (s *Supervisor) getContainerForPid(pid int) (Container, error) { | ||||
| 	for _, container := range s.containers { | ||||
| 		cpid, err := container.Pid() | ||||
| @@ -104,3 +119,22 @@ func (s *Supervisor) getContainerForPid(pid int) (Container, error) { | ||||
| func (s *Supervisor) SendEvent(evt Event) { | ||||
| 	s.events <- evt | ||||
| } | ||||
|  | ||||
| type startTask struct { | ||||
| 	container Container | ||||
| 	err       chan error | ||||
| } | ||||
|  | ||||
| func (s *Supervisor) startContainerWorker(tasks chan *startTask) { | ||||
| 	defer s.workerGroup.Done() | ||||
| 	for t := range tasks { | ||||
| 		if err := t.container.Start(); err != nil { | ||||
| 			s.SendEvent(&ContainerStartErrorEvent{ | ||||
| 				ID: t.container.ID(), | ||||
| 			}) | ||||
| 			t.err <- err | ||||
| 			continue | ||||
| 		} | ||||
| 		t.err <- nil | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Michael Crosby
					Michael Crosby