diff --git a/Protobuild.toml b/Protobuild.toml index e368eec77..e3327eb50 100644 --- a/Protobuild.toml +++ b/Protobuild.toml @@ -32,9 +32,13 @@ plugins = ["grpc", "fieldpath"] "google/rpc/status.proto" = "github.com/containerd/containerd/protobuf/google/rpc" [[overrides]] - prefixes = ["github.com/containerd/containerd/api/events"] -plugins = ["fieldpath"] +plugins = ["fieldpath"] # disable grpc for this package + +[[overrides]] +# enable ttrpc and disable fieldpath and grpc for the shim +prefixes = ["github.com/containerd/containerd/linux/shim/v1"] +plugins = ["ttrpc"] # Aggregrate the API descriptors to lock down API changes. [[descriptors]] diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 31164f7dd..2c6d0ff0a 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -25,8 +25,8 @@ import ( "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" "github.com/sirupsen/logrus" + "github.com/stevvooe/ttrpc" "golang.org/x/sys/unix" - "google.golang.org/grpc" ) var ( @@ -100,7 +100,8 @@ func executeShim() error { return err } logrus.Debug("registering grpc server") - shimapi.RegisterShimServer(server, sv) + shimapi.RegisterShimService(server, sv) + socket := socketFlag if err := serve(server, socket); err != nil { return err @@ -115,7 +116,7 @@ func executeShim() error { // serve serves the grpc API over a unix socket at the provided path // this function does not block -func serve(server *grpc.Server, path string) error { +func serve(server *ttrpc.Server, path string) error { var ( l net.Listener err error @@ -140,7 +141,7 @@ func serve(server *grpc.Server, path string) error { return nil } -func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *grpc.Server, sv *shim.Service) error { +func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.Server, sv *shim.Service) error { var ( termOnce sync.Once done = make(chan struct{}) @@ -158,9 +159,12 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *grpc.Se } case unix.SIGTERM, unix.SIGINT: go termOnce.Do(func() { - server.Stop() + ctx := context.TODO() + if err := server.Shutdown(ctx); err != nil { + logger.WithError(err).Error("failed to shutdown server") + } // Ensure our child is dead if any - sv.Kill(context.Background(), &shimapi.KillRequest{ + sv.Kill(ctx, &shimapi.KillRequest{ Signal: uint32(syscall.SIGKILL), All: true, }) diff --git a/cmd/containerd-shim/shim_linux.go b/cmd/containerd-shim/shim_linux.go index 0b5c9a384..140795ede 100644 --- a/cmd/containerd-shim/shim_linux.go +++ b/cmd/containerd-shim/shim_linux.go @@ -1,20 +1,13 @@ package main import ( - "net" "os" "os/signal" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - "golang.org/x/net/context" - "golang.org/x/sys/unix" - "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/sys" runc "github.com/containerd/go-runc" - "github.com/pkg/errors" + "github.com/stevvooe/ttrpc" ) // setupSignals creates a new signal handler for all signals and sets the shim as a @@ -32,64 +25,6 @@ func setupSignals() (chan os.Signal, error) { return signals, nil } -func newServer() *grpc.Server { - return grpc.NewServer(grpc.Creds(NewUnixSocketCredentials(0, 0))) -} - -type unixSocketCredentials struct { - uid int - gid int - serverName string -} - -// NewUnixSocketCredentials returns TransportCredentials for a local unix socket -func NewUnixSocketCredentials(uid, gid int) credentials.TransportCredentials { - return &unixSocketCredentials{uid, gid, "locahost"} -} - -func (u *unixSocketCredentials) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { - return nil, nil, errors.New("ClientHandshake is not supported by unixSocketCredentials") -} - -func (u *unixSocketCredentials) ServerHandshake(c net.Conn) (net.Conn, credentials.AuthInfo, error) { - uc, ok := c.(*net.UnixConn) - if !ok { - return nil, nil, errors.New("unixSocketCredentials only supports unix socket") - } - - f, err := uc.File() - if err != nil { - return nil, nil, errors.Wrap(err, "unixSocketCredentials: failed to retrieve connection underlying fd") - } - pcred, err := unix.GetsockoptUcred(int(f.Fd()), unix.SOL_SOCKET, unix.SO_PEERCRED) - if err != nil { - return nil, nil, errors.Wrap(err, "unixSocketCredentials: failed to retrieve socket peer credentials") - } - - if (u.uid != -1 && uint32(u.uid) != pcred.Uid) || (u.gid != -1 && uint32(u.gid) != pcred.Gid) { - return nil, nil, errors.New("unixSocketCredentials: invalid credentials") - } - - return c, u, nil -} - -func (u *unixSocketCredentials) Info() credentials.ProtocolInfo { - return credentials.ProtocolInfo{ - SecurityProtocol: "unix-socket-peer-creds", - SecurityVersion: "1.0", - ServerName: u.serverName, - } -} - -func (u *unixSocketCredentials) Clone() credentials.TransportCredentials { - return &unixSocketCredentials{u.uid, u.gid, u.serverName} -} - -func (u *unixSocketCredentials) OverrideServerName(serverName string) error { - u.serverName = serverName - return nil -} - -func (u *unixSocketCredentials) AuthType() string { - return "unix-socket-peer-creds" +func newServer() *ttrpc.Server { + return ttrpc.NewServer() } diff --git a/cmd/containerd-shim/shim_unix.go b/cmd/containerd-shim/shim_unix.go index b6bf2a6a0..b7aa13e8f 100644 --- a/cmd/containerd-shim/shim_unix.go +++ b/cmd/containerd-shim/shim_unix.go @@ -6,10 +6,9 @@ import ( "os" "os/signal" - "google.golang.org/grpc" - "github.com/containerd/containerd/reaper" runc "github.com/containerd/go-runc" + "github.com/stevvooe/ttrpc" ) // setupSignals creates a new signal handler for all signals and sets the shim as a @@ -23,6 +22,6 @@ func setupSignals() (chan os.Signal, error) { return signals, nil } -func newServer() *grpc.Server { - return grpc.NewServer() +func newServer() *ttrpc.Server { + return ttrpc.NewServer() } diff --git a/cmd/ctr/commands/shim/shim.go b/cmd/ctr/commands/shim/shim.go index e0413c10b..240a20e74 100644 --- a/cmd/ctr/commands/shim/shim.go +++ b/cmd/ctr/commands/shim/shim.go @@ -6,12 +6,9 @@ import ( "fmt" "io/ioutil" "net" - "time" gocontext "context" - "google.golang.org/grpc" - "github.com/containerd/console" "github.com/containerd/containerd/cmd/ctr/commands" shim "github.com/containerd/containerd/linux/shim/v1" @@ -20,6 +17,7 @@ import ( "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/stevvooe/ttrpc" "github.com/urfave/cli" ) @@ -212,21 +210,21 @@ var execCommand = cli.Command{ }, } -func getShimService(context *cli.Context) (shim.ShimClient, error) { +func getShimService(context *cli.Context) (shim.ShimService, error) { bindSocket := context.GlobalString("socket") if bindSocket == "" { return nil, errors.New("socket path must be specified") } - dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)} - dialOpts = append(dialOpts, - grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("unix", "\x00"+bindSocket, timeout) - }, - )) - conn, err := grpc.Dial(fmt.Sprintf("unix://%s", bindSocket), dialOpts...) + conn, err := net.Dial("unix", "\x00"+bindSocket) if err != nil { return nil, err } - return shim.NewShimClient(conn), nil + + client := ttrpc.NewClient(conn) + + // TODO(stevvooe): This actually leaks the connection. We were leaking it + // before, so may not be a huge deal. + + return shim.NewShimClient(client), nil } diff --git a/cmd/protoc-gen-gogoctrd/main.go b/cmd/protoc-gen-gogoctrd/main.go index e55114cc3..708289d18 100644 --- a/cmd/protoc-gen-gogoctrd/main.go +++ b/cmd/protoc-gen-gogoctrd/main.go @@ -5,6 +5,7 @@ import ( "github.com/gogo/protobuf/protoc-gen-gogo/descriptor" "github.com/gogo/protobuf/vanity" "github.com/gogo/protobuf/vanity/command" + _ "github.com/stevvooe/ttrpc/plugin" ) func main() { diff --git a/linux/shim/client/client.go b/linux/shim/client/client.go index 7525dada7..db59e2cee 100644 --- a/linux/shim/client/client.go +++ b/linux/shim/client/client.go @@ -4,7 +4,6 @@ package client import ( "context" - "fmt" "io" "net" "os" @@ -18,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/stevvooe/ttrpc" "github.com/containerd/containerd/events" "github.com/containerd/containerd/linux/shim" @@ -26,17 +26,16 @@ import ( "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/sys" ptypes "github.com/gogo/protobuf/types" - "google.golang.org/grpc" ) var empty = &ptypes.Empty{} // Opt is an option for a shim client configuration -type Opt func(context.Context, shim.Config) (shimapi.ShimClient, io.Closer, error) +type Opt func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) // WithStart executes a new shim process func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) Opt { - return func(ctx context.Context, config shim.Config) (_ shimapi.ShimClient, _ io.Closer, err error) { + return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) { socket, err := newSocket(address) if err != nil { return nil, nil, err @@ -139,19 +138,8 @@ func newSocket(address string) (*net.UnixListener, error) { return l.(*net.UnixListener), nil } -func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { - gopts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithInsecure(), - grpc.WithTimeout(100 * time.Second), - grpc.WithDialer(d), - grpc.FailOnNonTempDialError(true), - } - conn, err := grpc.Dial(dialAddress(address), gopts...) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial %q", address) - } - return conn, nil +func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { + return d(address, 100*time.Second) } func annonDialer(address string, timeout time.Duration) (net.Conn, error) { @@ -159,24 +147,20 @@ func annonDialer(address string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", "\x00"+address, timeout) } -func dialAddress(address string) string { - return fmt.Sprintf("unix://%s", address) -} - // WithConnect connects to an existing shim func WithConnect(address string) Opt { - return func(ctx context.Context, config shim.Config) (shimapi.ShimClient, io.Closer, error) { + return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) { conn, err := connect(address, annonDialer) if err != nil { return nil, nil, err } - return shimapi.NewShimClient(conn), conn, nil + return shimapi.NewShimClient(ttrpc.NewClient(conn)), conn, nil } } // WithLocal uses an in process shim -func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimClient, io.Closer, error) { - return func(ctx context.Context, config shim.Config) (shimapi.ShimClient, io.Closer, error) { +func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) { + return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) { service, err := shim.NewService(config, publisher) if err != nil { return nil, nil, err @@ -192,15 +176,15 @@ func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) { return nil, err } return &Client{ - ShimClient: s, - c: c, - exitCh: make(chan struct{}), + ShimService: s, + c: c, + exitCh: make(chan struct{}), }, nil } // Client is a shim client containing the connection to a shim type Client struct { - shimapi.ShimClient + shimapi.ShimService c io.Closer exitCh chan struct{} @@ -212,10 +196,9 @@ type Client struct { func (c *Client) IsAlive(ctx context.Context) (bool, error) { _, err := c.ShimInfo(ctx, empty) if err != nil { - if err != grpc.ErrServerStopped { - return false, err - } - return false, nil + // TODO(stevvooe): There are some error conditions that need to be + // handle with unix sockets existence to give the right answer here. + return false, err } return true, nil } diff --git a/linux/shim/local.go b/linux/shim/local.go index bb856fd90..42649771d 100644 --- a/linux/shim/local.go +++ b/linux/shim/local.go @@ -3,17 +3,16 @@ package shim import ( + "context" "path/filepath" shimapi "github.com/containerd/containerd/linux/shim/v1" ptypes "github.com/gogo/protobuf/types" - "golang.org/x/net/context" "golang.org/x/sys/unix" - "google.golang.org/grpc" ) // NewLocal returns a shim client implementation for issue commands to a shim -func NewLocal(s *Service) shimapi.ShimClient { +func NewLocal(s *Service) shimapi.ShimService { return &local{ s: s, } @@ -23,15 +22,15 @@ type local struct { s *Service } -func (c *local) Create(ctx context.Context, in *shimapi.CreateTaskRequest, opts ...grpc.CallOption) (*shimapi.CreateTaskResponse, error) { +func (c *local) Create(ctx context.Context, in *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) { return c.s.Create(ctx, in) } -func (c *local) Start(ctx context.Context, in *shimapi.StartRequest, opts ...grpc.CallOption) (*shimapi.StartResponse, error) { +func (c *local) Start(ctx context.Context, in *shimapi.StartRequest) (*shimapi.StartResponse, error) { return c.s.Start(ctx, in) } -func (c *local) Delete(ctx context.Context, in *ptypes.Empty, opts ...grpc.CallOption) (*shimapi.DeleteResponse, error) { +func (c *local) Delete(ctx context.Context, in *ptypes.Empty) (*shimapi.DeleteResponse, error) { // make sure we unmount the containers rootfs for this local if err := unix.Unmount(filepath.Join(c.s.config.Path, "rootfs"), 0); err != nil { return nil, err @@ -39,54 +38,54 @@ func (c *local) Delete(ctx context.Context, in *ptypes.Empty, opts ...grpc.CallO return c.s.Delete(ctx, in) } -func (c *local) DeleteProcess(ctx context.Context, in *shimapi.DeleteProcessRequest, opts ...grpc.CallOption) (*shimapi.DeleteResponse, error) { +func (c *local) DeleteProcess(ctx context.Context, in *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) { return c.s.DeleteProcess(ctx, in) } -func (c *local) Exec(ctx context.Context, in *shimapi.ExecProcessRequest, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) Exec(ctx context.Context, in *shimapi.ExecProcessRequest) (*ptypes.Empty, error) { return c.s.Exec(ctx, in) } -func (c *local) ResizePty(ctx context.Context, in *shimapi.ResizePtyRequest, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) ResizePty(ctx context.Context, in *shimapi.ResizePtyRequest) (*ptypes.Empty, error) { return c.s.ResizePty(ctx, in) } -func (c *local) State(ctx context.Context, in *shimapi.StateRequest, opts ...grpc.CallOption) (*shimapi.StateResponse, error) { +func (c *local) State(ctx context.Context, in *shimapi.StateRequest) (*shimapi.StateResponse, error) { return c.s.State(ctx, in) } -func (c *local) Pause(ctx context.Context, in *ptypes.Empty, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) Pause(ctx context.Context, in *ptypes.Empty) (*ptypes.Empty, error) { return c.s.Pause(ctx, in) } -func (c *local) Resume(ctx context.Context, in *ptypes.Empty, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) Resume(ctx context.Context, in *ptypes.Empty) (*ptypes.Empty, error) { return c.s.Resume(ctx, in) } -func (c *local) Kill(ctx context.Context, in *shimapi.KillRequest, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) Kill(ctx context.Context, in *shimapi.KillRequest) (*ptypes.Empty, error) { return c.s.Kill(ctx, in) } -func (c *local) ListPids(ctx context.Context, in *shimapi.ListPidsRequest, opts ...grpc.CallOption) (*shimapi.ListPidsResponse, error) { +func (c *local) ListPids(ctx context.Context, in *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) { return c.s.ListPids(ctx, in) } -func (c *local) CloseIO(ctx context.Context, in *shimapi.CloseIORequest, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) CloseIO(ctx context.Context, in *shimapi.CloseIORequest) (*ptypes.Empty, error) { return c.s.CloseIO(ctx, in) } -func (c *local) Checkpoint(ctx context.Context, in *shimapi.CheckpointTaskRequest, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) Checkpoint(ctx context.Context, in *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) { return c.s.Checkpoint(ctx, in) } -func (c *local) ShimInfo(ctx context.Context, in *ptypes.Empty, opts ...grpc.CallOption) (*shimapi.ShimInfoResponse, error) { +func (c *local) ShimInfo(ctx context.Context, in *ptypes.Empty) (*shimapi.ShimInfoResponse, error) { return c.s.ShimInfo(ctx, in) } -func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest, opts ...grpc.CallOption) (*ptypes.Empty, error) { +func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) { return c.s.Update(ctx, in) } -func (c *local) Wait(ctx context.Context, in *shimapi.WaitRequest, opts ...grpc.CallOption) (*shimapi.WaitResponse, error) { +func (c *local) Wait(ctx context.Context, in *shimapi.WaitRequest) (*shimapi.WaitResponse, error) { return c.s.Wait(ctx, in) } diff --git a/linux/shim/service.go b/linux/shim/service.go index c2a0aff23..5568621a1 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -3,13 +3,11 @@ package shim import ( + "context" "fmt" "os" "sync" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "github.com/containerd/console" eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" @@ -27,7 +25,8 @@ import ( ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var empty = &ptypes.Empty{} @@ -167,7 +166,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq s.mu.Lock() defer s.mu.Unlock() if r.ID == s.id { - return nil, grpc.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess") + return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess") } p := s.processes[r.ID] if p == nil { diff --git a/linux/shim/v1/shim.pb.go b/linux/shim/v1/shim.pb.go index 8d714862d..fd4e32e88 100644 --- a/linux/shim/v1/shim.pb.go +++ b/linux/shim/v1/shim.pb.go @@ -44,16 +44,14 @@ import containerd_v1_types "github.com/containerd/containerd/api/types/task" import time "time" -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) - import github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" import strings "strings" import reflect "reflect" +import context "context" +import github_com_stevvooe_ttrpc "github.com/stevvooe/ttrpc" + import io "io" // Reference imports to suppress errors if they are not otherwise used. @@ -283,578 +281,6 @@ func init() { proto.RegisterType((*WaitRequest)(nil), "containerd.runtime.linux.shim.v1.WaitRequest") proto.RegisterType((*WaitResponse)(nil), "containerd.runtime.linux.shim.v1.WaitResponse") } - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// Client API for Shim service - -type ShimClient interface { - // State returns shim and task state information. - State(ctx context.Context, in *StateRequest, opts ...grpc.CallOption) (*StateResponse, error) - Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) - Start(ctx context.Context, in *StartRequest, opts ...grpc.CallOption) (*StartResponse, error) - Delete(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*DeleteResponse, error) - DeleteProcess(ctx context.Context, in *DeleteProcessRequest, opts ...grpc.CallOption) (*DeleteResponse, error) - ListPids(ctx context.Context, in *ListPidsRequest, opts ...grpc.CallOption) (*ListPidsResponse, error) - Pause(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - Resume(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - Checkpoint(ctx context.Context, in *CheckpointTaskRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - Kill(ctx context.Context, in *KillRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - Exec(ctx context.Context, in *ExecProcessRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - ResizePty(ctx context.Context, in *ResizePtyRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - CloseIO(ctx context.Context, in *CloseIORequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - // ShimInfo returns information about the shim. - ShimInfo(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*ShimInfoResponse, error) - Update(ctx context.Context, in *UpdateTaskRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) - Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) -} - -type shimClient struct { - cc *grpc.ClientConn -} - -func NewShimClient(cc *grpc.ClientConn) ShimClient { - return &shimClient{cc} -} - -func (c *shimClient) State(ctx context.Context, in *StateRequest, opts ...grpc.CallOption) (*StateResponse, error) { - out := new(StateResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/State", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) { - out := new(CreateTaskResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Create", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Start(ctx context.Context, in *StartRequest, opts ...grpc.CallOption) (*StartResponse, error) { - out := new(StartResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Start", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Delete(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*DeleteResponse, error) { - out := new(DeleteResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Delete", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) DeleteProcess(ctx context.Context, in *DeleteProcessRequest, opts ...grpc.CallOption) (*DeleteResponse, error) { - out := new(DeleteResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/DeleteProcess", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) ListPids(ctx context.Context, in *ListPidsRequest, opts ...grpc.CallOption) (*ListPidsResponse, error) { - out := new(ListPidsResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/ListPids", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Pause(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Pause", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Resume(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Resume", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Checkpoint(ctx context.Context, in *CheckpointTaskRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Checkpoint", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Kill(ctx context.Context, in *KillRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Kill", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Exec(ctx context.Context, in *ExecProcessRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Exec", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) ResizePty(ctx context.Context, in *ResizePtyRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/ResizePty", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) CloseIO(ctx context.Context, in *CloseIORequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/CloseIO", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) ShimInfo(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*ShimInfoResponse, error) { - out := new(ShimInfoResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/ShimInfo", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Update(ctx context.Context, in *UpdateTaskRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error) { - out := new(google_protobuf1.Empty) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Update", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *shimClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) { - out := new(WaitResponse) - err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Wait", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// Server API for Shim service - -type ShimServer interface { - // State returns shim and task state information. - State(context.Context, *StateRequest) (*StateResponse, error) - Create(context.Context, *CreateTaskRequest) (*CreateTaskResponse, error) - Start(context.Context, *StartRequest) (*StartResponse, error) - Delete(context.Context, *google_protobuf1.Empty) (*DeleteResponse, error) - DeleteProcess(context.Context, *DeleteProcessRequest) (*DeleteResponse, error) - ListPids(context.Context, *ListPidsRequest) (*ListPidsResponse, error) - Pause(context.Context, *google_protobuf1.Empty) (*google_protobuf1.Empty, error) - Resume(context.Context, *google_protobuf1.Empty) (*google_protobuf1.Empty, error) - Checkpoint(context.Context, *CheckpointTaskRequest) (*google_protobuf1.Empty, error) - Kill(context.Context, *KillRequest) (*google_protobuf1.Empty, error) - Exec(context.Context, *ExecProcessRequest) (*google_protobuf1.Empty, error) - ResizePty(context.Context, *ResizePtyRequest) (*google_protobuf1.Empty, error) - CloseIO(context.Context, *CloseIORequest) (*google_protobuf1.Empty, error) - // ShimInfo returns information about the shim. - ShimInfo(context.Context, *google_protobuf1.Empty) (*ShimInfoResponse, error) - Update(context.Context, *UpdateTaskRequest) (*google_protobuf1.Empty, error) - Wait(context.Context, *WaitRequest) (*WaitResponse, error) -} - -func RegisterShimServer(s *grpc.Server, srv ShimServer) { - s.RegisterService(&_Shim_serviceDesc, srv) -} - -func _Shim_State_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StateRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).State(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/State", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).State(ctx, req.(*StateRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(CreateTaskRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Create(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Create", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Create(ctx, req.(*CreateTaskRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Start_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StartRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Start(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Start", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Start(ctx, req.(*StartRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(google_protobuf1.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Delete(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Delete", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Delete(ctx, req.(*google_protobuf1.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_DeleteProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DeleteProcessRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).DeleteProcess(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/DeleteProcess", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).DeleteProcess(ctx, req.(*DeleteProcessRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_ListPids_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ListPidsRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).ListPids(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/ListPids", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).ListPids(ctx, req.(*ListPidsRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Pause_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(google_protobuf1.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Pause(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Pause", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Pause(ctx, req.(*google_protobuf1.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Resume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(google_protobuf1.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Resume(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Resume", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Resume(ctx, req.(*google_protobuf1.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Checkpoint_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(CheckpointTaskRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Checkpoint(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Checkpoint", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Checkpoint(ctx, req.(*CheckpointTaskRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Kill_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(KillRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Kill(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Kill", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Kill(ctx, req.(*KillRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Exec_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ExecProcessRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Exec(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Exec", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Exec(ctx, req.(*ExecProcessRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_ResizePty_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ResizePtyRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).ResizePty(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/ResizePty", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).ResizePty(ctx, req.(*ResizePtyRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_CloseIO_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(CloseIORequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).CloseIO(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/CloseIO", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).CloseIO(ctx, req.(*CloseIORequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_ShimInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(google_protobuf1.Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).ShimInfo(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/ShimInfo", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).ShimInfo(ctx, req.(*google_protobuf1.Empty)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(UpdateTaskRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Update(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Update", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Update(ctx, req.(*UpdateTaskRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Shim_Wait_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(WaitRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ShimServer).Wait(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Wait", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShimServer).Wait(ctx, req.(*WaitRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Shim_serviceDesc = grpc.ServiceDesc{ - ServiceName: "containerd.runtime.linux.shim.v1.Shim", - HandlerType: (*ShimServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "State", - Handler: _Shim_State_Handler, - }, - { - MethodName: "Create", - Handler: _Shim_Create_Handler, - }, - { - MethodName: "Start", - Handler: _Shim_Start_Handler, - }, - { - MethodName: "Delete", - Handler: _Shim_Delete_Handler, - }, - { - MethodName: "DeleteProcess", - Handler: _Shim_DeleteProcess_Handler, - }, - { - MethodName: "ListPids", - Handler: _Shim_ListPids_Handler, - }, - { - MethodName: "Pause", - Handler: _Shim_Pause_Handler, - }, - { - MethodName: "Resume", - Handler: _Shim_Resume_Handler, - }, - { - MethodName: "Checkpoint", - Handler: _Shim_Checkpoint_Handler, - }, - { - MethodName: "Kill", - Handler: _Shim_Kill_Handler, - }, - { - MethodName: "Exec", - Handler: _Shim_Exec_Handler, - }, - { - MethodName: "ResizePty", - Handler: _Shim_ResizePty_Handler, - }, - { - MethodName: "CloseIO", - Handler: _Shim_CloseIO_Handler, - }, - { - MethodName: "ShimInfo", - Handler: _Shim_ShimInfo_Handler, - }, - { - MethodName: "Update", - Handler: _Shim_Update_Handler, - }, - { - MethodName: "Wait", - Handler: _Shim_Wait_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "github.com/containerd/containerd/linux/shim/v1/shim.proto", -} - func (m *CreateTaskRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -2151,6 +1577,280 @@ func valueToStringShim(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } + +type ShimService interface { + State(ctx context.Context, req *StateRequest) (*StateResponse, error) + Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) + Start(ctx context.Context, req *StartRequest) (*StartResponse, error) + Delete(ctx context.Context, req *google_protobuf1.Empty) (*DeleteResponse, error) + DeleteProcess(ctx context.Context, req *DeleteProcessRequest) (*DeleteResponse, error) + ListPids(ctx context.Context, req *ListPidsRequest) (*ListPidsResponse, error) + Pause(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error) + Resume(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error) + Checkpoint(ctx context.Context, req *CheckpointTaskRequest) (*google_protobuf1.Empty, error) + Kill(ctx context.Context, req *KillRequest) (*google_protobuf1.Empty, error) + Exec(ctx context.Context, req *ExecProcessRequest) (*google_protobuf1.Empty, error) + ResizePty(ctx context.Context, req *ResizePtyRequest) (*google_protobuf1.Empty, error) + CloseIO(ctx context.Context, req *CloseIORequest) (*google_protobuf1.Empty, error) + ShimInfo(ctx context.Context, req *google_protobuf1.Empty) (*ShimInfoResponse, error) + Update(ctx context.Context, req *UpdateTaskRequest) (*google_protobuf1.Empty, error) + Wait(ctx context.Context, req *WaitRequest) (*WaitResponse, error) +} + +func RegisterShimService(srv *github_com_stevvooe_ttrpc.Server, svc ShimService) { + srv.Register("containerd.runtime.linux.shim.v1.Shim", map[string]github_com_stevvooe_ttrpc.Method{ + "State": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req StateRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.State(ctx, &req) + }, + "Create": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req CreateTaskRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Create(ctx, &req) + }, + "Start": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req StartRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Start(ctx, &req) + }, + "Delete": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req google_protobuf1.Empty + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Delete(ctx, &req) + }, + "DeleteProcess": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req DeleteProcessRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.DeleteProcess(ctx, &req) + }, + "ListPids": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ListPidsRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.ListPids(ctx, &req) + }, + "Pause": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req google_protobuf1.Empty + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Pause(ctx, &req) + }, + "Resume": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req google_protobuf1.Empty + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Resume(ctx, &req) + }, + "Checkpoint": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req CheckpointTaskRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Checkpoint(ctx, &req) + }, + "Kill": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req KillRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Kill(ctx, &req) + }, + "Exec": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ExecProcessRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Exec(ctx, &req) + }, + "ResizePty": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ResizePtyRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.ResizePty(ctx, &req) + }, + "CloseIO": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req CloseIORequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.CloseIO(ctx, &req) + }, + "ShimInfo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req google_protobuf1.Empty + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.ShimInfo(ctx, &req) + }, + "Update": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req UpdateTaskRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Update(ctx, &req) + }, + "Wait": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req WaitRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Wait(ctx, &req) + }, + }) +} + +type shimClient struct { + client *github_com_stevvooe_ttrpc.Client +} + +func NewShimClient(client *github_com_stevvooe_ttrpc.Client) ShimService { + return &shimClient{ + client: client, + } +} + +func (c *shimClient) State(ctx context.Context, req *StateRequest) (*StateResponse, error) { + var resp StateResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "State", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) { + var resp CreateTaskResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Create", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Start(ctx context.Context, req *StartRequest) (*StartResponse, error) { + var resp StartResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Start", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Delete(ctx context.Context, req *google_protobuf1.Empty) (*DeleteResponse, error) { + var resp DeleteResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Delete", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) DeleteProcess(ctx context.Context, req *DeleteProcessRequest) (*DeleteResponse, error) { + var resp DeleteResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "DeleteProcess", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) ListPids(ctx context.Context, req *ListPidsRequest) (*ListPidsResponse, error) { + var resp ListPidsResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "ListPids", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Pause(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Pause", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Resume(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Resume", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Checkpoint(ctx context.Context, req *CheckpointTaskRequest) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Checkpoint", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Kill(ctx context.Context, req *KillRequest) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Kill", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Exec(ctx context.Context, req *ExecProcessRequest) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Exec", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) ResizePty(ctx context.Context, req *ResizePtyRequest) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "ResizePty", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) CloseIO(ctx context.Context, req *CloseIORequest) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "CloseIO", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) ShimInfo(ctx context.Context, req *google_protobuf1.Empty) (*ShimInfoResponse, error) { + var resp ShimInfoResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "ShimInfo", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Update(ctx context.Context, req *UpdateTaskRequest) (*google_protobuf1.Empty, error) { + var resp google_protobuf1.Empty + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Update", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) Wait(ctx context.Context, req *WaitRequest) (*WaitResponse, error) { + var resp WaitResponse + if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Wait", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} func (m *CreateTaskRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/vendor.conf b/vendor.conf index 6b8144421..29f3c8828 100644 --- a/vendor.conf +++ b/vendor.conf @@ -41,3 +41,4 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4 github.com/dmcgowan/go-tar 2e2c51242e8993c50445dab7c03c8e7febddd0cf +github.com/stevvooe/ttrpc bdb2ab7a8169e485e39421e666e15a505e575fd2 diff --git a/vendor/github.com/stevvooe/ttrpc/LICENSE b/vendor/github.com/stevvooe/ttrpc/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/stevvooe/ttrpc/README.md b/vendor/github.com/stevvooe/ttrpc/README.md new file mode 100644 index 000000000..6e2159a84 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/README.md @@ -0,0 +1,50 @@ +# ttrpc + +GRPC for low-memory environments. + +The existing grpc-go project requires a lot of memory overhead for importing +packages and at runtime. While this is great for many services with low density +requirements, this can be a problem when running a large number of services on +a single machine or on a machine with a small amount of memory. + +Using the same GRPC definitions, this project reduces the binary size and +protocol overhead required. We do this by eliding the `net/http`, `net/http2` +and `grpc` package used by grpc replacing it with a lightweight framing +protocol. The result are smaller binaries that use less resident memory with +the same ease of use as GRPC. + +Please note that while this project supports generating either end of the +protocol, the generated service definitions will be incompatible with regular +GRPC services, as they do not speak the same protocol. + +# Usage + +Create a gogo vanity binary (see +[`cmd/protoc-gen-gogottrpc/main.go`](cmd/protoc-gen-gogottrpc/main.go) for an +example with the ttrpc plugin enabled. + +It's recommended to use [`protobuild`](https://github.com/stevvooe/protobuild) +to build the protobufs for this project, but this will work with protoc +directly, if required. + +# Differences from GRPC + +- The protocol stack has been replaced with a lighter protocol that doesn't + require http, http2 and tls. +- The client and server interface are identical whereas in GRPC there is a + client and server interface that are different. +- The Go stdlib context package is used instead. +- No support for streams yet. + +# Status + +Very new. YMMV. + +TODO: + +- [X] Plumb error codes and GRPC status +- [X] Remove use of any type and dependency on typeurl package +- [X] Ensure that protocol can support streaming in the future +- [ ] Document protocol layout +- [ ] Add testing under concurrent load to ensure +- [ ] Verify connection error handling diff --git a/vendor/github.com/stevvooe/ttrpc/channel.go b/vendor/github.com/stevvooe/ttrpc/channel.go new file mode 100644 index 000000000..a71260bcc --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/channel.go @@ -0,0 +1,99 @@ +package ttrpc + +import ( + "bufio" + "context" + "encoding/binary" + "io" + + "github.com/pkg/errors" +) + +const ( + messageHeaderLength = 10 + messageLengthMax = 8 << 10 +) + +type messageType uint8 + +const ( + messageTypeRequest messageType = 0x1 + messageTypeResponse messageType = 0x2 +) + +// messageHeader represents the fixed-length message header of 10 bytes sent +// with every request. +type messageHeader struct { + Length uint32 // length excluding this header. b[:4] + StreamID uint32 // identifies which request stream message is a part of. b[4:8] + Type messageType // message type b[8] + Flags uint8 // reserved b[9] +} + +func readMessageHeader(p []byte, r io.Reader) (messageHeader, error) { + _, err := io.ReadFull(r, p[:messageHeaderLength]) + if err != nil { + return messageHeader{}, err + } + + return messageHeader{ + Length: binary.BigEndian.Uint32(p[:4]), + StreamID: binary.BigEndian.Uint32(p[4:8]), + Type: messageType(p[8]), + Flags: p[9], + }, nil +} + +func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error { + binary.BigEndian.PutUint32(p[:4], mh.Length) + binary.BigEndian.PutUint32(p[4:8], mh.StreamID) + p[8] = byte(mh.Type) + p[9] = mh.Flags + + _, err := w.Write(p[:]) + return err +} + +type channel struct { + bw *bufio.Writer + br *bufio.Reader + hrbuf [messageHeaderLength]byte // avoid alloc when reading header + hwbuf [messageHeaderLength]byte +} + +func newChannel(w io.Writer, r io.Reader) *channel { + return &channel{ + bw: bufio.NewWriter(w), + br: bufio.NewReader(r), + } +} + +func (ch *channel) recv(ctx context.Context, p []byte) (messageHeader, error) { + mh, err := readMessageHeader(ch.hrbuf[:], ch.br) + if err != nil { + return messageHeader{}, err + } + + if mh.Length > uint32(len(p)) { + return messageHeader{}, errors.Wrapf(io.ErrShortBuffer, "message length %v over buffer size %v", mh.Length, len(p)) + } + + if _, err := io.ReadFull(ch.br, p[:mh.Length]); err != nil { + return messageHeader{}, errors.Wrapf(err, "failed reading message") + } + + return mh, nil +} + +func (ch *channel) send(ctx context.Context, streamID uint32, t messageType, p []byte) error { + if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t}); err != nil { + return err + } + + _, err := ch.bw.Write(p) + if err != nil { + return err + } + + return ch.bw.Flush() +} diff --git a/vendor/github.com/stevvooe/ttrpc/client.go b/vendor/github.com/stevvooe/ttrpc/client.go new file mode 100644 index 000000000..9dbb3d3b4 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/client.go @@ -0,0 +1,204 @@ +package ttrpc + +import ( + "context" + "net" + "sync" + "sync/atomic" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + "google.golang.org/grpc/status" +) + +type Client struct { + codec codec + channel *channel + requestID uint32 + sendRequests chan sendRequest + recvRequests chan recvRequest + + closed chan struct{} + closeOnce sync.Once + done chan struct{} + err error +} + +func NewClient(conn net.Conn) *Client { + c := &Client{ + codec: codec{}, + requestID: 1, + channel: newChannel(conn, conn), + sendRequests: make(chan sendRequest), + recvRequests: make(chan recvRequest), + closed: make(chan struct{}), + done: make(chan struct{}), + } + + go c.run() + return c +} + +func (c *Client) Call(ctx context.Context, service, method string, req, resp interface{}) error { + payload, err := c.codec.Marshal(req) + if err != nil { + return err + } + + requestID := atomic.AddUint32(&c.requestID, 2) + request := Request{ + Service: service, + Method: method, + Payload: payload, + } + + if err := c.send(ctx, requestID, &request); err != nil { + return err + } + + var response Response + if err := c.recv(ctx, requestID, &response); err != nil { + return err + } + + if err := c.codec.Unmarshal(response.Payload, resp); err != nil { + return err + } + + if response.Status == nil { + return errors.New("no status provided on response") + } + + return status.ErrorProto(response.Status) +} + +func (c *Client) Close() error { + c.closeOnce.Do(func() { + close(c.closed) + }) + + return nil +} + +type sendRequest struct { + ctx context.Context + id uint32 + msg interface{} + err chan error +} + +func (c *Client) send(ctx context.Context, id uint32, msg interface{}) error { + errs := make(chan error, 1) + select { + case c.sendRequests <- sendRequest{ + ctx: ctx, + id: id, + msg: msg, + err: errs, + }: + case <-ctx.Done(): + return ctx.Err() + case <-c.done: + return c.err + } + + select { + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + case <-c.done: + return c.err + } +} + +type recvRequest struct { + id uint32 + msg interface{} + err chan error +} + +func (c *Client) recv(ctx context.Context, id uint32, msg interface{}) error { + errs := make(chan error, 1) + select { + case c.recvRequests <- recvRequest{ + id: id, + msg: msg, + err: errs, + }: + case <-c.done: + return c.err + case <-ctx.Done(): + return ctx.Err() + } + + select { + case err := <-errs: + return err + case <-c.done: + return c.err + case <-ctx.Done(): + return ctx.Err() + } +} + +type received struct { + mh messageHeader + p []byte + err error +} + +func (c *Client) run() { + defer close(c.done) + var ( + waiters = map[uint32]recvRequest{} + queued = map[uint32]received{} // messages unmatched by waiter + incoming = make(chan received) + ) + + go func() { + // start one more goroutine to recv messages without blocking. + for { + var p [messageLengthMax]byte + mh, err := c.channel.recv(context.TODO(), p[:]) + select { + case incoming <- received{ + mh: mh, + p: p[:mh.Length], + err: err, + }: + case <-c.done: + return + } + } + }() + + for { + select { + case req := <-c.sendRequests: + if p, err := proto.Marshal(req.msg.(proto.Message)); err != nil { + req.err <- err + } else { + req.err <- c.channel.send(req.ctx, req.id, messageTypeRequest, p) + } + case req := <-c.recvRequests: + if r, ok := queued[req.id]; ok { + req.err <- proto.Unmarshal(r.p, req.msg.(proto.Message)) + } + waiters[req.id] = req + case r := <-incoming: + if r.err != nil { + c.err = r.err + return + } + + if waiter, ok := waiters[r.mh.StreamID]; ok { + waiter.err <- proto.Unmarshal(r.p, waiter.msg.(proto.Message)) + } else { + queued[r.mh.StreamID] = r + } + case <-c.closed: + return + } + } +} diff --git a/vendor/github.com/stevvooe/ttrpc/codec.go b/vendor/github.com/stevvooe/ttrpc/codec.go new file mode 100644 index 000000000..7956a7222 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/codec.go @@ -0,0 +1,26 @@ +package ttrpc + +import ( + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" +) + +type codec struct{} + +func (c codec) Marshal(msg interface{}) ([]byte, error) { + switch v := msg.(type) { + case proto.Message: + return proto.Marshal(v) + default: + return nil, errors.Errorf("ttrpc: cannot marshal unknown type: %T", msg) + } +} + +func (c codec) Unmarshal(p []byte, msg interface{}) error { + switch v := msg.(type) { + case proto.Message: + return proto.Unmarshal(p, v) + default: + return errors.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg) + } +} diff --git a/vendor/github.com/stevvooe/ttrpc/plugin/generator.go b/vendor/github.com/stevvooe/ttrpc/plugin/generator.go new file mode 100644 index 000000000..5603e57f4 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/plugin/generator.go @@ -0,0 +1,131 @@ +package plugin + +import ( + "strings" + + "github.com/gogo/protobuf/protoc-gen-gogo/descriptor" + "github.com/gogo/protobuf/protoc-gen-gogo/generator" +) + +type ttrpcGenerator struct { + *generator.Generator + generator.PluginImports + + typeurlPkg generator.Single + ttrpcPkg generator.Single + contextPkg generator.Single +} + +func init() { + generator.RegisterPlugin(new(ttrpcGenerator)) +} + +func (p *ttrpcGenerator) Name() string { + return "ttrpc" +} + +func (p *ttrpcGenerator) Init(g *generator.Generator) { + p.Generator = g +} + +func (p *ttrpcGenerator) Generate(file *generator.FileDescriptor) { + p.PluginImports = generator.NewPluginImports(p.Generator) + p.contextPkg = p.NewImport("context") + p.typeurlPkg = p.NewImport("github.com/containerd/typeurl") + p.ttrpcPkg = p.NewImport("github.com/stevvooe/ttrpc") + + for _, service := range file.GetService() { + serviceName := service.GetName() + if pkg := file.GetPackage(); pkg != "" { + serviceName = pkg + "." + serviceName + } + + p.genService(serviceName, service) + } +} + +func (p *ttrpcGenerator) genService(fullName string, service *descriptor.ServiceDescriptorProto) { + serviceName := service.GetName() + "Service" + p.P() + p.P("type ", serviceName, " interface{") + p.In() + for _, method := range service.Method { + p.P(method.GetName(), + "(ctx ", p.contextPkg.Use(), ".Context, ", + "req *", p.typeName(method.GetInputType()), ") ", + "(*", p.typeName(method.GetOutputType()), ", error)") + + } + p.Out() + p.P("}") + + p.P() + // registration method + p.P("func Register", serviceName, "(srv *", p.ttrpcPkg.Use(), ".Server, svc ", serviceName, ") {") + p.In() + p.P(`srv.Register("`, fullName, `", map[string]`, p.ttrpcPkg.Use(), ".Method{") + p.In() + for _, method := range service.Method { + p.P(`"`, method.GetName(), `": `, `func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {`) + p.In() + p.P("var req ", p.typeName(method.GetInputType())) + p.P(`if err := unmarshal(&req); err != nil {`) + p.In() + p.P(`return nil, err`) + p.Out() + p.P(`}`) + p.P("return svc.", method.GetName(), "(ctx, &req)") + p.Out() + p.P("},") + } + p.Out() + p.P("})") + p.Out() + p.P("}") + + clientType := service.GetName() + "Client" + clientStructType := strings.ToLower(clientType[:1]) + clientType[1:] + p.P() + p.P("type ", clientStructType, " struct{") + p.In() + p.P("client *", p.ttrpcPkg.Use(), ".Client") + p.Out() + p.P("}") + p.P() + p.P("func New", clientType, "(client *", p.ttrpcPkg.Use(), ".Client)", serviceName, "{") + p.In() + p.P("return &", clientStructType, "{") + p.In() + p.P("client: client,") + p.Out() + p.P("}") + p.Out() + p.P("}") + p.P() + for _, method := range service.Method { + p.P() + p.P("func (c *", clientStructType, ") ", method.GetName(), + "(ctx ", p.contextPkg.Use(), ".Context, ", + "req *", p.typeName(method.GetInputType()), ") ", + "(*", p.typeName(method.GetOutputType()), ", error) {") + p.In() + p.P("var resp ", p.typeName(method.GetOutputType())) + p.P("if err := c.client.Call(ctx, ", `"`+fullName+`", `, `"`+method.GetName()+`"`, ", req, &resp); err != nil {") + p.In() + p.P("return nil, err") + p.Out() + p.P("}") + p.P("return &resp, nil") + p.Out() + p.P("}") + } +} + +func (p *ttrpcGenerator) objectNamed(name string) generator.Object { + p.Generator.RecordTypeUse(name) + return p.Generator.ObjectNamed(name) +} + +func (p *ttrpcGenerator) typeName(str string) string { + return p.Generator.TypeName(p.objectNamed(str)) +} diff --git a/vendor/github.com/stevvooe/ttrpc/server.go b/vendor/github.com/stevvooe/ttrpc/server.go new file mode 100644 index 000000000..407068fc3 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/server.go @@ -0,0 +1,155 @@ +package ttrpc + +import ( + "context" + "net" + + "github.com/containerd/containerd/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Server struct { + services *serviceSet + codec codec +} + +func NewServer() *Server { + return &Server{ + services: newServiceSet(), + } +} + +func (s *Server) Register(name string, methods map[string]Method) { + s.services.register(name, methods) +} + +func (s *Server) Shutdown(ctx context.Context) error { + // TODO(stevvooe): Wait on connection shutdown. + return nil +} + +func (s *Server) Serve(l net.Listener) error { + for { + conn, err := l.Accept() + if err != nil { + log.L.WithError(err).Error("failed accept") + continue + } + + go s.handleConn(conn) + } + + return nil +} + +func (s *Server) handleConn(conn net.Conn) { + defer conn.Close() + + type ( + request struct { + id uint32 + req *Request + } + + response struct { + id uint32 + resp *Response + } + ) + + var ( + ch = newChannel(conn, conn) + ctx, cancel = context.WithCancel(context.Background()) + responses = make(chan response) + requests = make(chan request) + recvErr = make(chan error, 1) + done = make(chan struct{}) + ) + + defer cancel() + defer close(done) + + go func() { + defer close(recvErr) + var p [messageLengthMax]byte + for { + mh, err := ch.recv(ctx, p[:]) + if err != nil { + recvErr <- err + return + } + + if mh.Type != messageTypeRequest { + // we must ignore this for future compat. + continue + } + + var req Request + if err := s.codec.Unmarshal(p[:mh.Length], &req); err != nil { + recvErr <- err + return + } + + if mh.StreamID%2 != 1 { + // enforce odd client initiated identifiers. + select { + case responses <- response{ + // even though we've had an invalid stream id, we send it + // back on the same stream id so the client knows which + // stream id was bad. + id: mh.StreamID, + resp: &Response{ + Status: status.New(codes.InvalidArgument, "StreamID must be odd for client initiated streams").Proto(), + }, + }: + case <-done: + } + + continue + } + + select { + case requests <- request{ + id: mh.StreamID, + req: &req, + }: + case <-done: + } + } + }() + + for { + select { + case request := <-requests: + go func(id uint32) { + p, status := s.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload) + resp := &Response{ + Status: status.Proto(), + Payload: p, + } + + select { + case responses <- response{ + id: id, + resp: resp, + }: + case <-done: + } + }(request.id) + case response := <-responses: + p, err := s.codec.Marshal(response.resp) + if err != nil { + log.L.WithError(err).Error("failed marshaling response") + return + } + if err := ch.send(ctx, response.id, messageTypeResponse, p); err != nil { + log.L.WithError(err).Error("failed sending message on channel") + return + } + case err := <-recvErr: + log.L.WithError(err).Error("error receiving message") + return + } + } +} diff --git a/vendor/github.com/stevvooe/ttrpc/services.go b/vendor/github.com/stevvooe/ttrpc/services.go new file mode 100644 index 000000000..b9a749e3d --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/services.go @@ -0,0 +1,134 @@ +package ttrpc + +import ( + "context" + "io" + "os" + "path" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Method func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) + +type ServiceDesc struct { + Methods map[string]Method + + // TODO(stevvooe): Add stream support. +} + +type serviceSet struct { + services map[string]ServiceDesc +} + +func newServiceSet() *serviceSet { + return &serviceSet{ + services: make(map[string]ServiceDesc), + } +} + +func (s *serviceSet) register(name string, methods map[string]Method) { + if _, ok := s.services[name]; ok { + panic(errors.Errorf("duplicate service %v registered", name)) + } + + s.services[name] = ServiceDesc{ + Methods: methods, + } +} + +func (s *serviceSet) call(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, *status.Status) { + p, err := s.dispatch(ctx, serviceName, methodName, p) + st, ok := status.FromError(err) + if !ok { + st = status.New(convertCode(err), err.Error()) + } + + return p, st +} + +func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, error) { + method, err := s.resolve(serviceName, methodName) + if err != nil { + return nil, err + } + + unmarshal := func(obj interface{}) error { + switch v := obj.(type) { + case proto.Message: + if err := proto.Unmarshal(p, v); err != nil { + return status.Errorf(codes.Internal, "ttrpc: error unmarshaling payload: %v", err.Error()) + } + default: + return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v) + } + return nil + } + + resp, err := method(ctx, unmarshal) + if err != nil { + return nil, err + } + + switch v := resp.(type) { + case proto.Message: + r, err := proto.Marshal(v) + if err != nil { + return nil, status.Errorf(codes.Internal, "ttrpc: error marshaling payload: %v", err.Error()) + } + + return r, nil + default: + return nil, status.Errorf(codes.Internal, "ttrpc: error unsupported response type: %T", v) + } +} + +func (s *serviceSet) resolve(service, method string) (Method, error) { + srv, ok := s.services[service] + if !ok { + return nil, status.Errorf(codes.NotFound, "service %v", service) + } + + mthd, ok := srv.Methods[method] + if !ok { + return nil, status.Errorf(codes.NotFound, "method %v", method) + } + + return mthd, nil +} + +// convertCode maps stdlib go errors into grpc space. +// +// This is ripped from the grpc-go code base. +func convertCode(err error) codes.Code { + switch err { + case nil: + return codes.OK + case io.EOF: + return codes.OutOfRange + case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF: + return codes.FailedPrecondition + case os.ErrInvalid: + return codes.InvalidArgument + case context.Canceled: + return codes.Canceled + case context.DeadlineExceeded: + return codes.DeadlineExceeded + } + switch { + case os.IsExist(err): + return codes.AlreadyExists + case os.IsNotExist(err): + return codes.NotFound + case os.IsPermission(err): + return codes.PermissionDenied + } + return codes.Unknown +} + +func fullPath(service, method string) string { + return "/" + path.Join("/", service, method) +} diff --git a/vendor/github.com/stevvooe/ttrpc/types.go b/vendor/github.com/stevvooe/ttrpc/types.go new file mode 100644 index 000000000..a522b0cf2 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/types.go @@ -0,0 +1,26 @@ +package ttrpc + +import ( + "fmt" + + spb "google.golang.org/genproto/googleapis/rpc/status" +) + +type Request struct { + Service string `protobuf:"bytes,1,opt,name=service,proto3"` + Method string `protobuf:"bytes,2,opt,name=method,proto3"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` +} + +func (r *Request) Reset() { *r = Request{} } +func (r *Request) String() string { return fmt.Sprintf("%+#v", r) } +func (r *Request) ProtoMessage() {} + +type Response struct { + Status *spb.Status `protobuf:"bytes,1,opt,name=status,proto3"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3"` +} + +func (r *Response) Reset() { *r = Response{} } +func (r *Response) String() string { return fmt.Sprintf("%+#v", r) } +func (r *Response) ProtoMessage() {}