From cd72819b53ff30976570d0a8b80e1a4cf96f2095 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 19 Jan 2018 17:28:01 -0800 Subject: [PATCH] archive, cio, cmd, linux: use buffer pools To avoid buffer bloat in long running processes, we try to use buffer pools where possible. This is meant to address shim memory usage issues, but may not be the root cause. Signed-off-by: Stephen J Day --- archive/tar.go | 10 ++++------ archive/tar_windows.go | 8 ++++++-- cio/io.go | 7 +++++++ cio/io_unix.go | 15 ++++++++++++--- cio/io_windows.go | 18 +++++++++++++++--- cmd/containerd/main.go | 6 ++---- cmd/containerd/main_unix.go | 5 ++--- cmd/ctr/commands/shim/io_unix.go | 11 ++++++++++- linux/bundle.go | 10 ++-------- linux/proc/io.go | 21 ++++++++++++++++++--- linux/proc/utils.go | 5 ++++- linux/shim/service.go | 10 +++++++++- linux/shim/service_linux.go | 8 ++++++-- linux/shim/service_unix.go | 10 ++++++++-- 14 files changed, 105 insertions(+), 39 deletions(-) diff --git a/archive/tar.go b/archive/tar.go index 0e32d7105..4cef2c0dd 100644 --- a/archive/tar.go +++ b/archive/tar.go @@ -19,7 +19,7 @@ import ( "github.com/pkg/errors" ) -var bufferPool = &sync.Pool{ +var bufPool = &sync.Pool{ New: func() interface{} { buffer := make([]byte, 32*1024) return &buffer @@ -527,9 +527,7 @@ func (cw *changeWriter) HandleChange(k fs.ChangeKind, p string, f os.FileInfo, e } defer file.Close() - buf := bufferPool.Get().(*[]byte) - n, err := io.CopyBuffer(cw.tw, file, *buf) - bufferPool.Put(buf) + n, err := copyBuffered(context.TODO(), cw.tw, file) if err != nil { return errors.Wrap(err, "failed to copy") } @@ -589,8 +587,8 @@ func (cw *changeWriter) includeParents(hdr *tar.Header) error { } func copyBuffered(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error) { - buf := bufferPool.Get().(*[]byte) - defer bufferPool.Put(buf) + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) for { select { diff --git a/archive/tar_windows.go b/archive/tar_windows.go index ea2403e4f..333fa3ec9 100644 --- a/archive/tar_windows.go +++ b/archive/tar_windows.go @@ -386,6 +386,10 @@ func writeBackupStreamFromTarFile(w io.Writer, t *tar.Reader, hdr *tar.Header) ( return nil, err } } + + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + if hdr.Typeflag == tar.TypeReg || hdr.Typeflag == tar.TypeRegA { bhdr := winio.BackupHeader{ Id: winio.BackupData, @@ -395,7 +399,7 @@ func writeBackupStreamFromTarFile(w io.Writer, t *tar.Reader, hdr *tar.Header) ( if err != nil { return nil, err } - _, err = io.Copy(bw, t) + _, err = io.CopyBuffer(bw, t, *buf) if err != nil { return nil, err } @@ -418,7 +422,7 @@ func writeBackupStreamFromTarFile(w io.Writer, t *tar.Reader, hdr *tar.Header) ( if err != nil { return nil, err } - _, err = io.Copy(bw, t) + _, err = io.CopyBuffer(bw, t, *buf) if err != nil { return nil, err } diff --git a/cio/io.go b/cio/io.go index 1b4a4dc25..d2ab06857 100644 --- a/cio/io.go +++ b/cio/io.go @@ -8,6 +8,13 @@ import ( "sync" ) +var bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, +} + // Config holds the IO configurations. type Config struct { // Terminal is true if one has been allocated diff --git a/cio/io_unix.go b/cio/io_unix.go index 005fb0ce9..1eadcd50a 100644 --- a/cio/io_unix.go +++ b/cio/io_unix.go @@ -47,7 +47,10 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { if fifos.Stdin != "" { go func() { - io.Copy(pipes.Stdin, ioset.Stdin) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(pipes.Stdin, ioset.Stdin, *p) pipes.Stdin.Close() }() } @@ -55,7 +58,10 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { var wg = &sync.WaitGroup{} wg.Add(1) go func() { - io.Copy(ioset.Stdout, pipes.Stdout) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(ioset.Stdout, pipes.Stdout, *p) pipes.Stdout.Close() wg.Done() }() @@ -63,7 +69,10 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { if !fifos.Terminal { wg.Add(1) go func() { - io.Copy(ioset.Stderr, pipes.Stderr) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(ioset.Stderr, pipes.Stderr, *p) pipes.Stderr.Close() wg.Done() }() diff --git a/cio/io_windows.go b/cio/io_windows.go index 017c9a11f..4c1eefac6 100644 --- a/cio/io_windows.go +++ b/cio/io_windows.go @@ -47,7 +47,11 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin) return } - io.Copy(c, ioset.Stdin) + + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(c, ioset.Stdin, *p) c.Close() l.Close() }() @@ -73,7 +77,11 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Stdout) return } - io.Copy(ioset.Stdout, c) + + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(ioset.Stdout, c, *p) c.Close() l.Close() }() @@ -99,7 +107,11 @@ func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr) return } - io.Copy(ioset.Stderr, c) + + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(ioset.Stderr, c, *p) c.Close() l.Close() }() diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index d9836be9d..32f8cc249 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -10,10 +10,6 @@ import ( "os/signal" "time" - "google.golang.org/grpc/grpclog" - - gocontext "golang.org/x/net/context" - "github.com/containerd/containerd/log" "github.com/containerd/containerd/server" "github.com/containerd/containerd/sys" @@ -21,6 +17,8 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" + gocontext "golang.org/x/net/context" + "google.golang.org/grpc/grpclog" ) const usage = ` diff --git a/cmd/containerd/main_unix.go b/cmd/containerd/main_unix.go index 461eb722c..aacd1453f 100644 --- a/cmd/containerd/main_unix.go +++ b/cmd/containerd/main_unix.go @@ -7,11 +7,10 @@ import ( "os" "runtime" - "github.com/sirupsen/logrus" - "golang.org/x/sys/unix" - "github.com/containerd/containerd/log" "github.com/containerd/containerd/server" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" ) const defaultConfigPath = "/etc/containerd/config.toml" diff --git a/cmd/ctr/commands/shim/io_unix.go b/cmd/ctr/commands/shim/io_unix.go index f328f006b..29aeec63c 100644 --- a/cmd/ctr/commands/shim/io_unix.go +++ b/cmd/ctr/commands/shim/io_unix.go @@ -12,6 +12,13 @@ import ( "golang.org/x/sys/unix" ) +var bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, +} + func prepareStdio(stdin, stdout, stderr string, console bool) (wg *sync.WaitGroup, err error) { wg = &sync.WaitGroup{} ctx := gocontext.Background() @@ -26,7 +33,9 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (wg *sync.WaitGrou } }(f) go func(w io.WriteCloser) { - io.Copy(w, os.Stdin) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(w, os.Stdin, *p) w.Close() }(f) diff --git a/linux/bundle.go b/linux/bundle.go index 629d7f5bf..fbed32224 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -3,9 +3,8 @@ package linux import ( - "bytes" "context" - "io" + "io/ioutil" "os" "path/filepath" @@ -52,12 +51,7 @@ func newBundle(id, path, workDir string, spec []byte) (b *bundle, err error) { if err := os.Mkdir(filepath.Join(path, "rootfs"), 0711); err != nil { return nil, err } - f, err := os.Create(filepath.Join(path, configFilename)) - if err != nil { - return nil, err - } - defer f.Close() - _, err = io.Copy(f, bytes.NewReader(spec)) + err = ioutil.WriteFile(filepath.Join(path, configFilename), spec, 0666) return &bundle{ id: id, path: path, diff --git a/linux/proc/io.go b/linux/proc/io.go index e78b38300..c839fbd7c 100644 --- a/linux/proc/io.go +++ b/linux/proc/io.go @@ -13,6 +13,13 @@ import ( runc "github.com/containerd/go-runc" ) +var bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, +} + func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { for name, dest := range map[string]func(wc io.WriteCloser, rc io.Closer){ stdout: func(wc io.WriteCloser, rc io.Closer) { @@ -20,7 +27,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w cwg.Add(1) go func() { cwg.Done() - io.Copy(wc, rio.Stdout()) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(wc, rio.Stdout(), *p) wg.Done() wc.Close() rc.Close() @@ -31,7 +40,10 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w cwg.Add(1) go func() { cwg.Done() - io.Copy(wc, rio.Stderr()) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(wc, rio.Stderr(), *p) wg.Done() wc.Close() rc.Close() @@ -59,7 +71,10 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w cwg.Add(1) go func() { cwg.Done() - io.Copy(rio.Stdin(), f) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(rio.Stdin(), f, *p) rio.Stdin().Close() f.Close() }() diff --git a/linux/proc/utils.go b/linux/proc/utils.go index 1197957b5..95cfd2338 100644 --- a/linux/proc/utils.go +++ b/linux/proc/utils.go @@ -66,7 +66,10 @@ func copyFile(to, from string) error { return err } defer tt.Close() - _, err = io.Copy(tt, ff) + + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + _, err = io.CopyBuffer(tt, ff, *p) return err } diff --git a/linux/shim/service.go b/linux/shim/service.go index 96fc0ec4a..a8c0b7b1f 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -29,7 +29,15 @@ import ( "google.golang.org/grpc/status" ) -var empty = &ptypes.Empty{} +var ( + empty = &ptypes.Empty{} + bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, + } +) // Config contains shim specific configuration type Config struct { diff --git a/linux/shim/service_linux.go b/linux/shim/service_linux.go index bbe9d188a..de9f3e5fd 100644 --- a/linux/shim/service_linux.go +++ b/linux/shim/service_linux.go @@ -33,7 +33,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console cwg.Add(1) go func() { cwg.Done() - io.Copy(epollConsole, in) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(epollConsole, in, *p) }() } @@ -49,7 +51,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console cwg.Add(1) go func() { cwg.Done() - io.Copy(outw, epollConsole) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(outw, epollConsole, *p) epollConsole.Close() outr.Close() outw.Close() diff --git a/linux/shim/service_unix.go b/linux/shim/service_unix.go index d4419e56a..9c97aeb57 100644 --- a/linux/shim/service_unix.go +++ b/linux/shim/service_unix.go @@ -24,7 +24,10 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, cwg.Add(1) go func() { cwg.Done() - io.Copy(console, in) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(console, in, *p) }() } outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) @@ -39,7 +42,10 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, cwg.Add(1) go func() { cwg.Done() - io.Copy(outw, console) + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(outw, console, *p) console.Close() outr.Close() outw.Close()