commit 15a96783ca2ac8c0eb2c400701e8eb335059c63b Author: Michael Crosby Date: Thu Nov 5 15:29:53 2015 -0800 Initial commit diff --git a/api/v1/server.go b/api/v1/server.go new file mode 100644 index 000000000..dc1647ff4 --- /dev/null +++ b/api/v1/server.go @@ -0,0 +1,66 @@ +package v1 + +import ( + "encoding/json" + "net/http" + + "github.com/crosbymichael/containerd" + "github.com/gorilla/mux" +) + +func NewServer(supervisor *containerd.Supervisor) http.Handler { + r := mux.NewRouter() + s := &server{ + supervisor: supervisor, + r: r, + } + r.HandleFunc("/containers", s.containers).Methods("GET") + r.HandleFunc("/containers/{id:*}", s.createContainer).Methods("POST") + r.HandleFunc("/containers/{id:*}", s.deleteContainer).Methods("DELETE") + return s +} + +type server struct { + r *mux.Router + supervisor *containerd.Supervisor +} + +func (s *server) HandleHTTP(w http.ResponseWriter, r *http.Request) { + s.r.HandleHTTP(w, r) +} + +func (s *server) containers(w http.ResponseWriter, r *http.Request) { + +} + +func (s *server) events(w http.ResponseWriter, r *http.Request) { + +} + +func (s *server) deleteContainer(w http.ResponseWriter, r *http.Request) { + +} + +func (s *server) createContainer(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + var c Container + if err := json.NewDecoder(r.Body).Decode(&c); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + e := &containerd.CreateContainerEvent{ + ID: id, + BundlePath: c.BundlePath, + Err: make(chan error, 1), + } + s.supervisor.SendEvent(e) + if err := <-e.Err; err != nil { + code := http.StatusInternalServerError + if err == containerd.ErrBundleNotFound { + code = http.StatusNotFound + } + http.Error(w, err.Error(), code) + return + } + w.WriteHeader(http.StatusCreated) +} diff --git a/api/v1/types.go b/api/v1/types.go new file mode 100644 index 000000000..bf16d7186 --- /dev/null +++ b/api/v1/types.go @@ -0,0 +1,11 @@ +package v1 + +type State struct { + Containers []Container `json:"containers"` +} + +type Container struct { + ID string `json:"id,omitempty"` + BundlePath string `json:"bundlePath,omitempty"` + Processes []int `json:"processes,omitempty"` +} diff --git a/container.go b/container.go new file mode 100644 index 000000000..d36a1e845 --- /dev/null +++ b/container.go @@ -0,0 +1,6 @@ +package containerd + +type Container interface { + SetExited(status int) + Delete() error +} diff --git a/containerd/daemon.go b/containerd/daemon.go new file mode 100644 index 000000000..d212f8c1a --- /dev/null +++ b/containerd/daemon.go @@ -0,0 +1,87 @@ +package main + +import ( + "os" + "os/signal" + "syscall" + + "github.com/Sirupsen/logrus" + "github.com/codegangsta/cli" + "github.com/crosbymichael/containerd" + "github.com/opencontainers/runc/libcontainer/utils" +) + +var DaemonCommand = cli.Command{ + Name: "daemon", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "state-dir", + Value: "/run/containerd", + Usage: "runtime state directory", + }, + cli.IntFlag{ + Name: "buffer-size", + Value: 2048, + Usage: "set the channel buffer size for events and signals", + }, + }, + Action: func(context *cli.Context) { + }, +} + +func daemon(stateDir string, bufferSize int) error { + supervisor, err := container.NewSupervisor(stateDir) + if err != nil { + return err + } + events := make(chan containerd.Event, bufferSize) + // start the signal handler in the background. + go startSignalHandler(supervisor, bufferSize) + return supervisor.Run(events) +} + +func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { + logrus.Debug("containerd: starting signal handler") + signals := make(chan os.Signal, bufferSize) + signal.Notify(signals) + for s := range signals { + logrus.WithField("signal", s).Debug("containerd: received signal") + switch s { + case syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP: + supervisor.Stop() + case syscall.SIGCHLD: + exits, err := reap() + if err != nil { + logrus.WithField("error", err).Error("containerd: reaping child processes") + } + for _, e := range exits { + if err := supervisor.Process(e); err != nil { + logrus.WithField("error", err).Error("containerd: processing events") + } + } + } + } +} + +func reap() (exits []*containerd.ExitEvent, err error) { + var ( + ws syscall.WaitStatus + rus syscall.Rusage + ) + for { + pid, err := syscall.Wait4(-1, &ws, syscall.WNOHANG, &rus) + if err != nil { + if err == syscall.ECHILD { + return exits, nil + } + return nil, err + } + if pid <= 0 { + return exits, nil + } + exits = append(exits, *conatinerd.ExitEvent{ + Pid: pid, + Status: utils.ExitStatus(ws), + }) + } +} diff --git a/containerd/main.go b/containerd/main.go new file mode 100644 index 000000000..c7ae003fc --- /dev/null +++ b/containerd/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "os" + + "github.com/Sirupsen/logrus" + "github.com/codegangsta/cli" +) + +const ( + Version = "0.0.1" + Usage = `High performance conatiner daemon` +) + +func main() { + app := cli.NewApp() + app.Name = "containerd" + app.Version = Version + app.Usage = Usage + app.Authors = []cli.Author{ + Name: "@crosbymichael", + Email: "crosbymichael@gmail.com", + } + app.Commands = []cli.Command{ + DaemonCommand, + } + if err := app.Run(os.Args); err != nil { + logrus.Fatal(err) + } +} diff --git a/errors.go b/errors.go new file mode 100644 index 000000000..5f620b071 --- /dev/null +++ b/errors.go @@ -0,0 +1,15 @@ +package containerd + +import "errors" + +var ( + // External errors + ErrEventChanNil = errors.New("containerd: event channel is nil") + ErrBundleNotFound = errors.New("containerd: bundle not found") + ErrContainerNotFound = errors.New("containerd: container not found") + ErrContainerExists = errors.New("containerd: container already exists") + + // Internal errors + errShutdown = errors.New("containerd: supervisor is shutdown") + errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path") +) diff --git a/event.go b/event.go new file mode 100644 index 000000000..3f495a654 --- /dev/null +++ b/event.go @@ -0,0 +1,38 @@ +package containerd + +type Event interface { + String() string +} + +type CallbackEvent interface { + Event() Event + Callback() chan Event +} + +type ExitEvent struct { + Pid int + Status int +} + +func (e *ExitEvent) String() string { + return "exit event" +} + +type StartedEvent struct { + ID string + Container Container +} + +func (s *StartedEvent) String() string { + return "started event" +} + +type CreateContainerEvent struct { + ID string + BundlePath string + Err chan error +} + +func (c *CreateContainerEvent) String() string { + return "create container" +} diff --git a/jobs.go b/jobs.go new file mode 100644 index 000000000..51e0b94a0 --- /dev/null +++ b/jobs.go @@ -0,0 +1,8 @@ +package containerd + +type Job interface { +} + +type CreateJob struct { + Err chan error +} diff --git a/process.go b/process.go new file mode 100644 index 000000000..f09d68f13 --- /dev/null +++ b/process.go @@ -0,0 +1,6 @@ +package containerd + +type Process interface { + // Signal sends a signal to the process. + SetExited(status int) +} diff --git a/runtime.go b/runtime.go new file mode 100644 index 000000000..1dafe8b2b --- /dev/null +++ b/runtime.go @@ -0,0 +1,7 @@ +package containerd + +// runtime handles containers, containers handle their own actions. +type Runtime interface { + Create(id, bundlePath string) (Container, error) + Delete(id sting) error +} diff --git a/runtime_linux.go b/runtime_linux.go new file mode 100644 index 000000000..80e415ad5 --- /dev/null +++ b/runtime_linux.go @@ -0,0 +1,529 @@ +// +build libcontainer + +package containerd + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "syscall" + + "github.com/opencontainers/runc/libcontainer" + "github.com/opencontainers/runc/libcontainer/cgroups" + "github.com/opencontainers/runc/libcontainer/configs" + "github.com/opencontainers/runc/libcontainer/seccomp" + "github.com/opencontainers/specs" +) + +func init() { + if len(os.Args) > 1 && os.Args[1] == "init" { + runtime.GOMAXPROCS(1) + runtime.LockOSThread() + factory, _ := libcontainer.New("") + if err := factory.StartInitialization(); err != nil { + fatal(err) + } + panic("--this line should have never been executed, congratulations--") + } +} + +type libcontainerContainer struct { + c libcontainer.Container + initProcess *libcontainer.Process + exitStatus int + exited bool +} + +func (c *libcontainerContainer) SetExited(status int) { + c.exitStatus = status + // meh + c.exited = true +} + +func (c *libcontainerContainer) Delete() error { + return c.c.Destroy() +} + +func NewLibcontainerRuntime(stateDir string) (Runtime, error) { + f, err := libcontainer.New(abs, libcontainer.Cgroupfs, func(l *libcontainer.LinuxFactory) error { + //l.CriuPath = context.GlobalString("criu") + return nil + }) + r := &libcontainerRuntime{ + factory: f, + containers: make(map[string]libcontainer.Container), + } + return r, nil + +} + +type libcontainerRuntime struct { + factory libcontainer.Factory +} + +func (r *libcontainerRuntime) Create(id, bundlePath string) (Container, error) { + spec, rspec, err := loadSpec( + filepath.Join(bundlePath, "config.json"), + filepath.Join(bundlePath, "runtime.json"), + ) + if err != nil { + return nil, err + } + config, err := r.createLibcontainerConfig(id, spec, rspec) + if err != nil { + return nil, err + } + container, err := r.factory.Create(id, config) + if err != nil { + return nil, err + } + process := r.newProcess(spec.Process) + c := &libcontainerContainer{ + c: container, + initProcess: process, + } + if err := container.Start(process); err != nil { + container.Destroy() + return nil, err + } + return c, nil +} + +// newProcess returns a new libcontainer Process with the arguments from the +// spec and stdio from the current process. +func (r *libcontainerRuntime) newProcess(p specs.Process) *libcontainer.Process { + return &libcontainer.Process{ + Args: p.Args, + Env: p.Env, + // TODO: fix libcontainer's API to better support uid/gid in a typesafe way. + User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID), + Cwd: p.Cwd, + } +} + +// loadSpec loads the specification from the provided path. +// If the path is empty then the default path will be "config.json" +func (r *libcontainerRuntime) loadSpec(cPath, rPath string) (spec *specs.LinuxSpec, rspec *specs.LinuxRuntimeSpec, err error) { + cf, err := os.Open(cPath) + if err != nil { + if os.IsNotExist(err) { + return nil, nil, fmt.Errorf("JSON specification file at %s not found", cPath) + } + return spec, rspec, err + } + defer cf.Close() + + rf, err := os.Open(rPath) + if err != nil { + if os.IsNotExist(err) { + return nil, nil, fmt.Errorf("JSON runtime config file at %s not found", rPath) + } + return spec, rspec, err + } + defer rf.Close() + + if err = json.NewDecoder(cf).Decode(&spec); err != nil { + return spec, rspec, err + } + if err = json.NewDecoder(rf).Decode(&rspec); err != nil { + return spec, rspec, err + } + return spec, rspec, r.checkSpecVersion(spec) +} + +// checkSpecVersion makes sure that the spec version matches runc's while we are in the initial +// development period. It is better to hard fail than have missing fields or options in the spec. +func (r *libcontainerRuntime) checkSpecVersion(s *specs.LinuxSpec) error { + if s.Version != specs.Version { + return fmt.Errorf("spec version is not compatible with implemented version %q: spec %q", specs.Version, s.Version) + } + return nil +} + +func (r *libcontainerRuntime) createLibcontainerConfig(cgroupName, bundlePath string, spec *specs.LinuxSpec, rspec *specs.LinuxRuntimeSpec) (*configs.Config, error) { + rootfsPath := spec.Root.Path + if !filepath.IsAbs(rootfsPath) { + rootfsPath = filepath.Join(bundlePath, rootfsPath) + } + config := &configs.Config{ + Rootfs: rootfsPath, + Capabilities: spec.Linux.Capabilities, + Readonlyfs: spec.Root.Readonly, + Hostname: spec.Hostname, + } + + exists := false + for _, ns := range rspec.Linux.Namespaces { + t, exists := namespaceMapping[ns.Type] + if !exists { + return nil, fmt.Errorf("namespace %q does not exist", ns) + } + config.Namespaces.Add(t, ns.Path) + } + if config.Namespaces.Contains(configs.NEWNET) { + config.Networks = []*configs.Network{ + { + Type: "loopback", + }, + } + } + for _, mp := range spec.Mounts { + m, ok := rspec.Mounts[mp.Name] + if !ok { + return nil, fmt.Errorf("Mount with Name %q not found in runtime config", mp.Name) + } + config.Mounts = append(config.Mounts, r.createLibcontainerMount(bundlePath, mp.Path, m)) + } + if err := r.createDevices(rspec, config); err != nil { + return nil, err + } + if err := r.setupUserNamespace(rspec, config); err != nil { + return nil, err + } + for _, rlimit := range rspec.Linux.Rlimits { + rl, err := r.createLibContainerRlimit(rlimit) + if err != nil { + return nil, err + } + config.Rlimits = append(config.Rlimits, rl) + } + c, err := r.createCgroupConfig(cgroupName, rspec, config.Devices) + if err != nil { + return nil, err + } + config.Cgroups = c + if config.Readonlyfs { + r.setReadonly(config) + config.MaskPaths = []string{ + "/proc/kcore", + } + config.ReadonlyPaths = []string{ + "/proc/sys", "/proc/sysrq-trigger", "/proc/irq", "/proc/bus", + } + } + seccomp, err := r.setupSeccomp(&rspec.Linux.Seccomp) + if err != nil { + return nil, err + } + config.Seccomp = seccomp + config.Sysctl = rspec.Linux.Sysctl + config.ProcessLabel = rspec.Linux.SelinuxProcessLabel + config.AppArmorProfile = rspec.Linux.ApparmorProfile + for _, g := range spec.Process.User.AdditionalGids { + config.AdditionalGroups = append(config.AdditionalGroups, strconv.FormatUint(uint64(g), 10)) + } + r.createHooks(rspec, config) + config.Version = specs.Version + return config, nil +} + +func (r *libcontainerRuntime) createLibcontainerMount(cwd, dest string, m specs.Mount) *configs.Mount { + flags, pgflags, data := parseMountOptions(m.Options) + source := m.Source + if m.Type == "bind" { + if !filepath.IsAbs(source) { + source = filepath.Join(cwd, m.Source) + } + } + return &configs.Mount{ + Device: m.Type, + Source: source, + Destination: dest, + Data: data, + Flags: flags, + PropagationFlags: pgflags, + } +} + +func (r *libcontainerRuntime) createCgroupConfig(name string, spec *specs.LinuxRuntimeSpec, devices []*configs.Device) (*configs.Cgroup, error) { + myCgroupPath, err := cgroups.GetThisCgroupDir("devices") + if err != nil { + return nil, err + } + c := &configs.Cgroup{ + Name: name, + Parent: myCgroupPath, + AllowedDevices: append(devices, allowedDevices...), + } + r := spec.Linux.Resources + c.Memory = r.Memory.Limit + c.MemoryReservation = r.Memory.Reservation + c.MemorySwap = r.Memory.Swap + c.KernelMemory = r.Memory.Kernel + c.MemorySwappiness = r.Memory.Swappiness + c.CpuShares = r.CPU.Shares + c.CpuQuota = r.CPU.Quota + c.CpuPeriod = r.CPU.Period + c.CpuRtRuntime = r.CPU.RealtimeRuntime + c.CpuRtPeriod = r.CPU.RealtimePeriod + c.CpusetCpus = r.CPU.Cpus + c.CpusetMems = r.CPU.Mems + c.BlkioWeight = r.BlockIO.Weight + c.BlkioLeafWeight = r.BlockIO.LeafWeight + for _, wd := range r.BlockIO.WeightDevice { + weightDevice := configs.NewWeightDevice(wd.Major, wd.Minor, wd.Weight, wd.LeafWeight) + c.BlkioWeightDevice = append(c.BlkioWeightDevice, weightDevice) + } + for _, td := range r.BlockIO.ThrottleReadBpsDevice { + throttleDevice := configs.NewThrottleDevice(td.Major, td.Minor, td.Rate) + c.BlkioThrottleReadBpsDevice = append(c.BlkioThrottleReadBpsDevice, throttleDevice) + } + for _, td := range r.BlockIO.ThrottleWriteBpsDevice { + throttleDevice := configs.NewThrottleDevice(td.Major, td.Minor, td.Rate) + c.BlkioThrottleWriteBpsDevice = append(c.BlkioThrottleWriteBpsDevice, throttleDevice) + } + for _, td := range r.BlockIO.ThrottleReadIOPSDevice { + throttleDevice := configs.NewThrottleDevice(td.Major, td.Minor, td.Rate) + c.BlkioThrottleReadIOPSDevice = append(c.BlkioThrottleReadIOPSDevice, throttleDevice) + } + for _, td := range r.BlockIO.ThrottleWriteIOPSDevice { + throttleDevice := configs.NewThrottleDevice(td.Major, td.Minor, td.Rate) + c.BlkioThrottleWriteIOPSDevice = append(c.BlkioThrottleWriteIOPSDevice, throttleDevice) + } + for _, l := range r.HugepageLimits { + c.HugetlbLimit = append(c.HugetlbLimit, &configs.HugepageLimit{ + Pagesize: l.Pagesize, + Limit: l.Limit, + }) + } + c.OomKillDisable = r.DisableOOMKiller + c.NetClsClassid = r.Network.ClassID + for _, m := range r.Network.Priorities { + c.NetPrioIfpriomap = append(c.NetPrioIfpriomap, &configs.IfPrioMap{ + Interface: m.Name, + Priority: m.Priority, + }) + } + return c, nil +} + +func (r *libcontainerRuntime) createDevices(spec *specs.LinuxRuntimeSpec, config *configs.Config) error { + for _, d := range spec.Linux.Devices { + device := &configs.Device{ + Type: d.Type, + Path: d.Path, + Major: d.Major, + Minor: d.Minor, + Permissions: d.Permissions, + FileMode: d.FileMode, + Uid: d.UID, + Gid: d.GID, + } + config.Devices = append(config.Devices, device) + } + return nil +} + +func (r *libcontainerRuntime) setReadonly(config *configs.Config) { + for _, m := range config.Mounts { + if m.Device == "sysfs" { + m.Flags |= syscall.MS_RDONLY + } + } +} + +func (r *libcontainerRuntime) setupUserNamespace(spec *specs.LinuxRuntimeSpec, config *configs.Config) error { + if len(spec.Linux.UIDMappings) == 0 { + return nil + } + config.Namespaces.Add(configs.NEWUSER, "") + create := func(m specs.IDMapping) configs.IDMap { + return configs.IDMap{ + HostID: int(m.HostID), + ContainerID: int(m.ContainerID), + Size: int(m.Size), + } + } + for _, m := range spec.Linux.UIDMappings { + config.UidMappings = append(config.UidMappings, create(m)) + } + for _, m := range spec.Linux.GIDMappings { + config.GidMappings = append(config.GidMappings, create(m)) + } + rootUID, err := config.HostUID() + if err != nil { + return err + } + rootGID, err := config.HostGID() + if err != nil { + return err + } + for _, node := range config.Devices { + node.Uid = uint32(rootUID) + node.Gid = uint32(rootGID) + } + return nil +} + +func (r *libcontainerRuntime) createLibContainerRlimit(rlimit specs.Rlimit) (configs.Rlimit, error) { + rl, err := strToRlimit(rlimit.Type) + if err != nil { + return configs.Rlimit{}, err + } + return configs.Rlimit{ + Type: rl, + Hard: uint64(rlimit.Hard), + Soft: uint64(rlimit.Soft), + }, nil +} + +// parseMountOptions parses the string and returns the flags, propagation +// flags and any mount data that it contains. +func parseMountOptions(options []string) (int, []int, string) { + var ( + flag int + pgflag []int + data []string + ) + flags := map[string]struct { + clear bool + flag int + }{ + "async": {true, syscall.MS_SYNCHRONOUS}, + "atime": {true, syscall.MS_NOATIME}, + "bind": {false, syscall.MS_BIND}, + "defaults": {false, 0}, + "dev": {true, syscall.MS_NODEV}, + "diratime": {true, syscall.MS_NODIRATIME}, + "dirsync": {false, syscall.MS_DIRSYNC}, + "exec": {true, syscall.MS_NOEXEC}, + "mand": {false, syscall.MS_MANDLOCK}, + "noatime": {false, syscall.MS_NOATIME}, + "nodev": {false, syscall.MS_NODEV}, + "nodiratime": {false, syscall.MS_NODIRATIME}, + "noexec": {false, syscall.MS_NOEXEC}, + "nomand": {true, syscall.MS_MANDLOCK}, + "norelatime": {true, syscall.MS_RELATIME}, + "nostrictatime": {true, syscall.MS_STRICTATIME}, + "nosuid": {false, syscall.MS_NOSUID}, + "rbind": {false, syscall.MS_BIND | syscall.MS_REC}, + "relatime": {false, syscall.MS_RELATIME}, + "remount": {false, syscall.MS_REMOUNT}, + "ro": {false, syscall.MS_RDONLY}, + "rw": {true, syscall.MS_RDONLY}, + "strictatime": {false, syscall.MS_STRICTATIME}, + "suid": {true, syscall.MS_NOSUID}, + "sync": {false, syscall.MS_SYNCHRONOUS}, + } + propagationFlags := map[string]struct { + clear bool + flag int + }{ + "private": {false, syscall.MS_PRIVATE}, + "shared": {false, syscall.MS_SHARED}, + "slave": {false, syscall.MS_SLAVE}, + "unbindable": {false, syscall.MS_UNBINDABLE}, + "rprivate": {false, syscall.MS_PRIVATE | syscall.MS_REC}, + "rshared": {false, syscall.MS_SHARED | syscall.MS_REC}, + "rslave": {false, syscall.MS_SLAVE | syscall.MS_REC}, + "runbindable": {false, syscall.MS_UNBINDABLE | syscall.MS_REC}, + } + for _, o := range options { + // If the option does not exist in the flags table or the flag + // is not supported on the platform, + // then it is a data value for a specific fs type + if f, exists := flags[o]; exists && f.flag != 0 { + if f.clear { + flag &= ^f.flag + } else { + flag |= f.flag + } + } else if f, exists := propagationFlags[o]; exists && f.flag != 0 { + pgflag = append(pgflag, f.flag) + } else { + data = append(data, o) + } + } + return flag, pgflag, strings.Join(data, ",") +} + +func (r *libcontainerRuntime) setupSeccomp(config *specs.Seccomp) (*configs.Seccomp, error) { + if config == nil { + return nil, nil + } + + // No default action specified, no syscalls listed, assume seccomp disabled + if config.DefaultAction == "" && len(config.Syscalls) == 0 { + return nil, nil + } + + newConfig := new(configs.Seccomp) + newConfig.Syscalls = []*configs.Syscall{} + + if len(config.Architectures) > 0 { + newConfig.Architectures = []string{} + for _, arch := range config.Architectures { + newArch, err := seccomp.ConvertStringToArch(string(arch)) + if err != nil { + return nil, err + } + newConfig.Architectures = append(newConfig.Architectures, newArch) + } + } + + // Convert default action from string representation + newDefaultAction, err := seccomp.ConvertStringToAction(string(config.DefaultAction)) + if err != nil { + return nil, err + } + newConfig.DefaultAction = newDefaultAction + + // Loop through all syscall blocks and convert them to libcontainer format + for _, call := range config.Syscalls { + newAction, err := seccomp.ConvertStringToAction(string(call.Action)) + if err != nil { + return nil, err + } + + newCall := configs.Syscall{ + Name: call.Name, + Action: newAction, + Args: []*configs.Arg{}, + } + + // Loop through all the arguments of the syscall and convert them + for _, arg := range call.Args { + newOp, err := seccomp.ConvertStringToOperator(string(arg.Op)) + if err != nil { + return nil, err + } + + newArg := configs.Arg{ + Index: arg.Index, + Value: arg.Value, + ValueTwo: arg.ValueTwo, + Op: newOp, + } + + newCall.Args = append(newCall.Args, &newArg) + } + + newConfig.Syscalls = append(newConfig.Syscalls, &newCall) + } + + return newConfig, nil +} + +func (r *libcontainerRuntime) createHooks(rspec *specs.LinuxRuntimeSpec, config *configs.Config) { + config.Hooks = &configs.Hooks{} + for _, h := range rspec.Hooks.Prestart { + cmd := configs.Command{ + Path: h.Path, + Args: h.Args, + Env: h.Env, + } + config.Hooks.Prestart = append(config.Hooks.Prestart, configs.NewCommandHook(cmd)) + } + for _, h := range rspec.Hooks.Poststop { + cmd := configs.Command{ + Path: h.Path, + Args: h.Args, + Env: h.Env, + } + config.Hooks.Poststop = append(config.Hooks.Poststop, configs.NewCommandHook(cmd)) + } +} diff --git a/supervisor.go b/supervisor.go new file mode 100644 index 000000000..6002d270c --- /dev/null +++ b/supervisor.go @@ -0,0 +1,108 @@ +package containerd + +import ( + "os" + "sync" + + "github.com/Sirupsen/logrus" +) + +// NewSupervisor returns an initialized Process supervisor. +func NewSupervisor(stateDir string, concurrency int, runtime Runtime) (*Supervisor, error) { + if err := os.MkdirAll(stateDir, 0755); err != nil { + return nil, err + } + s := &Supervisor{ + stateDir: stateDir, + processes: make(map[int]Process), + runtime: runtime, + jobs: make(chan Job, 1024), + } + s.state = &runningState{ + s: s, + } + for i := 0; i < concurrency; i++ { + s.workerGroup.Add(1) + go s.worker(i) + } + return s, nil +} + +type Supervisor struct { + // stateDir is the directory on the system to store container runtime state information. + stateDir string + + processes map[int]Container + containers map[string]Container + + runtime Runtime + + events chan Event + jobs chan Job + + workerGroup sync.WaitGroup +} + +// Run is a blocking call that runs the supervisor for monitoring contianer processes and +// executing new containers. +// +// This event loop is the only thing that is allowed to modify state of containers and processes. +func (s *Supervisor) Run(events chan Event) error { + if events == nil { + return ErrEventChanNil + } + s.events = events + for evt := range events { + logrus.WithField("event", evt).Debug("containerd: processing event") + switch e := evt.(type) { + case *ExitEvent: + logrus.WithFields(logrus.Fields{ + "pid": e.Pid, + "status": e.Status, + }).Debug("containerd: process exited") + if container, ok := s.processes[e.Pid]; ok { + container.SetExited(e.Status) + } + case *StartedEvent: + s.containers[e.ID] = e.Container + case *CreateContainerEvent: + j := &CreateJob{ + ID: e.ID, + BundlePath: e.BundlePath, + Err: e.Err, + } + s.jobs <- j + } + } + return nil +} + +func (s *Supervisor) SendEvent(evt Event) { + s.events <- evt +} + +// Stop initiates a shutdown of the supervisor killing all processes under supervision. +func (s *Supervisor) Stop() { + +} + +func (s *Supervisor) worker(id int) { + defer func() { + s.workerGroup.Done() + logrus.WithField("worker", id).Debug("containerd: worker finished") + }() + for job := range s.jobs { + switch j := job.(type) { + case *CreateJob: + container, err := r.s.runtime.Create(j.ID, j.BundlePath) + if err != nil { + j.Err <- err + } + s.SendEvent(&StartedEvent{ + ID: j.ID, + Container: container, + }) + j.Err <- nil + } + } +}