Merge pull request #1283 from mlaventure/resurrect-state-dir

Resurrect State directory
This commit is contained in:
Phil Estes 2017-08-07 10:41:21 -04:00 committed by GitHub
commit e9b86af848
15 changed files with 89 additions and 25 deletions

View File

@ -58,6 +58,10 @@ func main() {
Name: "address,a", Name: "address,a",
Usage: "grpc address back to containerd", Usage: "grpc address back to containerd",
}, },
cli.StringFlag{
Name: "workdir,w",
Usage: "path used to store large temporary data",
},
} }
app.Before = func(context *cli.Context) error { app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") { if context.GlobalBool("debug") {
@ -84,6 +88,7 @@ func main() {
sv, err := shim.NewService( sv, err := shim.NewService(
path, path,
context.GlobalString("namespace"), context.GlobalString("namespace"),
context.GlobalString("workdir"),
&remoteEventsPublisher{client: e}, &remoteEventsPublisher{client: e},
) )
if err != nil { if err != nil {

View File

@ -6,7 +6,8 @@ import (
func defaultConfig() *server.Config { func defaultConfig() *server.Config {
return &server.Config{ return &server.Config{
Root: "/var/lib/containerd", Root: server.DefaultRootDir,
State: server.DefaultStateDir,
GRPC: server.GRPCConfig{ GRPC: server.GRPCConfig{
Address: server.DefaultAddress, Address: server.DefaultAddress,
}, },

View File

@ -6,7 +6,8 @@ import "github.com/containerd/containerd/server"
func defaultConfig() *server.Config { func defaultConfig() *server.Config {
return &server.Config{ return &server.Config{
Root: "/var/lib/containerd", Root: server.DefaultRootDir,
State: server.DefaultStateDir,
GRPC: server.GRPCConfig{ GRPC: server.GRPCConfig{
Address: server.DefaultAddress, Address: server.DefaultAddress,
}, },

View File

@ -1,15 +1,11 @@
package main package main
import ( import "github.com/containerd/containerd/server"
"os"
"path/filepath"
"github.com/containerd/containerd/server"
)
func defaultConfig() *server.Config { func defaultConfig() *server.Config {
return &server.Config{ return &server.Config{
Root: filepath.Join(os.Getenv("programfiles"), "containerd", "root"), Root: server.DefaultRootDir,
State: server.DefaultStateDir,
GRPC: server.GRPCConfig{ GRPC: server.GRPCConfig{
Address: server.DefaultAddress, Address: server.DefaultAddress,
}, },

View File

@ -25,7 +25,7 @@ func loadBundle(path, namespace string, events *events.Exchange) *bundle {
} }
// newBundle creates a new bundle on disk at the provided path for the given id // newBundle creates a new bundle on disk at the provided path for the given id
func newBundle(path, namespace, id string, spec []byte, events *events.Exchange) (b *bundle, err error) { func newBundle(path, namespace, workDir, id string, spec []byte, events *events.Exchange) (b *bundle, err error) {
if err := os.MkdirAll(path, 0711); err != nil { if err := os.MkdirAll(path, 0711); err != nil {
return nil, err return nil, err
} }
@ -35,6 +35,16 @@ func newBundle(path, namespace, id string, spec []byte, events *events.Exchange)
os.RemoveAll(path) os.RemoveAll(path)
} }
}() }()
workDir = filepath.Join(workDir, id)
if err := os.MkdirAll(workDir, 0711); err != nil {
return nil, err
}
defer func() {
if err != nil {
os.RemoveAll(workDir)
}
}()
if err := os.Mkdir(path, 0711); err != nil { if err := os.Mkdir(path, 0711); err != nil {
return nil, err return nil, err
} }
@ -50,6 +60,7 @@ func newBundle(path, namespace, id string, spec []byte, events *events.Exchange)
return &bundle{ return &bundle{
id: id, id: id,
path: path, path: path,
workDir: workDir,
namespace: namespace, namespace: namespace,
events: events, events: events,
}, err }, err
@ -58,6 +69,7 @@ func newBundle(path, namespace, id string, spec []byte, events *events.Exchange)
type bundle struct { type bundle struct {
id string id string
path string path string
workDir string
namespace string namespace string
events *events.Exchange events *events.Exchange
} }
@ -81,6 +93,7 @@ func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote
Path: b.path, Path: b.path,
Namespace: b.namespace, Namespace: b.namespace,
CgroupPath: options.ShimCgroup, CgroupPath: options.ShimCgroup,
WorkDir: b.workDir,
}, opt) }, opt)
} }

View File

@ -72,6 +72,9 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err := os.MkdirAll(ic.Root, 0711); err != nil { if err := os.MkdirAll(ic.Root, 0711); err != nil {
return nil, err return nil, err
} }
if err := os.MkdirAll(ic.State, 0711); err != nil {
return nil, err
}
monitor, err := ic.Get(plugin.TaskMonitorPlugin) monitor, err := ic.Get(plugin.TaskMonitorPlugin)
if err != nil { if err != nil {
return nil, err return nil, err
@ -83,6 +86,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
cfg := ic.Config.(*Config) cfg := ic.Config.(*Config)
r := &Runtime{ r := &Runtime{
root: ic.Root, root: ic.Root,
state: ic.State,
remote: !cfg.NoShim, remote: !cfg.NoShim,
shim: cfg.Shim, shim: cfg.Shim,
shimDebug: cfg.ShimDebug, shimDebug: cfg.ShimDebug,
@ -107,6 +111,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
type Runtime struct { type Runtime struct {
root string root string
state string
shim string shim string
shimDebug bool shimDebug bool
runtime string runtime string
@ -133,7 +138,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, errors.Wrapf(err, "invalid task id") return nil, errors.Wrapf(err, "invalid task id")
} }
bundle, err := newBundle(filepath.Join(r.root, namespace), namespace, id, opts.Spec.Value, r.events) bundle, err := newBundle(filepath.Join(r.state, namespace), namespace, filepath.Join(r.root, namespace), id, opts.Spec.Value, r.events)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -206,7 +211,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
} }
r.tasks.Delete(ctx, lc) r.tasks.Delete(ctx, lc)
bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace, r.events) bundle := loadBundle(filepath.Join(r.state, namespace, lc.id), namespace, r.events)
if err := bundle.Delete(); err != nil { if err := bundle.Delete(); err != nil {
return nil, err return nil, err
} }
@ -222,7 +227,7 @@ func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) {
} }
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) { func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
dir, err := ioutil.ReadDir(r.root) dir, err := ioutil.ReadDir(r.state)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -247,7 +252,7 @@ func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
} }
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns)) dir, err := ioutil.ReadDir(filepath.Join(r.state, ns))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -257,7 +262,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue continue
} }
id := path.Name() id := path.Name()
bundle := loadBundle(filepath.Join(r.root, ns, id), ns, r.events) bundle := loadBundle(filepath.Join(r.state, ns, id), ns, r.events)
s, err := bundle.Connect(ctx, r.remote) s, err := bundle.Connect(ctx, r.remote)
if err != nil { if err != nil {

View File

@ -81,6 +81,7 @@ func newCommand(binary, address string, debug bool, config Config, socket *os.Fi
args := []string{ args := []string{
"--namespace", config.Namespace, "--namespace", config.Namespace,
"--address", address, "--address", address,
"--workdir", config.WorkDir,
} }
if debug { if debug {
args = append(args, "--debug") args = append(args, "--debug")
@ -152,7 +153,7 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer
// WithLocal uses an in process shim // WithLocal uses an in process shim
func WithLocal(publisher events.Publisher) func(context.Context, Config) (shim.ShimClient, io.Closer, error) { func WithLocal(publisher events.Publisher) func(context.Context, Config) (shim.ShimClient, io.Closer, error) {
return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
service, err := NewService(config.Path, config.Namespace, publisher) service, err := NewService(config.Path, config.Namespace, config.WorkDir, publisher)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -165,6 +166,7 @@ type Config struct {
Path string Path string
Namespace string Namespace string
CgroupPath string CgroupPath string
WorkDir string
} }
// New returns a new shim client // New returns a new shim client

View File

@ -39,6 +39,8 @@ type initProcess struct {
// the reaper interface. // the reaper interface.
mu sync.Mutex mu sync.Mutex
workDir string
id string id string
bundle string bundle string
console console.Console console console.Console
@ -54,7 +56,7 @@ type initProcess struct {
rootfs string rootfs string
} }
func newInitProcess(context context.Context, plat platform, path, namespace string, r *shimapi.CreateTaskRequest) (*initProcess, error) { func newInitProcess(context context.Context, plat platform, path, namespace, workDir string, r *shimapi.CreateTaskRequest) (*initProcess, error) {
var success bool var success bool
if err := identifiers.Validate(r.ID); err != nil { if err := identifiers.Validate(r.ID); err != nil {
@ -109,7 +111,8 @@ func newInitProcess(context context.Context, plat platform, path, namespace stri
stderr: r.Stderr, stderr: r.Stderr,
terminal: r.Terminal, terminal: r.Terminal,
}, },
rootfs: rootfs, rootfs: rootfs,
workDir: workDir,
} }
var ( var (
err error err error
@ -132,7 +135,7 @@ func newInitProcess(context context.Context, plat platform, path, namespace stri
opts := &runc.RestoreOpts{ opts := &runc.RestoreOpts{
CheckpointOpts: runc.CheckpointOpts{ CheckpointOpts: runc.CheckpointOpts{
ImagePath: r.Checkpoint, ImagePath: r.Checkpoint,
WorkDir: filepath.Join(r.Bundle, "work"), WorkDir: p.workDir,
ParentPath: r.ParentCheckpoint, ParentPath: r.ParentCheckpoint,
}, },
PidFile: pidFile, PidFile: pidFile,
@ -310,10 +313,10 @@ func (p *initProcess) Checkpoint(context context.Context, r *shimapi.CheckpointT
if !options.Exit { if !options.Exit {
actions = append(actions, runc.LeaveRunning) actions = append(actions, runc.LeaveRunning)
} }
work := filepath.Join(p.bundle, "work") work := filepath.Join(p.workDir, "criu-work")
defer os.RemoveAll(work) defer os.RemoveAll(work)
if err := p.runtime.Checkpoint(context, p.id, &runc.CheckpointOpts{ if err := p.runtime.Checkpoint(context, p.id, &runc.CheckpointOpts{
WorkDir: work, WorkDir: p.workDir,
ImagePath: r.Path, ImagePath: r.Path,
AllowOpenTCP: options.OpenTcp, AllowOpenTCP: options.OpenTcp,
AllowExternalUnixSockets: options.ExternalUnixSockets, AllowExternalUnixSockets: options.ExternalUnixSockets,

View File

@ -30,7 +30,7 @@ var empty = &google_protobuf.Empty{}
const RuncRoot = "/run/containerd/runc" const RuncRoot = "/run/containerd/runc"
// NewService returns a new shim service that can be used via GRPC // NewService returns a new shim service that can be used via GRPC
func NewService(path, namespace string, publisher events.Publisher) (*Service, error) { func NewService(path, namespace, workDir string, publisher events.Publisher) (*Service, error) {
if namespace == "" { if namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty") return nil, fmt.Errorf("shim namespace cannot be empty")
} }
@ -41,6 +41,7 @@ func NewService(path, namespace string, publisher events.Publisher) (*Service, e
events: make(chan interface{}, 4096), events: make(chan interface{}, 4096),
namespace: namespace, namespace: namespace,
context: context, context: context,
workDir: workDir,
} }
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior") return nil, errors.Wrap(err, "failed to initialized platform behavior")
@ -69,11 +70,12 @@ type Service struct {
namespace string namespace string
context context.Context context context.Context
workDir string
platform platform platform platform
} }
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) { func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, r) process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, s.workDir, r)
if err != nil { if err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }

View File

@ -9,16 +9,18 @@ import (
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
) )
func NewContext(ctx context.Context, plugins map[PluginType]map[string]interface{}, root, id string) *InitContext { func NewContext(ctx context.Context, plugins map[PluginType]map[string]interface{}, root, state, id string) *InitContext {
return &InitContext{ return &InitContext{
plugins: plugins, plugins: plugins,
Root: filepath.Join(root, id), Root: filepath.Join(root, id),
State: filepath.Join(state, id),
Context: log.WithModule(ctx, id), Context: log.WithModule(ctx, id),
} }
} }
type InitContext struct { type InitContext struct {
Root string Root string
State string
Address string Address string
Context context.Context Context context.Context
Config interface{} Config interface{}

View File

@ -11,6 +11,8 @@ import (
type Config struct { type Config struct {
// Root is the path to a directory where containerd will store persistent data // Root is the path to a directory where containerd will store persistent data
Root string `toml:"root"` Root string `toml:"root"`
// State is the path to a directory where containerd will store transient data
State string `toml:"state"`
// GRPC configuration settings // GRPC configuration settings
GRPC GRPCConfig `toml:"grpc"` GRPC GRPCConfig `toml:"grpc"`
// Debug and profiling settings // Debug and profiling settings

View File

@ -36,9 +36,15 @@ func New(ctx context.Context, config *Config) (*Server, error) {
if config.Root == "" { if config.Root == "" {
return nil, errors.New("root must be specified") return nil, errors.New("root must be specified")
} }
if config.State == "" {
return nil, errors.New("state must be specified")
}
if err := os.MkdirAll(config.Root, 0711); err != nil { if err := os.MkdirAll(config.Root, 0711); err != nil {
return nil, err return nil, err
} }
if err := os.MkdirAll(config.State, 0711); err != nil {
return nil, err
}
if err := apply(ctx, config); err != nil { if err := apply(ctx, config); err != nil {
return nil, err return nil, err
} }
@ -66,6 +72,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
ctx, ctx,
initialized, initialized,
config.Root, config.Root,
config.State,
id, id,
) )
initContext.Events = s.events initContext.Events = s.events

View File

@ -9,6 +9,12 @@ import (
) )
const ( const (
// DefaultRootDir is the default location used by containerd to store
// persistent data
DefaultRootDir = "/var/lib/containerd"
// DefaultStateDir is the default location used by containerd to store
// transient data
DefaultStateDir = "/run/containerd"
// DefaultAddress is the default unix socket address // DefaultAddress is the default unix socket address
DefaultAddress = "/run/containerd/containerd.sock" DefaultAddress = "/run/containerd/containerd.sock"
// DefaultDebuggAddress is the default unix socket address for pprof data // DefaultDebuggAddress is the default unix socket address for pprof data

View File

@ -5,6 +5,12 @@ package server
import "context" import "context"
const ( const (
// DefaultRootDir is the default location used by containerd to store
// persistent data
DefaultRootDir = "/var/lib/containerd"
// DefaultStateDir is the default location used by containerd to store
// transient data
DefaultStateDir = "/run/containerd"
// DefaultAddress is the default unix socket address // DefaultAddress is the default unix socket address
DefaultAddress = "/run/containerd/containerd.sock" DefaultAddress = "/run/containerd/containerd.sock"
// DefaultDebuggAddress is the default unix socket address for pprof data // DefaultDebuggAddress is the default unix socket address for pprof data

View File

@ -2,7 +2,20 @@
package server package server
import "context" import (
"context"
"os"
"path/filepath"
)
var (
// DefaultRootDir is the default location used by containerd to store
// persistent data
DefaultRootDir = filepath.Join(os.Getenv("programfiles"), "containerd", "root")
// DefaultStateDir is the default location used by containerd to store
// transient data
DefaultStateDir = filepath.Join(os.Getenv("programfiles"), "containerd", "state")
)
const ( const (
// DefaultAddress is the default winpipe address // DefaultAddress is the default winpipe address