diff --git a/api/v1/client.go b/api/v1/client.go deleted file mode 100644 index 46a7d67b9..000000000 --- a/api/v1/client.go +++ /dev/null @@ -1,101 +0,0 @@ -package v1 - -import ( - "bytes" - "encoding/json" - "fmt" - "net/http" - "strconv" -) - -func NewClient(addr string) *Client { - return &Client{ - addr: addr, - } -} - -type Client struct { - addr string -} - -type StartOpts struct { - Path string - Checkpoint string -} - -// Start starts a container with the specified id and path to the container's -// bundle on the system. -func (c *Client) Start(id string, opts StartOpts) error { - container := Container{ - BundlePath: opts.Path, - Checkpoint: opts.Checkpoint, - } - buf := bytes.NewBuffer(nil) - if err := json.NewEncoder(buf).Encode(container); err != nil { - return err - } - r, err := http.Post(c.addr+"/containers/"+id, "application/json", buf) - if err != nil { - return err - } - r.Body.Close() - if r.StatusCode != http.StatusCreated { - return fmt.Errorf("unexpected status %d", r.StatusCode) - } - return nil -} - -func (c *Client) State() ([]Container, error) { - r, err := http.Get(c.addr + "/state") - if err != nil { - return nil, err - } - var s State - if err := json.NewDecoder(r.Body).Decode(&s); err != nil { - return nil, err - } - r.Body.Close() - return s.Containers, nil -} - -func (c *Client) SignalProcess(id string, pid, signal int) error { - sig := Signal{ - Signal: signal, - } - buf := bytes.NewBuffer(nil) - if err := json.NewEncoder(buf).Encode(sig); err != nil { - return err - } - r, err := http.Post(c.addr+"/containers/"+id+"/process/"+strconv.Itoa(pid), "application/json", buf) - if err != nil { - return err - } - r.Body.Close() - return nil -} - -func (c *Client) Checkpoints(id string) ([]Checkpoint, error) { - r, err := http.Get(c.addr + "/containers/" + id + "/checkpoint") - if err != nil { - return nil, err - } - defer r.Body.Close() - var checkpoints []Checkpoint - if err := json.NewDecoder(r.Body).Decode(&checkpoints); err != nil { - return nil, err - } - return checkpoints, nil -} - -func (c *Client) CreateCheckpoint(id, name string, cp Checkpoint) error { - buf := bytes.NewBuffer(nil) - if err := json.NewEncoder(buf).Encode(cp); err != nil { - return err - } - r, err := http.Post(c.addr+"/containers/"+id+"/checkpoint", "application/json", buf) - if err != nil { - return err - } - r.Body.Close() - return nil -} diff --git a/api/v1/server.go b/api/v1/server.go deleted file mode 100644 index f92b7458f..000000000 --- a/api/v1/server.go +++ /dev/null @@ -1,387 +0,0 @@ -package v1 - -import ( - "encoding/json" - "net/http" - "strconv" - "syscall" - - "github.com/Sirupsen/logrus" - "github.com/docker/containerd" - "github.com/docker/containerd/runtime" - "github.com/gorilla/mux" - "github.com/opencontainers/specs" -) - -func NewServer(supervisor *containerd.Supervisor) http.Handler { - r := mux.NewRouter() - s := &server{ - supervisor: supervisor, - r: r, - } - // process handlers - r.HandleFunc("/containers/{id:.*}/process/{pid:.*}", s.signalPid).Methods("POST") - r.HandleFunc("/containers/{id:.*}/process", s.addProcess).Methods("PUT") - - // checkpoint and restore handlers - // TODO: PUT handler for adding a checkpoint to containerd?? - r.HandleFunc("/containers/{id:.*}/checkpoint/{name:.*}", s.createCheckpoint).Methods("POST") - r.HandleFunc("/containers/{id:.*}/checkpoint/{name:.*}", s.deleteCheckpoint).Methods("DELETE") - r.HandleFunc("/containers/{id:.*}/checkpoint", s.listCheckpoints).Methods("GET") - - // container handlers - r.HandleFunc("/containers/{id:.*}", s.createContainer).Methods("POST") - r.HandleFunc("/containers/{id:.*}", s.updateContainer).Methods("PATCH") - - // internal method for replaying the journal - // r.HandleFunc("/event", s.event).Methods("POST") - r.HandleFunc("/events", s.events).Methods("GET") - - // containerd handlers - r.HandleFunc("/state", s.state).Methods("GET") - return s -} - -type server struct { - r *mux.Router - supervisor *containerd.Supervisor -} - -func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.r.ServeHTTP(w, r) -} - -func (s *server) updateContainer(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - var state ContainerState - if err := json.NewDecoder(r.Body).Decode(&state); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - e := containerd.NewEvent(containerd.UpdateContainerEventType) - e.ID = id - if state.Signal != 0 { - e.Signal = syscall.Signal(state.Signal) - } - e.State = &runtime.State{ - Status: runtime.Status(string(state.Status)), - } - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (s *server) events(w http.ResponseWriter, r *http.Request) { - events := s.supervisor.Events() - enc := json.NewEncoder(w) - for evt := range events { - var v interface{} - switch evt.Type { - case containerd.ExitEventType: - v = createExitEvent(evt) - } - if err := enc.Encode(v); err != nil { - // TODO: handled closed conn - logrus.WithField("error", err).Error("encode event") - } - } -} - -func createExitEvent(e *containerd.Event) *Event { - return &Event{ - Type: "exit", - ID: e.ID, - Status: e.Status, - } -} - -func (s *server) event(w http.ResponseWriter, r *http.Request) { - var e containerd.Event - if err := json.NewDecoder(r.Body).Decode(&e); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - e.Err = make(chan error, 1) - s.supervisor.SendEvent(&e) - if err := <-e.Err; err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - if e.Containers != nil && len(e.Containers) > 0 { - if err := s.writeState(w, &e); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } -} - -func (s *server) addProcess(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - var process specs.Process - if err := json.NewDecoder(r.Body).Decode(&process); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - e := containerd.NewEvent(containerd.AddProcessEventType) - e.ID = id - e.Process = &process - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - p := Process{ - Pid: e.Pid, - Terminal: process.Terminal, - Args: process.Args, - Env: process.Env, - Cwd: process.Cwd, - } - p.User.UID = process.User.UID - p.User.GID = process.User.GID - p.User.AdditionalGids = process.User.AdditionalGids - if err := json.NewEncoder(w).Encode(p); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (s *server) signalPid(w http.ResponseWriter, r *http.Request) { - var ( - vars = mux.Vars(r) - id = vars["id"] - spid = vars["pid"] - ) - pid, err := strconv.Atoi(spid) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - var signal Signal - if err := json.NewDecoder(r.Body).Decode(&signal); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - e := containerd.NewEvent(containerd.SignalEventType) - e.ID = id - e.Pid = pid - e.Signal = syscall.Signal(signal.Signal) - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - status := http.StatusInternalServerError - if err == containerd.ErrContainerNotFound { - status = http.StatusNotFound - } - http.Error(w, err.Error(), status) - return - } -} - -func (s *server) state(w http.ResponseWriter, r *http.Request) { - e := containerd.NewEvent(containerd.GetContainerEventType) - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - if err := s.writeState(w, e); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (s *server) writeState(w http.ResponseWriter, e *containerd.Event) error { - m := s.supervisor.Machine() - state := State{ - Containers: []Container{}, - Machine: Machine{ - ID: m.ID, - Cpus: m.Cpus, - Memory: m.Memory, - }, - } - for _, c := range e.Containers { - processes, err := c.Processes() - if err != nil { - logrus.WithFields(logrus.Fields{ - "error": err, - "container": c.ID(), - }).Error("get processes for container") - } - var pids []Process - for _, p := range processes { - if proc := createProcess(p); proc != nil { - pids = append(pids, *proc) - } - } - state.Containers = append(state.Containers, Container{ - ID: c.ID(), - BundlePath: c.Path(), - Processes: pids, - State: &ContainerState{ - Status: Status(c.State().Status), - }, - }) - } - return json.NewEncoder(w).Encode(&state) -} - -func createProcess(in runtime.Process) *Process { - pid, err := in.Pid() - if err != nil { - logrus.WithField("error", err).Error("get process pid") - return nil - } - process := in.Spec() - p := &Process{ - Pid: pid, - Terminal: process.Terminal, - Args: process.Args, - Env: process.Env, - Cwd: process.Cwd, - } - p.User.UID = process.User.UID - p.User.GID = process.User.GID - p.User.AdditionalGids = process.User.AdditionalGids - return p -} - -func (s *server) createContainer(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - var c Container - if err := json.NewDecoder(r.Body).Decode(&c); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - if c.BundlePath == "" { - http.Error(w, "empty bundle path", http.StatusBadRequest) - return - } - e := containerd.NewEvent(containerd.StartContainerEventType) - e.ID = id - e.BundlePath = c.BundlePath - if c.Checkpoint != "" { - e.Checkpoint = &runtime.Checkpoint{ - Name: c.Checkpoint, - } - } - e.Stdio = &runtime.Stdio{ - Stderr: c.Stderr, - Stdout: c.Stdout, - } - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - code := http.StatusInternalServerError - if err == containerd.ErrBundleNotFound { - code = http.StatusNotFound - } - http.Error(w, err.Error(), code) - return - } - w.WriteHeader(http.StatusCreated) -} - -func (s *server) listCheckpoints(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - e := containerd.NewEvent(containerd.GetContainerEventType) - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - var container runtime.Container - for _, c := range e.Containers { - if c.ID() == id { - container = c - break - } - } - if container == nil { - http.Error(w, "container not found", http.StatusNotFound) - return - } - checkpoints, err := container.Checkpoints() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - out := []Checkpoint{} - for _, c := range checkpoints { - out = append(out, Checkpoint{ - Name: c.Name, - Tcp: c.Tcp, - Shell: c.Shell, - UnixSockets: c.UnixSockets, - Timestamp: c.Timestamp, - }) - } - if err := json.NewEncoder(w).Encode(out); err != nil { - logrus.WithField("error", err).Error("encode checkpoints") - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (s *server) createCheckpoint(w http.ResponseWriter, r *http.Request) { - var ( - vars = mux.Vars(r) - id = vars["id"] - name = vars["name"] - ) - var cp Checkpoint - // most options to the checkpoint action can be left out so don't - // decode unless the client passed anything in the body. - if r.ContentLength > 0 { - if err := json.NewDecoder(r.Body).Decode(&cp); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - } - e := containerd.NewEvent(containerd.CreateCheckpointEventType) - e.ID = id - e.Checkpoint = &runtime.Checkpoint{ - Name: name, - Exit: cp.Exit, - Tcp: cp.Tcp, - UnixSockets: cp.UnixSockets, - Shell: cp.Shell, - } - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.WriteHeader(http.StatusCreated) -} - -func (s *server) deleteCheckpoint(w http.ResponseWriter, r *http.Request) { - var ( - vars = mux.Vars(r) - id = vars["id"] - name = vars["name"] - ) - if name == "" { - http.Error(w, "checkpoint name cannot be empty", http.StatusBadRequest) - return - } - var cp Checkpoint - if r.ContentLength > 0 { - if err := json.NewDecoder(r.Body).Decode(&cp); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - } - e := containerd.NewEvent(containerd.DeleteCheckpointEventType) - e.ID = id - e.Checkpoint = &runtime.Checkpoint{ - Name: name, - } - s.supervisor.SendEvent(e) - if err := <-e.Err; err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} diff --git a/api/v1/types.go b/api/v1/types.go deleted file mode 100644 index 4946bb0c6..000000000 --- a/api/v1/types.go +++ /dev/null @@ -1,70 +0,0 @@ -package v1 - -import "time" - -type State struct { - Containers []Container `json:"containers"` - Machine Machine `json:"machine"` -} - -type Status string - -const ( - Paused Status = "paused" - Running Status = "running" -) - -type Machine struct { - ID string `json:"id"` - Cpus int `json:"cpus"` - Memory int64 `json:"memory"` -} - -type ContainerState struct { - Status Status `json:"status,omitempty"` - Signal int `json:"signal,omitempty"` -} - -type Container struct { - ID string `json:"id,omitempty"` - BundlePath string `json:"bundlePath,omitempty"` - Processes []Process `json:"processes,omitempty"` - Stdout string `json:"stdout,omitempty"` - Stderr string `json:"stderr,omitempty"` - State *ContainerState `json:"state,omitempty"` - Checkpoint string `json:"checkpoint,omitempty"` -} - -type User struct { - UID uint32 `json:"uid"` - GID uint32 `json:"gid"` - AdditionalGids []uint32 `json:"additionalGids,omitempty"` -} - -type Process struct { - Terminal bool `json:"terminal"` - User User `json:"user"` - Args []string `json:"args,omitempty"` - Env []string `json:"env,omitempty"` - Cwd string `json:"cwd,omitempty"` - Pid int `json:"pid,omitempty"` -} - -type Signal struct { - Signal int `json:"signal"` -} - -type Event struct { - Type string `json:"type"` - ID string `json:"id,omitempty"` - Status int `json:"status,omitempty"` -} - -type Checkpoint struct { - Name string `json:"name,omitempty"` - Timestamp time.Time `json:"timestamp,omitempty"` - Exit bool `json:"exit,omitempty"` - Tcp bool `json:"tcp"` - UnixSockets bool `json:"unixSockets"` - Shell bool `json:"shell"` -} diff --git a/containerd/main.go b/containerd/main.go index 01977a33d..5bc401713 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -3,21 +3,18 @@ package main import ( "log" "net" - "net/http" "os" "runtime" "sync" "time" "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" "github.com/docker/containerd" "github.com/docker/containerd/api/grpc/server" "github.com/docker/containerd/api/grpc/types" - "github.com/docker/containerd/api/v1" "github.com/docker/containerd/util" "github.com/rcrowley/go-metrics" ) @@ -150,18 +147,13 @@ func daemon(id, stateDir string, concurrency, bufferSize int) error { if err := supervisor.Start(); err != nil { return err } - if os.Getenv("GRPC") != "" { - lis, err := net.Listen("tcp", ":8888") - if err != nil { - grpclog.Fatalf("failed to listen: %v", err) - } - grpcServer := grpc.NewServer() - types.RegisterAPIServer(grpcServer, server.NewServer(supervisor)) - return grpcServer.Serve(lis) - + l, err := net.Listen("tcp", ":8888") + if err != nil { + return err } - server := v1.NewServer(supervisor) - return http.ListenAndServe("localhost:8888", server) + s := grpc.NewServer() + types.RegisterAPIServer(s, server.NewServer(supervisor)) + return s.Serve(l) } // getDefaultID returns the hostname for the instance host