Fix spec generation for task execution
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
923236004a
commit
bf9ad0c57f
57
client.go
57
client.go
@ -6,6 +6,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/api/services/containers"
|
"github.com/containerd/containerd/api/services/containers"
|
||||||
@ -34,30 +35,43 @@ import (
|
|||||||
"google.golang.org/grpc/grpclog"
|
"google.golang.org/grpc/grpclog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// New returns a new containerd client that is connected to the containerd
|
func init() {
|
||||||
// instance provided by address
|
|
||||||
func New(address string) (*Client, error) {
|
|
||||||
// reset the grpc logger so that it does not output in the STDIO of the calling process
|
// reset the grpc logger so that it does not output in the STDIO of the calling process
|
||||||
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
|
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
|
||||||
|
}
|
||||||
|
|
||||||
opts := []grpc.DialOption{
|
type NewClientOpts func(c *Client) error
|
||||||
|
|
||||||
|
// New returns a new containerd client that is connected to the containerd
|
||||||
|
// instance provided by address
|
||||||
|
func New(address string, opts ...NewClientOpts) (*Client, error) {
|
||||||
|
gopts := []grpc.DialOption{
|
||||||
grpc.WithInsecure(),
|
grpc.WithInsecure(),
|
||||||
grpc.WithTimeout(100 * time.Second),
|
grpc.WithTimeout(100 * time.Second),
|
||||||
grpc.WithDialer(dialer),
|
grpc.WithDialer(dialer),
|
||||||
}
|
}
|
||||||
conn, err := grpc.Dial(dialAddress(address), opts...)
|
conn, err := grpc.Dial(dialAddress(address), gopts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
||||||
}
|
}
|
||||||
return &Client{
|
c := &Client{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
}, nil
|
Runtime: runtime.GOOS,
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
if err := o(c); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Client is the client to interact with containerd and its various services
|
// Client is the client to interact with containerd and its various services
|
||||||
// using a uniform interface
|
// using a uniform interface
|
||||||
type Client struct {
|
type Client struct {
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
|
|
||||||
|
Runtime string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Containers returns all containers created in containerd
|
// Containers returns all containers created in containerd
|
||||||
@ -97,9 +111,9 @@ func NewContainerWithExistingRootFS(id string) NewContainerOpts {
|
|||||||
|
|
||||||
// NewContainerWithNewRootFS allocates a new snapshot to be used by the container as the
|
// NewContainerWithNewRootFS allocates a new snapshot to be used by the container as the
|
||||||
// root filesystem in read-write mode
|
// root filesystem in read-write mode
|
||||||
func NewContainerWithNewRootFS(id string, image v1.Descriptor) NewContainerOpts {
|
func NewContainerWithNewRootFS(id string, image *Image) NewContainerOpts {
|
||||||
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
||||||
diffIDs, err := images.RootFS(ctx, client.content(), image)
|
diffIDs, err := image.i.RootFS(ctx, client.content())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -113,9 +127,9 @@ func NewContainerWithNewRootFS(id string, image v1.Descriptor) NewContainerOpts
|
|||||||
|
|
||||||
// NewContainerWithNewReadonlyRootFS allocates a new snapshot to be used by the container as the
|
// NewContainerWithNewReadonlyRootFS allocates a new snapshot to be used by the container as the
|
||||||
// root filesystem in read-only mode
|
// root filesystem in read-only mode
|
||||||
func NewContainerWithNewReadonlyRootFS(id string, image v1.Descriptor) NewContainerOpts {
|
func NewContainerWithNewReadonlyRootFS(id string, image *Image) NewContainerOpts {
|
||||||
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
||||||
diffIDs, err := images.RootFS(ctx, client.content(), image)
|
diffIDs, err := image.i.RootFS(ctx, client.content())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -127,6 +141,13 @@ func NewContainerWithNewReadonlyRootFS(id string, image v1.Descriptor) NewContai
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewContainerWithRuntime(name string) NewContainerOpts {
|
||||||
|
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
||||||
|
c.Runtime = name
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewContainer will create a new container in container with the provided id
|
// NewContainer will create a new container in container with the provided id
|
||||||
// the id must be unique within the namespace
|
// the id must be unique within the namespace
|
||||||
func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (*Container, error) {
|
func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (*Container, error) {
|
||||||
@ -135,7 +156,8 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
container := containers.Container{
|
container := containers.Container{
|
||||||
ID: id,
|
ID: id,
|
||||||
|
Runtime: c.Runtime,
|
||||||
Spec: &protobuf.Any{
|
Spec: &protobuf.Any{
|
||||||
TypeUrl: specs.Version,
|
TypeUrl: specs.Version,
|
||||||
Value: data,
|
Value: data,
|
||||||
@ -237,7 +259,11 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (*Image
|
|||||||
}
|
}
|
||||||
store := c.content()
|
store := c.content()
|
||||||
|
|
||||||
name, desc, fetcher, err := pullCtx.Resolver.Resolve(ctx, ref)
|
name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
fetcher, err := pullCtx.Resolver.Fetcher(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -263,7 +289,8 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...PullOpts) (*Image
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &Image{
|
return &Image{
|
||||||
i: i,
|
client: c,
|
||||||
|
i: i,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,9 +43,13 @@ func (c *Container) Spec() (*specs.Spec, error) {
|
|||||||
func (c *Container) Delete(ctx context.Context) error {
|
func (c *Container) Delete(ctx context.Context) error {
|
||||||
// TODO: should the client be the one removing resources attached
|
// TODO: should the client be the one removing resources attached
|
||||||
// to the container at the moment before we have GC?
|
// to the container at the moment before we have GC?
|
||||||
_, err := c.client.containers().Delete(ctx, &containers.DeleteContainerRequest{
|
err := c.client.snapshotter().Remove(ctx, c.c.RootFS)
|
||||||
|
|
||||||
|
if _, cerr := c.client.containers().Delete(ctx, &containers.DeleteContainerRequest{
|
||||||
ID: c.c.ID,
|
ID: c.c.ID,
|
||||||
})
|
}); err == nil {
|
||||||
|
err = cerr
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
2
image.go
2
image.go
@ -3,5 +3,7 @@ package containerd
|
|||||||
import "github.com/containerd/containerd/images"
|
import "github.com/containerd/containerd/images"
|
||||||
|
|
||||||
type Image struct {
|
type Image struct {
|
||||||
|
client *Client
|
||||||
|
|
||||||
i images.Image
|
i images.Image
|
||||||
}
|
}
|
||||||
|
11
spec.go
11
spec.go
@ -14,13 +14,6 @@ func WithImageRef(ref string) SpecOpts {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithHostname(id string) SpecOpts {
|
|
||||||
return func(s *specs.Spec) error {
|
|
||||||
s.Hostname = id
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithArgs(args ...string) SpecOpts {
|
func WithArgs(args ...string) SpecOpts {
|
||||||
return func(s *specs.Spec) error {
|
return func(s *specs.Spec) error {
|
||||||
s.Process.Args = args
|
s.Process.Args = args
|
||||||
@ -30,8 +23,8 @@ func WithArgs(args ...string) SpecOpts {
|
|||||||
|
|
||||||
// GenerateSpec will generate a default spec from the provided image
|
// GenerateSpec will generate a default spec from the provided image
|
||||||
// for use as a containerd container
|
// for use as a containerd container
|
||||||
func GenerateSpec(opts ...SpecOpts) (*specs.Spec, error) {
|
func GenerateSpec(id string, opts ...SpecOpts) (*specs.Spec, error) {
|
||||||
s, err := createDefaultSpec()
|
s, err := createDefaultSpec(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
34
spec_unix.go
34
spec_unix.go
@ -1,11 +1,14 @@
|
|||||||
package containerd
|
package containerd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/images"
|
||||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||||
)
|
)
|
||||||
@ -54,7 +57,7 @@ func defaultNamespaces() []specs.LinuxNamespace {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createDefaultSpec() (*specs.Spec, error) {
|
func createDefaultSpec(id string) (*specs.Spec, error) {
|
||||||
s := &specs.Spec{
|
s := &specs.Spec{
|
||||||
Version: specs.Version,
|
Version: specs.Version,
|
||||||
Platform: specs.Platform{
|
Platform: specs.Platform{
|
||||||
@ -64,6 +67,7 @@ func createDefaultSpec() (*specs.Spec, error) {
|
|||||||
Root: specs.Root{
|
Root: specs.Root{
|
||||||
Path: defaultRootfsPath,
|
Path: defaultRootfsPath,
|
||||||
},
|
},
|
||||||
|
Hostname: id,
|
||||||
Process: specs.Process{
|
Process: specs.Process{
|
||||||
Cwd: "/",
|
Cwd: "/",
|
||||||
NoNewPrivileges: true,
|
NoNewPrivileges: true,
|
||||||
@ -180,16 +184,40 @@ func WithHostNamespace(ns specs.LinuxNamespaceType) SpecOpts {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithImage(config *v1.ImageConfig) SpecOpts {
|
func WithImage(ctx context.Context, image *Image) SpecOpts {
|
||||||
return func(s *specs.Spec) error {
|
return func(s *specs.Spec) error {
|
||||||
|
store := image.client.content()
|
||||||
|
ic, err := image.i.Config(ctx, store)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
ociimage v1.Image
|
||||||
|
config v1.ImageConfig
|
||||||
|
)
|
||||||
|
switch ic.MediaType {
|
||||||
|
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
|
||||||
|
r, err := store.Reader(ctx, ic.Digest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r).Decode(&ociimage); err != nil {
|
||||||
|
r.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Close()
|
||||||
|
config = ociimage.Config
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown image config media type %s", ic.MediaType)
|
||||||
|
}
|
||||||
env := []string{
|
env := []string{
|
||||||
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
|
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
|
||||||
}
|
}
|
||||||
s.Process.Env = append(env, config.Env...)
|
s.Process.Env = append(env, config.Env...)
|
||||||
cmd := config.Cmd
|
|
||||||
var (
|
var (
|
||||||
uid, gid uint32
|
uid, gid uint32
|
||||||
)
|
)
|
||||||
|
cmd := config.Cmd
|
||||||
s.Process.Args = append(config.Entrypoint, cmd...)
|
s.Process.Args = append(config.Entrypoint, cmd...)
|
||||||
if config.User != "" {
|
if config.User != "" {
|
||||||
parts := strings.Split(config.User, ":")
|
parts := strings.Split(config.User, ":")
|
||||||
|
52
task.go
52
task.go
@ -10,6 +10,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/containerd/containerd/api/services/execution"
|
"github.com/containerd/containerd/api/services/execution"
|
||||||
|
"github.com/containerd/containerd/api/types/task"
|
||||||
"github.com/containerd/fifo"
|
"github.com/containerd/fifo"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -31,9 +32,9 @@ func (i *IO) Close() error {
|
|||||||
|
|
||||||
type IOCreation func() (*IO, error)
|
type IOCreation func() (*IO, error)
|
||||||
|
|
||||||
// STDIO returns an IO implementation to be used for a task
|
// Stdio returns an IO implementation to be used for a task
|
||||||
// that outputs the container's IO as the current processes STDIO
|
// that outputs the container's IO as the current processes Stdio
|
||||||
func STDIO() (*IO, error) {
|
func Stdio() (*IO, error) {
|
||||||
paths, err := fifoPaths()
|
paths, err := fifoPaths()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -57,7 +58,11 @@ func STDIO() (*IO, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func fifoPaths() (*fifoSet, error) {
|
func fifoPaths() (*fifoSet, error) {
|
||||||
dir, err := ioutil.TempDir(filepath.Join(os.TempDir(), "containerd"), "")
|
root := filepath.Join(os.TempDir(), "containerd")
|
||||||
|
if err := os.MkdirAll(root, 0700); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dir, err := ioutil.TempDir(root, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -135,16 +140,21 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error
|
|||||||
}(f)
|
}(f)
|
||||||
}
|
}
|
||||||
return &wgCloser{
|
return &wgCloser{
|
||||||
wg: wg,
|
wg: wg,
|
||||||
|
dir: fifos.dir,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type wgCloser struct {
|
type wgCloser struct {
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
|
dir string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *wgCloser) Close() error {
|
func (g *wgCloser) Close() error {
|
||||||
g.wg.Wait()
|
g.wg.Wait()
|
||||||
|
if g.dir != "" {
|
||||||
|
return os.RemoveAll(g.dir)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,8 +171,16 @@ func (t *Task) Pid() uint32 {
|
|||||||
return t.pid
|
return t.pid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) Kill(ctx context.Context, s os.Signal) error {
|
func (t *Task) Start(ctx context.Context) error {
|
||||||
|
_, err := t.client.tasks().Start(ctx, &execution.StartRequest{
|
||||||
|
ContainerID: t.containerID,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Task) Kill(ctx context.Context, s syscall.Signal) error {
|
||||||
_, err := t.client.tasks().Kill(ctx, &execution.KillRequest{
|
_, err := t.client.tasks().Kill(ctx, &execution.KillRequest{
|
||||||
|
Signal: uint32(s),
|
||||||
ContainerID: t.containerID,
|
ContainerID: t.containerID,
|
||||||
PidOrAll: &execution.KillRequest_All{
|
PidOrAll: &execution.KillRequest_All{
|
||||||
All: true,
|
All: true,
|
||||||
@ -171,6 +189,26 @@ func (t *Task) Kill(ctx context.Context, s os.Signal) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait is a blocking call that will wait for the task to exit and return the exit status
|
||||||
|
func (t *Task) Wait(ctx context.Context) (uint32, error) {
|
||||||
|
events, err := t.client.tasks().Events(ctx, &execution.EventsRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return 255, err
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
e, err := events.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return 255, err
|
||||||
|
}
|
||||||
|
if e.Type != task.Event_EXIT {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if e.ID == t.containerID && e.Pid == t.pid {
|
||||||
|
return e.ExitStatus, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes the task and its runtime state
|
// Delete deletes the task and its runtime state
|
||||||
// it returns the exit status of the task and any errors that were encountered
|
// it returns the exit status of the task and any errors that were encountered
|
||||||
// during cleanup
|
// during cleanup
|
||||||
|
Loading…
Reference in New Issue
Block a user