Improve layer decompression speed by using pigz
Signed-off-by: Maksym Pavlenko <makpav@amazon.com>
This commit is contained in:
parent
d09a1c6a95
commit
4d7d63f390
@ -20,9 +20,15 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@ -37,6 +43,16 @@ const (
|
|||||||
Gzip
|
Gzip
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const disablePigzEnv = "CONTAINERD_DISABLE_PIGZ"
|
||||||
|
|
||||||
|
var unpigzPath string
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
if unpigzPath = detectPigz(); unpigzPath != "" {
|
||||||
|
log.L.Debug("using pigz for decompression")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bufioReader32KPool = &sync.Pool{
|
bufioReader32KPool = &sync.Pool{
|
||||||
New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
|
New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
|
||||||
@ -120,11 +136,18 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
|
|||||||
readBufWrapper := &readCloserWrapper{buf, compression, closer}
|
readBufWrapper := &readCloserWrapper{buf, compression, closer}
|
||||||
return readBufWrapper, nil
|
return readBufWrapper, nil
|
||||||
case Gzip:
|
case Gzip:
|
||||||
gzReader, err := gzip.NewReader(buf)
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
gzReader, err := gzipDecompress(ctx, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cancel()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
readBufWrapper := &readCloserWrapper{gzReader, compression, closer}
|
|
||||||
|
readBufWrapper := &readCloserWrapper{gzReader, compression, func() error {
|
||||||
|
cancel()
|
||||||
|
return closer()
|
||||||
|
}}
|
||||||
|
|
||||||
return readBufWrapper, nil
|
return readBufWrapper, nil
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
|
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
|
||||||
@ -151,3 +174,61 @@ func (compression *Compression) Extension() string {
|
|||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func gzipDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
|
||||||
|
if unpigzPath == "" {
|
||||||
|
return gzip.NewReader(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func cmdStream(cmd *exec.Cmd, in io.Reader) (io.ReadCloser, error) {
|
||||||
|
reader, writer := io.Pipe()
|
||||||
|
|
||||||
|
cmd.Stdin = in
|
||||||
|
cmd.Stdout = writer
|
||||||
|
|
||||||
|
var errBuf bytes.Buffer
|
||||||
|
cmd.Stderr = &errBuf
|
||||||
|
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
writer.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
|
||||||
|
} else {
|
||||||
|
writer.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return reader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func detectPigz() string {
|
||||||
|
path, err := exec.LookPath("unpigz")
|
||||||
|
if err != nil {
|
||||||
|
log.L.WithError(err).Debug("unpigz not found, falling back to go gzip")
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if pigz disabled via CONTAINERD_DISABLE_PIGZ env variable
|
||||||
|
value := os.Getenv(disablePigzEnv)
|
||||||
|
if value == "" {
|
||||||
|
return path
|
||||||
|
}
|
||||||
|
|
||||||
|
disable, err := strconv.ParseBool(value)
|
||||||
|
if err != nil {
|
||||||
|
log.L.WithError(err).Warnf("could not parse %s: %s", disablePigzEnv, value)
|
||||||
|
return path
|
||||||
|
}
|
||||||
|
|
||||||
|
if disable {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return path
|
||||||
|
}
|
||||||
|
@ -18,8 +18,15 @@ package compression
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"crypto/md5"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -42,7 +49,7 @@ func generateData(t *testing.T, size int) []byte {
|
|||||||
return append(part0Data, append(part1Data, part2Data...)...)
|
return append(part0Data, append(part1Data, part2Data...)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCompressDecompress(t *testing.T, size int, compression Compression) {
|
func testCompressDecompress(t *testing.T, size int, compression Compression) DecompressReadCloser {
|
||||||
orig := generateData(t, size)
|
orig := generateData(t, size)
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
compressor, err := CompressStream(&b, compression)
|
compressor, err := CompressStream(&b, compression)
|
||||||
@ -72,12 +79,173 @@ func testCompressDecompress(t *testing.T, size int, compression Compression) {
|
|||||||
if !bytes.Equal(orig, decompressed) {
|
if !bytes.Equal(orig, decompressed) {
|
||||||
t.Fatal("strange decompressed data")
|
t.Fatal("strange decompressed data")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return decompressor
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCompressDecompressGzip(t *testing.T) {
|
func TestCompressDecompressGzip(t *testing.T) {
|
||||||
testCompressDecompress(t, 1024*1024, Gzip)
|
oldUnpigzPath := unpigzPath
|
||||||
|
unpigzPath = ""
|
||||||
|
defer func() { unpigzPath = oldUnpigzPath }()
|
||||||
|
|
||||||
|
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
|
||||||
|
wrapper := decompressor.(*readCloserWrapper)
|
||||||
|
_, ok := wrapper.Reader.(*gzip.Reader)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompressDecompressPigz(t *testing.T) {
|
||||||
|
if _, err := exec.LookPath("unpigz"); err != nil {
|
||||||
|
t.Skip("pigz not installed")
|
||||||
|
}
|
||||||
|
|
||||||
|
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
|
||||||
|
wrapper := decompressor.(*readCloserWrapper)
|
||||||
|
_, ok := wrapper.Reader.(*io.PipeReader)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCompressDecompressUncompressed(t *testing.T) {
|
func TestCompressDecompressUncompressed(t *testing.T) {
|
||||||
testCompressDecompress(t, 1024*1024, Uncompressed)
|
testCompressDecompress(t, 1024*1024, Uncompressed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDetectPigz(t *testing.T) {
|
||||||
|
// Create fake PATH with unpigz executable, make sure detectPigz can find it
|
||||||
|
tempPath, err := ioutil.TempDir("", "containerd_temp_")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := "unpigz"
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
filename = "unpigz.exe"
|
||||||
|
}
|
||||||
|
|
||||||
|
fullPath := filepath.Join(tempPath, filename)
|
||||||
|
|
||||||
|
if err := ioutil.WriteFile(fullPath, []byte(""), 0111); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer os.RemoveAll(tempPath)
|
||||||
|
|
||||||
|
oldPath := os.Getenv("PATH")
|
||||||
|
os.Setenv("PATH", tempPath)
|
||||||
|
defer os.Setenv("PATH", oldPath)
|
||||||
|
|
||||||
|
if pigzPath := detectPigz(); pigzPath == "" {
|
||||||
|
t.Fatal("failed to detect pigz path")
|
||||||
|
} else if pigzPath != fullPath {
|
||||||
|
t.Fatalf("wrong pigz found: %s != %s", pigzPath, fullPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
os.Setenv(disablePigzEnv, "1")
|
||||||
|
defer os.Unsetenv(disablePigzEnv)
|
||||||
|
|
||||||
|
if pigzPath := detectPigz(); pigzPath != "" {
|
||||||
|
t.Fatalf("disable via %s doesn't work", disablePigzEnv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCmdStream(t *testing.T) {
|
||||||
|
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; exit 0"), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buf, err := ioutil.ReadAll(out)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to read from stdout: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(buf) != "hello\n" {
|
||||||
|
t.Fatalf("unexpected command output ('%s' != '%s')", string(buf), "hello\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCmdStreamBad(t *testing.T) {
|
||||||
|
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; echo >&2 bad result; exit 1"), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to start command: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if buf, err := ioutil.ReadAll(out); err == nil {
|
||||||
|
t.Fatal("command should have failed")
|
||||||
|
} else if err.Error() != "exit status 1: bad result\n" {
|
||||||
|
t.Fatalf("wrong error: %s", err.Error())
|
||||||
|
} else if string(buf) != "hello\n" {
|
||||||
|
t.Fatalf("wrong output: %s", string(buf))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateCompressedData(b *testing.B, sizeInMb int) []byte {
|
||||||
|
sizeInBytes := sizeInMb * 1024 * 1024
|
||||||
|
data, _ := ioutil.ReadFile("testdata/test.json")
|
||||||
|
|
||||||
|
for len(data) < sizeInBytes {
|
||||||
|
data = append(data, data...)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.SetBytes(int64(len(data)))
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
compressor, err := CompressStream(&buf, Gzip)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n, err := compressor.Write(data); err != nil || n != len(data) {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
compressor.Close()
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkDecompression(sizeInMb int) func(*testing.B) {
|
||||||
|
buf := make([]byte, 32*1024)
|
||||||
|
return func(b *testing.B) {
|
||||||
|
compressed := generateCompressedData(b, sizeInMb)
|
||||||
|
hash := md5.New()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
decompressor, err := DecompressStream(bytes.NewReader(compressed))
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = io.CopyBuffer(hash, decompressor, buf); err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
decompressor.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkGzipDecompression(b *testing.B) {
|
||||||
|
oldUnpigzPath := unpigzPath
|
||||||
|
unpigzPath = ""
|
||||||
|
defer func() { unpigzPath = oldUnpigzPath }()
|
||||||
|
|
||||||
|
b.Run("gzip-32mb", benchmarkDecompression(32))
|
||||||
|
b.Run("gzip-64mb", benchmarkDecompression(64))
|
||||||
|
b.Run("gzip-128mb", benchmarkDecompression(128))
|
||||||
|
b.Run("gzip-256mb", benchmarkDecompression(256))
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkPigzDecompression(b *testing.B) {
|
||||||
|
if _, err := exec.LookPath("unpigz"); err != nil {
|
||||||
|
b.Skip("pigz not installed")
|
||||||
|
}
|
||||||
|
|
||||||
|
b.Run("pigz-32mb", benchmarkDecompression(32))
|
||||||
|
b.Run("pigz-64mb", benchmarkDecompression(64))
|
||||||
|
b.Run("pigz-128mb", benchmarkDecompression(128))
|
||||||
|
b.Run("pigz-256mb", benchmarkDecompression(256))
|
||||||
|
}
|
||||||
|
5902
archive/compression/testdata/test.json
vendored
Normal file
5902
archive/compression/testdata/test.json
vendored
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user