Add checkpoint and restore to client package
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
f105db9626
commit
a8c5542ba8
13
.travis.yml
13
.travis.yml
@ -18,6 +18,15 @@ addons:
|
||||
- btrfs-tools
|
||||
- libseccomp-dev
|
||||
- libapparmor-dev
|
||||
- libnl-3-dev
|
||||
- libnet-dev
|
||||
- protobuf-c-compiler
|
||||
- protobuf-compiler
|
||||
- python-minimal
|
||||
- libcap-dev
|
||||
- libaio-dev
|
||||
- libprotobuf-c0-dev
|
||||
- libprotobuf-dev
|
||||
|
||||
env:
|
||||
- TRAVIS_GOOS=windows TRAVIS_CGO_ENABLED=1
|
||||
@ -31,6 +40,10 @@ install:
|
||||
- export PATH=$PATH:/tmp/protobuf/bin/
|
||||
- go get -u github.com/vbatts/git-validation
|
||||
- sudo wget https://github.com/crosbymichael/runc/releases/download/ctd-1/runc -O /bin/runc; sudo chmod +x /bin/runc
|
||||
- wget https://github.com/xemul/criu/archive/v3.0.tar.gz -O /tmp/criu.tar.gz
|
||||
- tar -C /tmp/ -zxf /tmp/criu.tar.gz
|
||||
- cd /tmp/criu-3.0 && sudo make install-criu
|
||||
- cd $TRAVIS_BUILD_DIR
|
||||
|
||||
script:
|
||||
- export GOOS=$TRAVIS_GOOS
|
||||
|
193
checkpoint_test.go
Normal file
193
checkpoint_test.go
Normal file
@ -0,0 +1,193 @@
|
||||
package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"syscall"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCheckpointRestore(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip()
|
||||
}
|
||||
client, err := New(address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var (
|
||||
ctx = context.Background()
|
||||
id = "CheckpointRestore"
|
||||
)
|
||||
image, err := client.GetImage(ctx, testImage)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("sleep", "100"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer container.Delete(ctx)
|
||||
|
||||
task, err := container.NewTask(ctx, empty())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer task.Delete(ctx)
|
||||
|
||||
statusC := make(chan uint32, 1)
|
||||
go func() {
|
||||
status, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
statusC <- status
|
||||
}()
|
||||
|
||||
if err := task.Start(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
checkpoint, err := task.Checkpoint(ctx, WithExit)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
<-statusC
|
||||
|
||||
if _, err := task.Delete(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if task, err = container.NewTask(ctx, empty(), WithTaskCheckpoint(checkpoint)); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer task.Delete(ctx)
|
||||
|
||||
go func() {
|
||||
status, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
statusC <- status
|
||||
}()
|
||||
|
||||
if err := task.Start(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
<-statusC
|
||||
}
|
||||
|
||||
func TestCheckpointRestoreNewContainer(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip()
|
||||
}
|
||||
client, err := New(address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
const id = "CheckpointRestoreNewContainer"
|
||||
ctx := context.Background()
|
||||
image, err := client.GetImage(ctx, testImage)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("sleep", "100"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer container.Delete(ctx)
|
||||
|
||||
task, err := container.NewTask(ctx, empty())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer task.Delete(ctx)
|
||||
|
||||
statusC := make(chan uint32, 1)
|
||||
go func() {
|
||||
status, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
statusC <- status
|
||||
}()
|
||||
|
||||
if err := task.Start(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
checkpoint, err := task.Checkpoint(ctx, WithExit)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
<-statusC
|
||||
|
||||
if err := container.Delete(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if _, err := task.Delete(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if container, err = client.NewContainer(ctx, id, WithCheckpoint(checkpoint, id)); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if task, err = container.NewTask(ctx, empty(), WithTaskCheckpoint(checkpoint)); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer task.Delete(ctx)
|
||||
|
||||
go func() {
|
||||
status, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
statusC <- status
|
||||
}()
|
||||
|
||||
if err := task.Start(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
<-statusC
|
||||
}
|
12
client.go
12
client.go
@ -27,11 +27,9 @@ import (
|
||||
imagesservice "github.com/containerd/containerd/services/images"
|
||||
snapshotservice "github.com/containerd/containerd/services/snapshot"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
protobuf "github.com/gogo/protobuf/types"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
@ -176,18 +174,10 @@ func WithImage(i Image) NewContainerOpts {
|
||||
|
||||
// NewContainer will create a new container in container with the provided id
|
||||
// the id must be unique within the namespace
|
||||
func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, opts ...NewContainerOpts) (Container, error) {
|
||||
data, err := json.Marshal(spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
|
||||
container := containers.Container{
|
||||
ID: id,
|
||||
Runtime: c.runtime,
|
||||
Spec: &protobuf.Any{
|
||||
TypeUrl: specs.Version,
|
||||
Value: data,
|
||||
},
|
||||
}
|
||||
for _, o := range opts {
|
||||
if err := o(ctx, c, &container); err != nil {
|
||||
|
@ -18,10 +18,14 @@ const (
|
||||
testImage = "docker.io/library/alpine:latest"
|
||||
)
|
||||
|
||||
var address string
|
||||
var (
|
||||
address string
|
||||
noDaemon bool
|
||||
)
|
||||
|
||||
func init() {
|
||||
flag.StringVar(&address, "address", "/run/containerd/containerd.sock", "The address to the containerd socket for use in the tests")
|
||||
flag.BoolVar(&noDaemon, "no-daemon", false, "Do not start a dedicated daemon for the tests")
|
||||
flag.Parse()
|
||||
}
|
||||
|
||||
@ -29,17 +33,23 @@ func TestMain(m *testing.M) {
|
||||
if testing.Short() {
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
// setup a new containerd daemon if !testing.Short
|
||||
cmd := exec.Command("containerd",
|
||||
"--root", defaultRoot,
|
||||
"--state", defaultState,
|
||||
var (
|
||||
cmd *exec.Cmd
|
||||
buf = bytes.NewBuffer(nil)
|
||||
)
|
||||
buf := bytes.NewBuffer(nil)
|
||||
cmd.Stderr = buf
|
||||
if err := cmd.Start(); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
if !noDaemon {
|
||||
// setup a new containerd daemon if !testing.Short
|
||||
cmd = exec.Command("containerd",
|
||||
"--root", defaultRoot,
|
||||
"--state", defaultState,
|
||||
)
|
||||
cmd.Stderr = buf
|
||||
if err := cmd.Start(); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
client, err := New(address)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
@ -62,20 +72,22 @@ func TestMain(m *testing.M) {
|
||||
// run the test
|
||||
status := m.Run()
|
||||
|
||||
// tear down the daemon and resources created
|
||||
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
}
|
||||
if _, err := cmd.Process.Wait(); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
}
|
||||
if err := os.RemoveAll(defaultRoot); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// only print containerd logs if the test failed
|
||||
if status != 0 {
|
||||
fmt.Fprintln(os.Stderr, buf.String())
|
||||
if !noDaemon {
|
||||
// tear down the daemon and resources created
|
||||
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
}
|
||||
if _, err := cmd.Process.Wait(); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
}
|
||||
if err := os.RemoveAll(defaultRoot); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// only print containerd logs if the test failed
|
||||
if status != 0 {
|
||||
fmt.Fprintln(os.Stderr, buf.String())
|
||||
}
|
||||
}
|
||||
os.Exit(status)
|
||||
}
|
||||
|
29
container.go
29
container.go
@ -14,7 +14,7 @@ import (
|
||||
type Container interface {
|
||||
ID() string
|
||||
Delete(context.Context) error
|
||||
NewTask(context.Context, IOCreation) (Task, error)
|
||||
NewTask(context.Context, IOCreation, ...NewTaskOpts) (Task, error)
|
||||
Spec() (*specs.Spec, error)
|
||||
Task() Task
|
||||
LoadTask(context.Context, IOAttach) (Task, error)
|
||||
@ -71,7 +71,9 @@ func (c *container) Task() Task {
|
||||
return c.task
|
||||
}
|
||||
|
||||
func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, error) {
|
||||
type NewTaskOpts func(context.Context, *Client, *execution.CreateRequest) error
|
||||
|
||||
func (c *container) NewTask(ctx context.Context, ioCreate IOCreation, opts ...NewTaskOpts) (Task, error) {
|
||||
i, err := ioCreate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -97,15 +99,28 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, err
|
||||
})
|
||||
}
|
||||
}
|
||||
response, err := c.client.TaskService().Create(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for _, o := range opts {
|
||||
if err := o(ctx, c.client, request); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
t := &task{
|
||||
client: c.client,
|
||||
io: i,
|
||||
containerID: response.ContainerID,
|
||||
pid: response.Pid,
|
||||
containerID: c.ID(),
|
||||
pidSync: make(chan struct{}),
|
||||
}
|
||||
|
||||
if request.Checkpoint != nil {
|
||||
// we need to defer the create call to start
|
||||
t.deferred = request
|
||||
} else {
|
||||
response, err := c.client.TaskService().Create(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.pid = response.Pid
|
||||
close(t.pidSync)
|
||||
}
|
||||
c.task = t
|
||||
return t, nil
|
||||
|
127
container_linux.go
Normal file
127
container_linux.go
Normal file
@ -0,0 +1,127 @@
|
||||
package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/containerd/containerd/api/services/containers"
|
||||
"github.com/containerd/containerd/api/services/execution"
|
||||
"github.com/containerd/containerd/api/types/descriptor"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
protobuf "github.com/gogo/protobuf/types"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
)
|
||||
|
||||
func WithSpec(spec *specs.Spec) NewContainerOpts {
|
||||
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
||||
data, err := json.Marshal(spec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Spec = &protobuf.Any{
|
||||
TypeUrl: spec.Version,
|
||||
Value: data,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithCheckpoint(desc v1.Descriptor, rootfsID string) NewContainerOpts {
|
||||
// set image and rw, and spec
|
||||
return func(ctx context.Context, client *Client, c *containers.Container) error {
|
||||
id := desc.Digest
|
||||
store := client.ContentStore()
|
||||
index, err := decodeIndex(ctx, store, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var rw *v1.Descriptor
|
||||
for _, m := range index.Manifests {
|
||||
switch m.MediaType {
|
||||
case v1.MediaTypeImageLayer:
|
||||
fk := m
|
||||
rw = &fk
|
||||
case images.MediaTypeDockerSchema2Manifest:
|
||||
config, err := images.Config(ctx, store, m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
diffIDs, err := images.RootFS(ctx, store, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := client.SnapshotService().Prepare(ctx, rootfsID, identity.ChainID(diffIDs).String()); err != nil {
|
||||
if !snapshot.IsExist(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
c.Image = index.Annotations["image.name"]
|
||||
case images.MediaTypeContainerd1CheckpointConfig:
|
||||
r, err := store.Reader(ctx, m.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := ioutil.ReadAll(r)
|
||||
r.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Spec = &protobuf.Any{
|
||||
TypeUrl: specs.Version,
|
||||
Value: data,
|
||||
}
|
||||
}
|
||||
}
|
||||
if rw != nil {
|
||||
// apply the rw snapshot to the new rw layer
|
||||
mounts, err := client.SnapshotService().Mounts(ctx, rootfsID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := client.DiffService().Apply(ctx, *rw, mounts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
c.RootFS = rootfsID
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithTaskCheckpoint(desc v1.Descriptor) NewTaskOpts {
|
||||
return func(ctx context.Context, c *Client, r *execution.CreateRequest) error {
|
||||
id := desc.Digest
|
||||
index, err := decodeIndex(ctx, c.ContentStore(), id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, m := range index.Manifests {
|
||||
if m.MediaType == images.MediaTypeContainerd1Checkpoint {
|
||||
r.Checkpoint = &descriptor.Descriptor{
|
||||
MediaType: m.MediaType,
|
||||
Size_: m.Size,
|
||||
Digest: m.Digest,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("checkpoint not found in index %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
func decodeIndex(ctx context.Context, store content.Store, id digest.Digest) (*v1.Index, error) {
|
||||
var index v1.Index
|
||||
r, err := store.Reader(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = json.NewDecoder(r).Decode(&index)
|
||||
r.Close()
|
||||
return &index, err
|
||||
}
|
@ -53,7 +53,7 @@ func TestNewContainer(t *testing.T) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(context.Background(), id, spec)
|
||||
container, err := client.NewContainer(context.Background(), id, WithSpec(spec))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@ -96,7 +96,7 @@ func TestContainerStart(t *testing.T) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image))
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@ -165,7 +165,7 @@ func TestContainerOutput(t *testing.T) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image))
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@ -235,7 +235,7 @@ func TestContainerExec(t *testing.T) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image))
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@ -320,7 +320,7 @@ func TestContainerProcesses(t *testing.T) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image))
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@ -391,7 +391,7 @@ func TestContainerCloseStdin(t *testing.T) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image))
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@ -473,7 +473,7 @@ func TestContainerAttach(t *testing.T) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image))
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithImage(image), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -33,34 +33,7 @@ type Store interface {
|
||||
// The caller can then use the descriptor to resolve and process the
|
||||
// configuration of the image.
|
||||
func (image *Image) Config(ctx context.Context, provider content.Provider) (ocispec.Descriptor, error) {
|
||||
var configDesc ocispec.Descriptor
|
||||
return configDesc, Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
switch image.Target.MediaType {
|
||||
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||
rc, err := provider.Reader(ctx, image.Target.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
p, err := ioutil.ReadAll(rc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var manifest ocispec.Manifest
|
||||
if err := json.Unmarshal(p, &manifest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
configDesc = manifest.Config
|
||||
|
||||
return nil, nil
|
||||
default:
|
||||
return nil, errors.New("could not resolve config")
|
||||
}
|
||||
|
||||
}), image.Target)
|
||||
return Config(ctx, provider, image.Target)
|
||||
}
|
||||
|
||||
// RootFS returns the unpacked diffids that make up and images rootfs.
|
||||
@ -112,12 +85,43 @@ func (image *Image) Size(ctx context.Context, provider content.Provider) (int64,
|
||||
}), image.Target)
|
||||
}
|
||||
|
||||
func Config(ctx context.Context, provider content.Provider, image ocispec.Descriptor) (ocispec.Descriptor, error) {
|
||||
var configDesc ocispec.Descriptor
|
||||
return configDesc, Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
switch image.MediaType {
|
||||
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||
rc, err := provider.Reader(ctx, image.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
p, err := ioutil.ReadAll(rc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var manifest ocispec.Manifest
|
||||
if err := json.Unmarshal(p, &manifest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
configDesc = manifest.Config
|
||||
|
||||
return nil, nil
|
||||
default:
|
||||
return nil, errors.New("could not resolve config")
|
||||
}
|
||||
|
||||
}), image)
|
||||
}
|
||||
|
||||
// RootFS returns the unpacked diffids that make up and images rootfs.
|
||||
//
|
||||
// These are used to verify that a set of layers unpacked to the expected
|
||||
// values.
|
||||
func RootFS(ctx context.Context, provider content.Provider, desc ocispec.Descriptor) ([]digest.Digest, error) {
|
||||
p, err := content.ReadBlob(ctx, provider, desc.Digest)
|
||||
func RootFS(ctx context.Context, provider content.Provider, configDesc ocispec.Descriptor) ([]digest.Digest, error) {
|
||||
p, err := content.ReadBlob(ctx, provider, configDesc.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ type Runtime struct {
|
||||
monitor plugin.TaskMonitor
|
||||
}
|
||||
|
||||
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (plugin.Task, error) {
|
||||
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) {
|
||||
path, err := r.newBundle(id, opts.Spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -145,6 +145,7 @@ func newInitProcess(context context.Context, path string, r *shimapi.CreateReque
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
copyWaitGroup.Wait()
|
||||
pid, err := runc.ReadPidFile(pidFile)
|
||||
if err != nil {
|
||||
|
@ -70,13 +70,13 @@ func (m *Monitor) Start(c *exec.Cmd) error {
|
||||
c: c,
|
||||
ExitCh: make(chan int, 1),
|
||||
}
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
// start the process
|
||||
if err := rc.c.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Lock()
|
||||
m.cmds[rc.c.Process.Pid] = rc
|
||||
m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -156,11 +156,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
|
||||
s.mu.Unlock()
|
||||
state, err := c.State(ctx)
|
||||
if err != nil {
|
||||
s.mu.Lock()
|
||||
delete(s.tasks, r.ContainerID)
|
||||
runtime.Delete(ctx, c)
|
||||
s.mu.Unlock()
|
||||
return nil, err
|
||||
log.G(ctx).Error(err)
|
||||
}
|
||||
return &api.CreateResponse{
|
||||
ContainerID: r.ContainerID,
|
||||
|
142
task.go
142
task.go
@ -1,11 +1,20 @@
|
||||
package containerd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime"
|
||||
"syscall"
|
||||
|
||||
"github.com/containerd/containerd/api/services/containers"
|
||||
"github.com/containerd/containerd/api/services/execution"
|
||||
taskapi "github.com/containerd/containerd/api/types/task"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/rootfs"
|
||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
)
|
||||
|
||||
@ -21,6 +30,8 @@ const (
|
||||
Pausing TaskStatus = "pausing"
|
||||
)
|
||||
|
||||
type CheckpointOpts func(*execution.CheckpointRequest) error
|
||||
|
||||
type Task interface {
|
||||
Pid() uint32
|
||||
Delete(context.Context) (uint32, error)
|
||||
@ -35,6 +46,7 @@ type Task interface {
|
||||
CloseStdin(context.Context) error
|
||||
Resize(ctx context.Context, w, h uint32) error
|
||||
IO() *IO
|
||||
Checkpoint(context.Context, ...CheckpointOpts) (v1.Descriptor, error)
|
||||
}
|
||||
|
||||
type Process interface {
|
||||
@ -55,6 +67,9 @@ type task struct {
|
||||
io *IO
|
||||
containerID string
|
||||
pid uint32
|
||||
|
||||
deferred *execution.CreateRequest
|
||||
pidSync chan struct{}
|
||||
}
|
||||
|
||||
// Pid returns the pid or process id for the task
|
||||
@ -63,6 +78,16 @@ func (t *task) Pid() uint32 {
|
||||
}
|
||||
|
||||
func (t *task) Start(ctx context.Context) error {
|
||||
if t.deferred != nil {
|
||||
response, err := t.client.TaskService().Create(ctx, t.deferred)
|
||||
t.deferred = nil
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.pid = response.Pid
|
||||
close(t.pidSync)
|
||||
return nil
|
||||
}
|
||||
_, err := t.client.TaskService().Start(ctx, &execution.StartRequest{
|
||||
ContainerID: t.containerID,
|
||||
})
|
||||
@ -110,6 +135,7 @@ func (t *task) Wait(ctx context.Context) (uint32, error) {
|
||||
if err != nil {
|
||||
return UnknownExitStatus, err
|
||||
}
|
||||
<-t.pidSync
|
||||
for {
|
||||
e, err := events.Recv()
|
||||
if err != nil {
|
||||
@ -186,3 +212,119 @@ func (t *task) Resize(ctx context.Context, w, h uint32) error {
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func WithExit(r *execution.CheckpointRequest) error {
|
||||
r.Exit = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointOpts) (d v1.Descriptor, err error) {
|
||||
request := &execution.CheckpointRequest{
|
||||
ContainerID: t.containerID,
|
||||
}
|
||||
for _, o := range opts {
|
||||
if err := o(request); err != nil {
|
||||
return d, err
|
||||
}
|
||||
}
|
||||
// if we are not exiting the container after the checkpoint, make sure we pause it and resume after
|
||||
// all other filesystem operations are completed
|
||||
if !request.Exit {
|
||||
if err := t.Pause(ctx); err != nil {
|
||||
return d, err
|
||||
}
|
||||
defer t.Resume(ctx)
|
||||
}
|
||||
cr, err := t.client.ContainerService().Get(ctx, &containers.GetContainerRequest{
|
||||
ID: t.containerID,
|
||||
})
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
var index v1.Index
|
||||
if err := t.checkpointTask(ctx, &index, request); err != nil {
|
||||
return d, err
|
||||
}
|
||||
if err := t.checkpointImage(ctx, &index, cr.Container.Image); err != nil {
|
||||
return d, err
|
||||
}
|
||||
if err := t.checkpointRWSnapshot(ctx, &index, cr.Container.RootFS); err != nil {
|
||||
return d, err
|
||||
}
|
||||
index.Annotations = make(map[string]string)
|
||||
index.Annotations["image.name"] = cr.Container.Image
|
||||
return t.writeIndex(ctx, &index)
|
||||
}
|
||||
|
||||
func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *execution.CheckpointRequest) error {
|
||||
response, err := t.client.TaskService().Checkpoint(ctx, request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// add the checkpoint descriptors to the index
|
||||
for _, d := range response.Descriptors {
|
||||
index.Manifests = append(index.Manifests, v1.Descriptor{
|
||||
MediaType: d.MediaType,
|
||||
Size: d.Size_,
|
||||
Digest: d.Digest,
|
||||
Platform: &v1.Platform{
|
||||
OS: runtime.GOOS,
|
||||
Architecture: runtime.GOARCH,
|
||||
},
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, id string) error {
|
||||
rw, err := rootfs.Diff(ctx, id, fmt.Sprintf("checkpoint-rw-%s", id), t.client.SnapshotService(), t.client.DiffService())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rw.Platform = &v1.Platform{
|
||||
OS: runtime.GOOS,
|
||||
Architecture: runtime.GOARCH,
|
||||
}
|
||||
index.Manifests = append(index.Manifests, rw)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *task) checkpointImage(ctx context.Context, index *v1.Index, image string) error {
|
||||
if image == "" {
|
||||
return fmt.Errorf("cannot checkpoint image with empty name")
|
||||
}
|
||||
ir, err := t.client.ImageService().Get(ctx, image)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
index.Manifests = append(index.Manifests, ir.Target)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *task) writeIndex(ctx context.Context, index *v1.Index) (v1.Descriptor, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
if err := json.NewEncoder(buf).Encode(index); err != nil {
|
||||
return v1.Descriptor{}, err
|
||||
}
|
||||
return writeContent(ctx, t.client.ContentStore(), v1.MediaTypeImageIndex, t.containerID, buf)
|
||||
}
|
||||
|
||||
func writeContent(ctx context.Context, store content.Store, mediaType, ref string, r io.Reader) (d v1.Descriptor, err error) {
|
||||
writer, err := store.Writer(ctx, ref, 0, "")
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
defer writer.Close()
|
||||
size, err := io.Copy(writer, r)
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
if err := writer.Commit(0, ""); err != nil {
|
||||
return d, err
|
||||
}
|
||||
return v1.Descriptor{
|
||||
MediaType: mediaType,
|
||||
Digest: writer.Digest(),
|
||||
Size: size,
|
||||
}, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user