diff --git a/containerd/main.go b/containerd/main.go index 76285ee34..4cdca13aa 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -104,13 +104,14 @@ func main() { func daemon(context *cli.Context) error { signals := make(chan os.Signal, 2048) signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1) - sv, err := supervisor.New( - context.String("state-dir"), - context.String("runtime"), - context.String("shim"), - context.StringSlice("runtime-args"), - context.Duration("start-timeout"), - context.Int("retain-count")) + sv, err := supervisor.New(supervisor.Config{ + StateDir: context.String("state-dir"), + Runtime: context.String("runtime"), + ShimName: context.String("shim"), + RuntimeArgs: context.StringSlice("runtime-args"), + Timeout: context.Duration("start-timeout"), + EventRetainCount: context.Int("retain-count"), + }) if err != nil { return err } diff --git a/supervisor/create.go b/supervisor/create.go index fa3468cdb..8a9957426 100644 --- a/supervisor/create.go +++ b/supervisor/create.go @@ -24,22 +24,22 @@ type StartTask struct { } func (s *Supervisor) start(t *StartTask) error { - rt := s.runtime - rtArgs := s.runtimeArgs + rt := s.config.Runtime + rtArgs := s.config.RuntimeArgs if t.Runtime != "" { rt = t.Runtime rtArgs = t.RuntimeArgs } container, err := runtime.New(runtime.ContainerOpts{ - Root: s.stateDir, + Root: s.config.StateDir, ID: t.ID, Bundle: t.BundlePath, Runtime: rt, RuntimeArgs: rtArgs, - Shim: s.shim, + Shim: s.config.ShimName, Labels: t.Labels, NoPivotRoot: t.NoPivotRoot, - Timeout: s.timeout, + Timeout: s.config.Timeout, }) if err != nil { return err @@ -58,7 +58,6 @@ func (s *Supervisor) start(t *StartTask) error { if t.Checkpoint != nil { task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name) } - s.startTasks <- task return errDeferredResponse } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 4306cf608..9d89c3e13 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -17,10 +17,19 @@ const ( defaultBufferSize = 2048 // size of queue in eventloop ) +type Config struct { + StateDir string + Runtime string + ShimName string + RuntimeArgs []string + Timeout time.Duration + EventRetainCount int +} + // New returns an initialized Process supervisor. -func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) { +func New(c Config) (*Supervisor, error) { startTasks := make(chan *startTask, 10) - if err := os.MkdirAll(stateDir, 0755); err != nil { + if err := os.MkdirAll(c.StateDir, 0755); err != nil { return nil, err } machine, err := CollectMachineInformation() @@ -32,19 +41,15 @@ func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, ti return nil, err } s := &Supervisor{ - stateDir: stateDir, + config: c, containers: make(map[string]*containerInfo), startTasks: startTasks, machine: machine, subscribers: make(map[chan Event]struct{}), tasks: make(chan Task, defaultBufferSize), monitor: monitor, - runtime: runtimeName, - runtimeArgs: runtimeArgs, - shim: shimName, - timeout: timeout, } - if err := setupEventLog(s, retainCount); err != nil { + if err := setupEventLog(s, c.EventRetainCount); err != nil { return nil, err } go s.exitHandler() @@ -65,7 +70,7 @@ func setupEventLog(s *Supervisor, retainCount int) error { } logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events") events := s.Events(time.Time{}, false, "") - return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount) + return eventLogger(s, filepath.Join(s.config.StateDir, "events.log"), events, retainCount) } func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error { @@ -122,7 +127,7 @@ func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) } func readEventLog(s *Supervisor) error { - f, err := os.Open(filepath.Join(s.stateDir, "events.log")) + f, err := os.Open(filepath.Join(s.config.StateDir, "events.log")) if err != nil { if os.IsNotExist(err) { return nil @@ -146,14 +151,9 @@ func readEventLog(s *Supervisor) error { // Supervisor represents a container supervisor type Supervisor struct { - // stateDir is the directory on the system to store container runtime state information. - stateDir string - // name of the OCI compatible runtime used to execute containers - runtime string - runtimeArgs []string - shim string - containers map[string]*containerInfo - startTasks chan *startTask + config Config + containers map[string]*containerInfo + startTasks chan *startTask // we need a lock around the subscribers map only because additions and deletions from // the map are via the API so we cannot really control the concurrency subscriberLock sync.RWMutex @@ -163,7 +163,6 @@ type Supervisor struct { monitor *Monitor eventLog []Event eventLock sync.Mutex - timeout time.Duration } // Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to @@ -252,9 +251,9 @@ func (s *Supervisor) notifySubscribers(e Event) { // state of the Supervisor func (s *Supervisor) Start() error { logrus.WithFields(logrus.Fields{ - "stateDir": s.stateDir, - "runtime": s.runtime, - "runtimeArgs": s.runtimeArgs, + "stateDir": s.config.StateDir, + "runtime": s.config.Runtime, + "runtimeArgs": s.config.RuntimeArgs, "memory": s.machine.Memory, "cpus": s.machine.Cpus, }).Debug("containerd: supervisor running") @@ -300,7 +299,7 @@ func (s *Supervisor) monitorProcess(p runtime.Process) error { } func (s *Supervisor) restore() error { - dirs, err := ioutil.ReadDir(s.stateDir) + dirs, err := ioutil.ReadDir(s.config.StateDir) if err != nil { return err } @@ -309,7 +308,7 @@ func (s *Supervisor) restore() error { continue } id := d.Name() - container, err := runtime.Load(s.stateDir, id, s.shim, s.timeout) + container, err := runtime.Load(s.config.StateDir, id, s.config.ShimName, s.config.Timeout) if err != nil { return err }