253 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			253 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package server
 | 
						|
 | 
						|
import (
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"syscall"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/codes"
 | 
						|
 | 
						|
	"github.com/docker/containerd/api/grpc/types"
 | 
						|
	"github.com/docker/containerd/runtime"
 | 
						|
	"github.com/docker/containerd/specs"
 | 
						|
	"github.com/docker/containerd/supervisor"
 | 
						|
	"golang.org/x/net/context"
 | 
						|
)
 | 
						|
 | 
						|
type apiServer struct {
 | 
						|
	sv *supervisor.Supervisor
 | 
						|
}
 | 
						|
 | 
						|
// NewServer returns grpc server instance
 | 
						|
func NewServer(sv *supervisor.Supervisor) types.APIServer {
 | 
						|
	return &apiServer{
 | 
						|
		sv: sv,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
 | 
						|
	if c.BundlePath == "" {
 | 
						|
		return nil, errors.New("empty bundle path")
 | 
						|
	}
 | 
						|
	e := &supervisor.StartTask{}
 | 
						|
	e.ID = c.Id
 | 
						|
	e.BundlePath = c.BundlePath
 | 
						|
	e.Stdin = c.Stdin
 | 
						|
	e.Stdout = c.Stdout
 | 
						|
	e.Stderr = c.Stderr
 | 
						|
	e.Labels = c.Labels
 | 
						|
	e.NoPivotRoot = c.NoPivotRoot
 | 
						|
	e.StartResponse = make(chan supervisor.StartResponse, 1)
 | 
						|
	createContainerConfigCheckpoint(e, c)
 | 
						|
	s.sv.SendTask(e)
 | 
						|
	if err := <-e.ErrorCh(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	r := <-e.StartResponse
 | 
						|
	apiC, err := createAPIContainer(r.Container, false)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &types.CreateContainerResponse{
 | 
						|
		Container: apiC,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
 | 
						|
	e := &supervisor.SignalTask{}
 | 
						|
	e.ID = r.Id
 | 
						|
	e.PID = r.Pid
 | 
						|
	e.Signal = syscall.Signal(int(r.Signal))
 | 
						|
	s.sv.SendTask(e)
 | 
						|
	if err := <-e.ErrorCh(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &types.SignalResponse{}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) (*types.AddProcessResponse, error) {
 | 
						|
	process := &specs.ProcessSpec{
 | 
						|
		Terminal: r.Terminal,
 | 
						|
		Args:     r.Args,
 | 
						|
		Env:      r.Env,
 | 
						|
		Cwd:      r.Cwd,
 | 
						|
	}
 | 
						|
	setPlatformRuntimeProcessSpecUserFields(r, process)
 | 
						|
 | 
						|
	if r.Id == "" {
 | 
						|
		return nil, fmt.Errorf("container id cannot be empty")
 | 
						|
	}
 | 
						|
	if r.Pid == "" {
 | 
						|
		return nil, fmt.Errorf("process id cannot be empty")
 | 
						|
	}
 | 
						|
	e := &supervisor.AddProcessTask{}
 | 
						|
	e.ID = r.Id
 | 
						|
	e.PID = r.Pid
 | 
						|
	e.ProcessSpec = process
 | 
						|
	e.Stdin = r.Stdin
 | 
						|
	e.Stdout = r.Stdout
 | 
						|
	e.Stderr = r.Stderr
 | 
						|
	e.StartResponse = make(chan supervisor.StartResponse, 1)
 | 
						|
	s.sv.SendTask(e)
 | 
						|
	if err := <-e.ErrorCh(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	<-e.StartResponse
 | 
						|
	return &types.AddProcessResponse{}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
 | 
						|
	e := &supervisor.GetContainersTask{}
 | 
						|
	e.ID = r.Id
 | 
						|
	s.sv.SendTask(e)
 | 
						|
	if err := <-e.ErrorCh(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	m := s.sv.Machine()
 | 
						|
	state := &types.StateResponse{
 | 
						|
		Machine: &types.Machine{
 | 
						|
			Cpus:   uint32(m.Cpus),
 | 
						|
			Memory: uint64(m.Memory),
 | 
						|
		},
 | 
						|
	}
 | 
						|
	for _, c := range e.Containers {
 | 
						|
		apiC, err := createAPIContainer(c, true)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		state.Containers = append(state.Containers, apiC)
 | 
						|
	}
 | 
						|
	return state, nil
 | 
						|
}
 | 
						|
 | 
						|
func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, error) {
 | 
						|
	processes, err := c.Processes()
 | 
						|
	if err != nil {
 | 
						|
		return nil, grpc.Errorf(codes.Internal, "get processes for container: "+err.Error())
 | 
						|
	}
 | 
						|
	var procs []*types.Process
 | 
						|
	for _, p := range processes {
 | 
						|
		oldProc := p.Spec()
 | 
						|
		stdio := p.Stdio()
 | 
						|
		appendToProcs := &types.Process{
 | 
						|
			Pid:       p.ID(),
 | 
						|
			SystemPid: uint32(p.SystemPid()),
 | 
						|
			Terminal:  oldProc.Terminal,
 | 
						|
			Args:      oldProc.Args,
 | 
						|
			Env:       oldProc.Env,
 | 
						|
			Cwd:       oldProc.Cwd,
 | 
						|
			Stdin:     stdio.Stdin,
 | 
						|
			Stdout:    stdio.Stdout,
 | 
						|
			Stderr:    stdio.Stderr,
 | 
						|
		}
 | 
						|
		setUserFieldsInProcess(appendToProcs, oldProc)
 | 
						|
		procs = append(procs, appendToProcs)
 | 
						|
	}
 | 
						|
	var pids []int
 | 
						|
	state := c.State()
 | 
						|
	if getPids && (state == runtime.Running || state == runtime.Paused) {
 | 
						|
		if pids, err = c.Pids(); err != nil {
 | 
						|
			return nil, grpc.Errorf(codes.Internal, "get all pids for container: "+err.Error())
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return &types.Container{
 | 
						|
		Id:         c.ID(),
 | 
						|
		BundlePath: c.Path(),
 | 
						|
		Processes:  procs,
 | 
						|
		Labels:     c.Labels(),
 | 
						|
		Status:     string(state),
 | 
						|
		Pids:       toUint32(pids),
 | 
						|
		Runtime:    c.Runtime(),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func toUint32(its []int) []uint32 {
 | 
						|
	o := []uint32{}
 | 
						|
	for _, i := range its {
 | 
						|
		o = append(o, uint32(i))
 | 
						|
	}
 | 
						|
	return o
 | 
						|
}
 | 
						|
 | 
						|
func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
 | 
						|
	e := &supervisor.UpdateTask{}
 | 
						|
	e.ID = r.Id
 | 
						|
	e.State = runtime.State(r.Status)
 | 
						|
	if r.Resources != nil {
 | 
						|
		rs := r.Resources
 | 
						|
		e.Resources = &runtime.Resource{}
 | 
						|
		if rs.CpuShares != 0 {
 | 
						|
			e.Resources.CPUShares = int64(rs.CpuShares)
 | 
						|
		}
 | 
						|
		if rs.BlkioWeight != 0 {
 | 
						|
			e.Resources.BlkioWeight = uint16(rs.BlkioWeight)
 | 
						|
		}
 | 
						|
		if rs.CpuPeriod != 0 {
 | 
						|
			e.Resources.CPUPeriod = int64(rs.CpuPeriod)
 | 
						|
		}
 | 
						|
		if rs.CpuQuota != 0 {
 | 
						|
			e.Resources.CPUQuota = int64(rs.CpuQuota)
 | 
						|
		}
 | 
						|
		if rs.CpusetCpus != "" {
 | 
						|
			e.Resources.CpusetCpus = rs.CpusetCpus
 | 
						|
		}
 | 
						|
		if rs.CpusetMems != "" {
 | 
						|
			e.Resources.CpusetMems = rs.CpusetMems
 | 
						|
		}
 | 
						|
		if rs.KernelMemoryLimit != 0 {
 | 
						|
			e.Resources.KernelMemory = int64(rs.KernelMemoryLimit)
 | 
						|
		}
 | 
						|
		if rs.MemoryLimit != 0 {
 | 
						|
			e.Resources.Memory = int64(rs.MemoryLimit)
 | 
						|
		}
 | 
						|
		if rs.MemoryReservation != 0 {
 | 
						|
			e.Resources.MemoryReservation = int64(rs.MemoryReservation)
 | 
						|
		}
 | 
						|
		if rs.MemorySwap != 0 {
 | 
						|
			e.Resources.MemorySwap = int64(rs.MemorySwap)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	s.sv.SendTask(e)
 | 
						|
	if err := <-e.ErrorCh(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &types.UpdateContainerResponse{}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
 | 
						|
	e := &supervisor.UpdateProcessTask{}
 | 
						|
	e.ID = r.Id
 | 
						|
	e.PID = r.Pid
 | 
						|
	e.Height = int(r.Height)
 | 
						|
	e.Width = int(r.Width)
 | 
						|
	e.CloseStdin = r.CloseStdin
 | 
						|
	s.sv.SendTask(e)
 | 
						|
	if err := <-e.ErrorCh(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &types.UpdateProcessResponse{}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
 | 
						|
	t := time.Time{}
 | 
						|
	if r.Timestamp != 0 {
 | 
						|
		t = time.Unix(int64(r.Timestamp), 0)
 | 
						|
	}
 | 
						|
	events := s.sv.Events(t)
 | 
						|
	defer s.sv.Unsubscribe(events)
 | 
						|
	for e := range events {
 | 
						|
		if err := stream.Send(&types.Event{
 | 
						|
			Id:        e.ID,
 | 
						|
			Type:      e.Type,
 | 
						|
			Timestamp: uint64(e.Timestamp.Unix()),
 | 
						|
			Pid:       e.PID,
 | 
						|
			Status:    uint32(e.Status),
 | 
						|
		}); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |