Update GRPC for consistency
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
@@ -1,89 +0,0 @@
|
||||
package execution
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd/plugin"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func newCollector(ctx context.Context, runtimes map[string]plugin.Runtime) (*collector, error) {
|
||||
c := &collector{
|
||||
context: ctx,
|
||||
ch: make(chan *plugin.Event, 2048),
|
||||
eventClients: make(map[*eventClient]struct{}),
|
||||
}
|
||||
for _, r := range runtimes {
|
||||
if err := c.collect(r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// run the publisher
|
||||
go c.publisher()
|
||||
// run a goroutine that waits for the context to be done
|
||||
// and closes the channel after all writes have finished
|
||||
go c.waitDone()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
type eventClient struct {
|
||||
eCh chan error
|
||||
w *grpcEventWriter
|
||||
}
|
||||
|
||||
type collector struct {
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
|
||||
context context.Context
|
||||
ch chan *plugin.Event
|
||||
eventClients map[*eventClient]struct{}
|
||||
}
|
||||
|
||||
// collect collects events from the provided runtime
|
||||
func (c *collector) collect(r plugin.Runtime) error {
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
for e := range r.Events(c.context) {
|
||||
c.ch <- e
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *collector) forward(w *grpcEventWriter) error {
|
||||
client := &eventClient{
|
||||
w: w,
|
||||
eCh: make(chan error, 1),
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.eventClients[client] = struct{}{}
|
||||
c.mu.Unlock()
|
||||
err := <-client.eCh
|
||||
c.mu.Lock()
|
||||
delete(c.eventClients, client)
|
||||
c.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *collector) publisher() {
|
||||
for e := range c.ch {
|
||||
c.mu.Lock()
|
||||
for client := range c.eventClients {
|
||||
if err := client.w.Write(e); err != nil {
|
||||
client.eCh <- err
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// waitDone waits for the context to finish, waits for all the goroutines to finish
|
||||
// collecting grpc events from the shim, and closes the output channel
|
||||
func (c *collector) waitDone() {
|
||||
<-c.context.Done()
|
||||
c.wg.Wait()
|
||||
close(c.ch)
|
||||
}
|
||||
@@ -67,26 +67,20 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
||||
r := rr.(plugin.Runtime)
|
||||
runtimes[r.ID()] = r
|
||||
}
|
||||
c, err := newCollector(ic.Context, runtimes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e := events.GetPoster(ic.Context)
|
||||
return &Service{
|
||||
runtimes: runtimes,
|
||||
db: m.(*bolt.DB),
|
||||
collector: c,
|
||||
store: ct.(content.Store),
|
||||
emitter: e,
|
||||
runtimes: runtimes,
|
||||
db: m.(*bolt.DB),
|
||||
store: ct.(content.Store),
|
||||
emitter: e,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
runtimes map[string]plugin.Runtime
|
||||
db *bolt.DB
|
||||
collector *collector
|
||||
store content.Store
|
||||
emitter events.Poster
|
||||
runtimes map[string]plugin.Runtime
|
||||
db *bolt.DB
|
||||
store content.Store
|
||||
emitter events.Poster
|
||||
}
|
||||
|
||||
func (s *Service) Register(server *grpc.Server) error {
|
||||
@@ -94,7 +88,7 @@ func (s *Service) Register(server *grpc.Server) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
|
||||
func (s *Service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
|
||||
var (
|
||||
checkpointPath string
|
||||
err error
|
||||
@@ -152,7 +146,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
|
||||
Options: m.Options,
|
||||
})
|
||||
}
|
||||
runtime, err := s.getRuntime(container.Runtime)
|
||||
runtime, err := s.getRuntime(container.Runtime.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -171,13 +165,13 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &api.CreateResponse{
|
||||
return &api.CreateTaskResponse{
|
||||
ContainerID: r.ContainerID,
|
||||
Pid: state.Pid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
|
||||
func (s *Service) Start(ctx context.Context, r *api.StartTaskRequest) (*google_protobuf.Empty, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -195,7 +189,7 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) {
|
||||
func (s *Service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.DeleteResponse, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -271,7 +265,7 @@ func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) {
|
||||
func (s *Service) Get(ctx context.Context, r *api.GetTaskRequest) (*api.GetTaskResponse, error) {
|
||||
task, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -280,13 +274,13 @@ func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoRespon
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &api.InfoResponse{
|
||||
return &api.GetTaskResponse{
|
||||
Task: t,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
|
||||
resp := &api.ListResponse{}
|
||||
func (s *Service) List(ctx context.Context, r *api.ListTasksRequest) (*api.ListTasksResponse, error) {
|
||||
resp := &api.ListTasksResponse{}
|
||||
for _, r := range s.runtimes {
|
||||
tasks, err := r.Tasks(ctx)
|
||||
if err != nil {
|
||||
@@ -303,7 +297,7 @@ func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListRespon
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) {
|
||||
func (s *Service) Pause(ctx context.Context, r *api.PauseTaskRequest) (*google_protobuf.Empty, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -315,7 +309,7 @@ func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_proto
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) {
|
||||
func (s *Service) Resume(ctx context.Context, r *api.ResumeTaskRequest) (*google_protobuf.Empty, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -348,7 +342,7 @@ func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobu
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) {
|
||||
func (s *Service) ListProcesses(ctx context.Context, r *api.ListProcessesRequest) (*api.ListProcessesResponse, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -365,22 +359,12 @@ func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.
|
||||
Pid: pid,
|
||||
})
|
||||
}
|
||||
|
||||
resp := &api.ProcessesResponse{
|
||||
return &api.ListProcessesResponse{
|
||||
Processes: ps,
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) Events(r *api.EventsRequest, server api.Tasks_EventsServer) error {
|
||||
w := &grpcEventWriter{
|
||||
server: server,
|
||||
}
|
||||
return s.collector.forward(w)
|
||||
}
|
||||
|
||||
func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) {
|
||||
func (s *Service) Exec(ctx context.Context, r *api.ExecProcessRequest) (*api.ExecProcessResponse, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -401,12 +385,12 @@ func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecRespon
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &api.ExecResponse{
|
||||
return &api.ExecProcessResponse{
|
||||
Pid: state.Pid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) {
|
||||
func (s *Service) ResizePty(ctx context.Context, r *api.ResizePtyRequest) (*google_protobuf.Empty, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -420,18 +404,20 @@ func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) {
|
||||
func (s *Service) CloseIO(ctx context.Context, r *api.CloseIORequest) (*google_protobuf.Empty, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := t.CloseStdin(ctx, r.Pid); err != nil {
|
||||
return nil, err
|
||||
if r.Stdin {
|
||||
if err := t.CloseStdin(ctx, r.Pid); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) {
|
||||
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointTaskRequest) (*api.CheckpointTaskResponse, error) {
|
||||
t, err := s.getTask(ctx, r.ContainerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -441,16 +427,7 @@ func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*ap
|
||||
return nil, err
|
||||
}
|
||||
defer os.RemoveAll(image)
|
||||
if err := t.Checkpoint(ctx, plugin.CheckpointOpts{
|
||||
Exit: r.Exit,
|
||||
AllowTCP: r.AllowTcp,
|
||||
AllowTerminal: r.AllowTerminal,
|
||||
AllowUnixSockets: r.AllowUnixSockets,
|
||||
FileLocks: r.FileLocks,
|
||||
// ParentImage: r.ParentImage,
|
||||
EmptyNamespaces: r.EmptyNamespaces,
|
||||
Path: image,
|
||||
}); err != nil {
|
||||
if err := t.Checkpoint(ctx, image, r.Options); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// write checkpoint to the content store
|
||||
@@ -469,7 +446,7 @@ func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*ap
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &api.CheckpointResponse{
|
||||
return &api.CheckpointTaskResponse{
|
||||
Descriptors: []*descriptor.Descriptor{
|
||||
cp,
|
||||
specD,
|
||||
@@ -523,31 +500,3 @@ func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type grpcEventWriter struct {
|
||||
server api.Tasks_EventsServer
|
||||
}
|
||||
|
||||
func (g *grpcEventWriter) Write(e *plugin.Event) error {
|
||||
var t task.Event_EventType
|
||||
switch e.Type {
|
||||
case plugin.ExitEvent:
|
||||
t = task.Event_EXIT
|
||||
case plugin.ExecAddEvent:
|
||||
t = task.Event_EXEC_ADDED
|
||||
case plugin.PausedEvent:
|
||||
t = task.Event_PAUSED
|
||||
case plugin.CreateEvent:
|
||||
t = task.Event_CREATE
|
||||
case plugin.StartEvent:
|
||||
t = task.Event_START
|
||||
case plugin.OOMEvent:
|
||||
t = task.Event_OOM
|
||||
}
|
||||
return g.server.Send(&task.Event{
|
||||
Type: t,
|
||||
ID: e.ID,
|
||||
Pid: e.Pid,
|
||||
ExitStatus: e.ExitStatus,
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user