use epoll to manage console i/o in linux
this adds a `platform` interface for shim service to manage platform-specific behaviors such as I/O (which uses epoll in linux to work around bugs with applications that closes all consoles i.e. https://github.com/opencontainers/runc/pull/1434 and https://github.com/moby/moby/issues/27202) Its expected that we only have 1 epollfd per containerd_shim to manage all processes. Since all the work are done outside of the container runtime, upgrading of runc is not required and should be done separately. Signed-off-by: Daniel Dao <dqminh89@gmail.com>
This commit is contained in:
parent
de2671b7f5
commit
8e53465842
@ -105,10 +105,11 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecProcess
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to retrieve console master")
|
return nil, errors.Wrap(err, "failed to retrieve console master")
|
||||||
}
|
}
|
||||||
e.console = console
|
console, err = e.parent.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, ©WaitGroup)
|
||||||
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, ©WaitGroup); err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to start console copy")
|
return nil, errors.Wrap(err, "failed to start console copy")
|
||||||
}
|
}
|
||||||
|
e.console = console
|
||||||
} else {
|
} else {
|
||||||
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, ©WaitGroup); err != nil {
|
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, ©WaitGroup); err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to start io pipe copy")
|
return nil, errors.Wrap(err, "failed to start io pipe copy")
|
||||||
@ -142,6 +143,7 @@ func (e *execProcess) ExitedAt() time.Time {
|
|||||||
func (e *execProcess) Exited(status int) {
|
func (e *execProcess) Exited(status int) {
|
||||||
e.status = status
|
e.status = status
|
||||||
e.exited = time.Now()
|
e.exited = time.Now()
|
||||||
|
e.parent.platform.shutdownConsole(context.Background(), e.console)
|
||||||
e.Wait()
|
e.Wait()
|
||||||
if e.io != nil {
|
if e.io != nil {
|
||||||
for _, c := range e.closers {
|
for _, c := range e.closers {
|
||||||
|
@ -39,21 +39,22 @@ type initProcess struct {
|
|||||||
// the reaper interface.
|
// the reaper interface.
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
id string
|
id string
|
||||||
bundle string
|
bundle string
|
||||||
console console.Console
|
console console.Console
|
||||||
io runc.IO
|
platform platform
|
||||||
runtime *runc.Runc
|
io runc.IO
|
||||||
status int
|
runtime *runc.Runc
|
||||||
exited time.Time
|
status int
|
||||||
pid int
|
exited time.Time
|
||||||
closers []io.Closer
|
pid int
|
||||||
stdin io.Closer
|
closers []io.Closer
|
||||||
stdio stdio
|
stdin io.Closer
|
||||||
rootfs string
|
stdio stdio
|
||||||
|
rootfs string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newInitProcess(context context.Context, path, namespace string, r *shimapi.CreateTaskRequest) (*initProcess, error) {
|
func newInitProcess(context context.Context, plat platform, path, namespace string, r *shimapi.CreateTaskRequest) (*initProcess, error) {
|
||||||
var success bool
|
var success bool
|
||||||
|
|
||||||
if err := identifiers.Validate(r.ID); err != nil {
|
if err := identifiers.Validate(r.ID); err != nil {
|
||||||
@ -98,9 +99,10 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
|
|||||||
Root: filepath.Join(RuncRoot, namespace),
|
Root: filepath.Join(RuncRoot, namespace),
|
||||||
}
|
}
|
||||||
p := &initProcess{
|
p := &initProcess{
|
||||||
id: r.ID,
|
id: r.ID,
|
||||||
bundle: r.Bundle,
|
bundle: r.Bundle,
|
||||||
runtime: runtime,
|
runtime: runtime,
|
||||||
|
platform: plat,
|
||||||
stdio: stdio{
|
stdio: stdio{
|
||||||
stdin: r.Stdin,
|
stdin: r.Stdin,
|
||||||
stdout: r.Stdout,
|
stdout: r.Stdout,
|
||||||
@ -170,10 +172,11 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to retrieve console master")
|
return nil, errors.Wrap(err, "failed to retrieve console master")
|
||||||
}
|
}
|
||||||
p.console = console
|
console, err = plat.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, ©WaitGroup)
|
||||||
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, ©WaitGroup); err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to start console copy")
|
return nil, errors.Wrap(err, "failed to start console copy")
|
||||||
}
|
}
|
||||||
|
p.console = console
|
||||||
} else {
|
} else {
|
||||||
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, ©WaitGroup); err != nil {
|
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, ©WaitGroup); err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to start io pipe copy")
|
return nil, errors.Wrap(err, "failed to start io pipe copy")
|
||||||
@ -238,6 +241,9 @@ func (p *initProcess) Delete(context context.Context) error {
|
|||||||
return fmt.Errorf("cannot delete a running container")
|
return fmt.Errorf("cannot delete a running container")
|
||||||
}
|
}
|
||||||
p.killAll(context)
|
p.killAll(context)
|
||||||
|
if err := p.platform.shutdownConsole(context, p.console); err != nil {
|
||||||
|
log.G(context).WithError(err).Warn("Failed to shutdown container console")
|
||||||
|
}
|
||||||
p.Wait()
|
p.Wait()
|
||||||
err = p.runtime.Delete(context, p.id, nil)
|
err = p.runtime.Delete(context, p.id, nil)
|
||||||
if p.io != nil {
|
if p.io != nil {
|
||||||
|
@ -56,10 +56,20 @@ func NewService(path, namespace, address string) (*Service, error) {
|
|||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
context: context,
|
context: context,
|
||||||
}
|
}
|
||||||
|
if err := s.initPlatform(); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to initialized platform behavior")
|
||||||
|
}
|
||||||
go s.forward(client)
|
go s.forward(client)
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// platform handles platform-specific behavior that may differs across
|
||||||
|
// platform implementations
|
||||||
|
type platform interface {
|
||||||
|
copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error)
|
||||||
|
shutdownConsole(ctx context.Context, console console.Console) error
|
||||||
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
initProcess *initProcess
|
initProcess *initProcess
|
||||||
path string
|
path string
|
||||||
@ -72,10 +82,12 @@ type Service struct {
|
|||||||
deferredEvent interface{}
|
deferredEvent interface{}
|
||||||
namespace string
|
namespace string
|
||||||
context context.Context
|
context context.Context
|
||||||
|
|
||||||
|
platform platform
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
|
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
|
||||||
process, err := newInitProcess(ctx, s.path, s.namespace, r)
|
process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errdefs.ToGRPC(err)
|
return nil, errdefs.ToGRPC(err)
|
||||||
}
|
}
|
||||||
|
87
linux/shim/service_linux.go
Normal file
87
linux/shim/service_linux.go
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
package shim
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/containerd/console"
|
||||||
|
"github.com/containerd/fifo"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type linuxPlatform struct {
|
||||||
|
epoller *console.Epoller
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
|
||||||
|
if p.epoller == nil {
|
||||||
|
return nil, errors.New("uninitialized epoller")
|
||||||
|
}
|
||||||
|
|
||||||
|
epollConsole, err := p.epoller.Add(console)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if stdin != "" {
|
||||||
|
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cwg.Add(1)
|
||||||
|
go func() {
|
||||||
|
cwg.Done()
|
||||||
|
io.Copy(epollConsole, in)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
cwg.Add(1)
|
||||||
|
go func() {
|
||||||
|
cwg.Done()
|
||||||
|
io.Copy(outw, epollConsole)
|
||||||
|
epollConsole.Close()
|
||||||
|
outr.Close()
|
||||||
|
outw.Close()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
return epollConsole, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Console) error {
|
||||||
|
if p.epoller == nil {
|
||||||
|
return errors.New("uninitialized epoller")
|
||||||
|
}
|
||||||
|
epollConsole, ok := cons.(*console.EpollConsole)
|
||||||
|
if !ok {
|
||||||
|
return errors.Errorf("expected EpollConsole, got %#v", cons)
|
||||||
|
}
|
||||||
|
return epollConsole.Shutdown(p.epoller.CloseConsole)
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize a single epoll fd to manage our consoles. `initPlatform` should
|
||||||
|
// only be called once.
|
||||||
|
func (s *Service) initPlatform() error {
|
||||||
|
if s.platform != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
epoller, err := console.NewEpoller()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to initialize epoller")
|
||||||
|
}
|
||||||
|
s.platform = &linuxPlatform{
|
||||||
|
epoller: epoller,
|
||||||
|
}
|
||||||
|
go epoller.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
58
linux/shim/service_unix.go
Normal file
58
linux/shim/service_unix.go
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
// +build !windows,!linux
|
||||||
|
|
||||||
|
package shim
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/containerd/console"
|
||||||
|
"github.com/containerd/fifo"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type unixPlatform struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
|
||||||
|
if stdin != "" {
|
||||||
|
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cwg.Add(1)
|
||||||
|
go func() {
|
||||||
|
cwg.Done()
|
||||||
|
io.Copy(console, in)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
cwg.Add(1)
|
||||||
|
go func() {
|
||||||
|
cwg.Done()
|
||||||
|
io.Copy(outw, console)
|
||||||
|
console.Close()
|
||||||
|
outr.Close()
|
||||||
|
outw.Close()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
return console, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unixPlatform) shutdownConsole(ctx context.Context, cons console.Console) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) initPlatform() error {
|
||||||
|
s.platform = &unixPlatform{}
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user