Remove tasks map from service
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
3c4e2a36dc
commit
745398b2e9
@ -133,7 +133,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
|
|||||||
os.RemoveAll(path)
|
os.RemoveAll(path)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c := newTask(id, opts.Spec, s)
|
c := newTask(id, namespace, opts.Spec, s)
|
||||||
// after the task is created, add it to the monitor
|
// after the task is created, add it to the monitor
|
||||||
if err = r.monitor.Monitor(c); err != nil {
|
if err = r.monitor.Monitor(c); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -176,7 +176,7 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
|
|||||||
if !fi.IsDir() {
|
if !fi.IsDir() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tasks, err := r.loadContainers(ctx, fi.Name())
|
tasks, err := r.loadTasks(ctx, fi.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -185,7 +185,15 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
|
|||||||
return o, nil
|
return o, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task, error) {
|
func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return r.loadTask(ctx, filepath.Join(r.root, namespace, id))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, error) {
|
||||||
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns))
|
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -198,7 +206,7 @@ func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task,
|
|||||||
id := fi.Name()
|
id := fi.Name()
|
||||||
// TODO: optimize this if it is call frequently to list all containers
|
// TODO: optimize this if it is call frequently to list all containers
|
||||||
// i.e. dont' reconnect to the the shim's ever time
|
// i.e. dont' reconnect to the the shim's ever time
|
||||||
c, err := r.loadContainer(ctx, filepath.Join(r.root, ns, id))
|
c, err := r.loadTask(ctx, filepath.Join(r.root, ns, id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
|
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
|
||||||
// if we fail to load the container, connect to the shim, make sure if the shim has
|
// if we fail to load the container, connect to the shim, make sure if the shim has
|
||||||
@ -283,7 +291,7 @@ func (r *Runtime) deleteBundle(namespace, id string) error {
|
|||||||
return os.RemoveAll(filepath.Join(r.root, namespace, id))
|
return os.RemoveAll(filepath.Join(r.root, namespace, id))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error) {
|
func (r *Runtime) loadTask(ctx context.Context, path string) (*Task, error) {
|
||||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -304,6 +312,7 @@ func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error)
|
|||||||
containerID: id,
|
containerID: id,
|
||||||
shim: s,
|
shim: s,
|
||||||
spec: data,
|
spec: data,
|
||||||
|
namespace: namespace,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,13 +16,15 @@ type Task struct {
|
|||||||
containerID string
|
containerID string
|
||||||
spec []byte
|
spec []byte
|
||||||
shim shim.ShimClient
|
shim shim.ShimClient
|
||||||
|
namespace string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTask(id string, spec []byte, shim shim.ShimClient) *Task {
|
func newTask(id, namespace string, spec []byte, shim shim.ShimClient) *Task {
|
||||||
return &Task{
|
return &Task{
|
||||||
containerID: id,
|
containerID: id,
|
||||||
shim: shim,
|
shim: shim,
|
||||||
spec: spec,
|
spec: spec,
|
||||||
|
namespace: namespace,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -32,6 +34,7 @@ func (c *Task) Info() plugin.TaskInfo {
|
|||||||
ContainerID: c.containerID,
|
ContainerID: c.containerID,
|
||||||
Runtime: runtimeName,
|
Runtime: runtimeName,
|
||||||
Spec: c.spec,
|
Spec: c.spec,
|
||||||
|
Namespace: c.namespace,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ type TaskInfo struct {
|
|||||||
ContainerID string
|
ContainerID string
|
||||||
Runtime string
|
Runtime string
|
||||||
Spec []byte
|
Spec []byte
|
||||||
|
Namespace string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Task interface {
|
type Task interface {
|
||||||
|
@ -34,6 +34,8 @@ type Exit struct {
|
|||||||
type Runtime interface {
|
type Runtime interface {
|
||||||
// Create creates a container with the provided id and options
|
// Create creates a container with the provided id and options
|
||||||
Create(ctx context.Context, id string, opts CreateOpts) (Task, error)
|
Create(ctx context.Context, id string, opts CreateOpts) (Task, error)
|
||||||
|
// Get returns a container
|
||||||
|
Get(context.Context, string) (Task, error)
|
||||||
// Containers returns all the current containers for the runtime
|
// Containers returns all the current containers for the runtime
|
||||||
Tasks(context.Context) ([]Task, error)
|
Tasks(context.Context) ([]Task, error)
|
||||||
// Delete removes the container in the runtime
|
// Delete removes the container in the runtime
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
"github.com/boltdb/bolt"
|
||||||
api "github.com/containerd/containerd/api/services/execution"
|
api "github.com/containerd/containerd/api/services/execution"
|
||||||
@ -48,7 +47,6 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
return &Service{
|
return &Service{
|
||||||
runtimes: ic.Runtimes,
|
runtimes: ic.Runtimes,
|
||||||
tasks: make(map[string]plugin.Task),
|
|
||||||
db: ic.Meta,
|
db: ic.Meta,
|
||||||
collector: c,
|
collector: c,
|
||||||
store: ic.Content,
|
store: ic.Content,
|
||||||
@ -56,10 +54,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
mu sync.Mutex
|
|
||||||
|
|
||||||
runtimes map[string]plugin.Runtime
|
runtimes map[string]plugin.Runtime
|
||||||
tasks map[string]plugin.Task
|
|
||||||
db *bolt.DB
|
db *bolt.DB
|
||||||
collector *collector
|
collector *collector
|
||||||
store content.Store
|
store content.Store
|
||||||
@ -67,16 +62,6 @@ type Service struct {
|
|||||||
|
|
||||||
func (s *Service) Register(server *grpc.Server) error {
|
func (s *Service) Register(server *grpc.Server) error {
|
||||||
api.RegisterTasksServer(server, s)
|
api.RegisterTasksServer(server, s)
|
||||||
// load all tasks
|
|
||||||
for _, r := range s.runtimes {
|
|
||||||
tasks, err := r.Tasks(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, c := range tasks {
|
|
||||||
s.tasks[c.Info().ContainerID] = c
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -142,18 +127,10 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
|
||||||
if _, ok := s.tasks[r.ContainerID]; ok {
|
|
||||||
s.mu.Unlock()
|
|
||||||
return nil, grpc.Errorf(codes.AlreadyExists, "task %v already exists", r.ContainerID)
|
|
||||||
}
|
|
||||||
c, err := runtime.Create(ctx, r.ContainerID, opts)
|
c, err := runtime.Create(ctx, r.ContainerID, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.mu.Unlock()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.tasks[r.ContainerID] = c
|
|
||||||
s.mu.Unlock()
|
|
||||||
state, err := c.State(ctx)
|
state, err := c.State(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).Error(err)
|
log.G(ctx).Error(err)
|
||||||
@ -165,7 +142,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
|
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -176,7 +153,7 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) {
|
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -188,9 +165,6 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.Delete
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(s.tasks, r.ContainerID)
|
|
||||||
|
|
||||||
return &api.DeleteResponse{
|
return &api.DeleteResponse{
|
||||||
ExitStatus: exit.Status,
|
ExitStatus: exit.Status,
|
||||||
ExitedAt: exit.Timestamp,
|
ExitedAt: exit.Timestamp,
|
||||||
@ -233,7 +207,7 @@ func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) {
|
func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) {
|
||||||
task, err := s.getTask(r.ContainerID)
|
task, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -248,20 +222,24 @@ func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoRespon
|
|||||||
|
|
||||||
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
|
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
|
||||||
resp := &api.ListResponse{}
|
resp := &api.ListResponse{}
|
||||||
s.mu.Lock()
|
for _, r := range s.runtimes {
|
||||||
defer s.mu.Unlock()
|
tasks, err := r.Tasks(ctx)
|
||||||
for _, cd := range s.tasks {
|
|
||||||
c, err := taskFromContainerd(ctx, cd)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.Tasks = append(resp.Tasks, c)
|
for _, t := range tasks {
|
||||||
|
tt, err := taskFromContainerd(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp.Tasks = append(resp.Tasks, tt)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) {
|
func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -273,7 +251,7 @@ func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_proto
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) {
|
func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -285,7 +263,7 @@ func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_pro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) {
|
func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -306,7 +284,7 @@ func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobu
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) {
|
func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -338,7 +316,7 @@ func (s *Service) Events(r *api.EventsRequest, server api.Tasks_EventsServer) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) {
|
func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -364,7 +342,7 @@ func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecRespon
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) {
|
func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -378,7 +356,7 @@ func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) {
|
func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -389,7 +367,7 @@ func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*go
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) {
|
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) {
|
||||||
c, err := s.getTask(r.ContainerID)
|
c, err := s.getTask(ctx, r.ContainerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -454,14 +432,14 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getTask(id string) (plugin.Task, error) {
|
func (s *Service) getTask(ctx context.Context, id string) (plugin.Task, error) {
|
||||||
s.mu.Lock()
|
for _, r := range s.runtimes {
|
||||||
c, ok := s.tasks[id]
|
t, err := r.Get(ctx, id)
|
||||||
s.mu.Unlock()
|
if err == nil {
|
||||||
if !ok {
|
return t, nil
|
||||||
return nil, grpc.Errorf(codes.NotFound, "task %v not found", id)
|
|
||||||
}
|
}
|
||||||
return c, nil
|
}
|
||||||
|
return nil, grpc.Errorf(codes.NotFound, "task %v not found", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getRuntime(name string) (plugin.Runtime, error) {
|
func (s *Service) getRuntime(name string) (plugin.Runtime, error) {
|
||||||
|
@ -34,7 +34,6 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func New(ic *plugin.InitContext) (interface{}, error) {
|
func New(ic *plugin.InitContext) (interface{}, error) {
|
||||||
|
|
||||||
rootDir := filepath.Join(ic.Root, runtimeName)
|
rootDir := filepath.Join(ic.Root, runtimeName)
|
||||||
if err := os.MkdirAll(rootDir, 0755); err != nil {
|
if err := os.MkdirAll(rootDir, 0755); err != nil {
|
||||||
return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir)
|
return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir)
|
||||||
@ -152,10 +151,19 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
|
|||||||
list = append(list, c)
|
list = append(list, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return list, nil
|
return list, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
c, ok := r.containers[id]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("container %s does not exit", id)
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Runtime) Events(ctx context.Context) <-chan *plugin.Event {
|
func (r *Runtime) Events(ctx context.Context) <-chan *plugin.Event {
|
||||||
return r.events
|
return r.events
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user