Move archive to pkg/archive

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-01-17 09:54:18 -08:00
parent fcd39ccc53
commit 8e14c39e80
39 changed files with 23 additions and 23 deletions

View File

@@ -0,0 +1,80 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"fmt"
"io"
"net/http"
"os/exec"
"testing"
"github.com/stretchr/testify/require"
)
const benchmarkTestDataURL = "https://git.io/fADcl"
func BenchmarkDecompression(b *testing.B) {
resp, err := http.Get(benchmarkTestDataURL)
require.NoError(b, err)
data, err := io.ReadAll(resp.Body)
require.NoError(b, err)
resp.Body.Close()
const mib = 1024 * 1024
sizes := []int{32, 64, 128, 256}
for _, sizeInMiB := range sizes {
size := sizeInMiB * mib
for len(data) < size {
data = append(data, data...)
}
data = data[0:size]
gz := testCompress(b, data, Gzip)
zstd := testCompress(b, data, Zstd)
b.Run(fmt.Sprintf("size=%dMiB", sizeInMiB), func(b *testing.B) {
original := gzipPath
defer func() {
gzipPath = original
}()
b.Run("zstd", func(b *testing.B) {
testDecompress(b, zstd)
})
gzipPath = ""
b.Run("gzipPureGo", func(b *testing.B) {
testDecompress(b, gz)
})
gzipPath, err = exec.LookPath("igzip")
if err == nil {
b.Run("igzip", func(b *testing.B) {
testDecompress(b, gz)
})
}
gzipPath, err = exec.LookPath("unpigz")
if err == nil {
b.Run("unpigz", func(b *testing.B) {
testDecompress(b, gz)
})
}
})
}
}

View File

@@ -0,0 +1,326 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"sync"
"github.com/containerd/log"
"github.com/klauspost/compress/zstd"
)
type (
// Compression is the state represents if compressed or not.
Compression int
)
const (
// Uncompressed represents the uncompressed.
Uncompressed Compression = iota
// Gzip is gzip compression algorithm.
Gzip
// Zstd is zstd compression algorithm.
Zstd
)
const (
disablePigzEnv = "CONTAINERD_DISABLE_PIGZ"
disableIgzipEnv = "CONTAINERD_DISABLE_IGZIP"
)
var (
initGzip sync.Once
gzipPath string
)
var (
bufioReader32KPool = &sync.Pool{
New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
}
)
// DecompressReadCloser include the stream after decompress and the compress method detected.
type DecompressReadCloser interface {
io.ReadCloser
// GetCompression returns the compress method which is used before decompressing
GetCompression() Compression
}
type readCloserWrapper struct {
io.Reader
compression Compression
closer func() error
}
func (r *readCloserWrapper) Close() error {
if r.closer != nil {
return r.closer()
}
return nil
}
func (r *readCloserWrapper) GetCompression() Compression {
return r.compression
}
type writeCloserWrapper struct {
io.Writer
closer func() error
}
func (w *writeCloserWrapper) Close() error {
if w.closer != nil {
w.closer()
}
return nil
}
type bufferedReader struct {
buf *bufio.Reader
}
func newBufferedReader(r io.Reader) *bufferedReader {
buf := bufioReader32KPool.Get().(*bufio.Reader)
buf.Reset(r)
return &bufferedReader{buf}
}
func (r *bufferedReader) Read(p []byte) (n int, err error) {
if r.buf == nil {
return 0, io.EOF
}
n, err = r.buf.Read(p)
if err == io.EOF {
r.buf.Reset(nil)
bufioReader32KPool.Put(r.buf)
r.buf = nil
}
return
}
func (r *bufferedReader) Peek(n int) ([]byte, error) {
if r.buf == nil {
return nil, io.EOF
}
return r.buf.Peek(n)
}
const (
zstdMagicSkippableStart = 0x184D2A50
zstdMagicSkippableMask = 0xFFFFFFF0
)
var (
gzipMagic = []byte{0x1F, 0x8B, 0x08}
zstdMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}
)
type matcher = func([]byte) bool
func magicNumberMatcher(m []byte) matcher {
return func(source []byte) bool {
return bytes.HasPrefix(source, m)
}
}
// zstdMatcher detects zstd compression algorithm.
// There are two frame formats defined by Zstandard: Zstandard frames and Skippable frames.
// See https://datatracker.ietf.org/doc/html/rfc8878#section-3 for more details.
func zstdMatcher() matcher {
return func(source []byte) bool {
if bytes.HasPrefix(source, zstdMagic) {
// Zstandard frame
return true
}
// skippable frame
if len(source) < 8 {
return false
}
// magic number from 0x184D2A50 to 0x184D2A5F.
if binary.LittleEndian.Uint32(source[:4])&zstdMagicSkippableMask == zstdMagicSkippableStart {
return true
}
return false
}
}
// DetectCompression detects the compression algorithm of the source.
func DetectCompression(source []byte) Compression {
for compression, fn := range map[Compression]matcher{
Gzip: magicNumberMatcher(gzipMagic),
Zstd: zstdMatcher(),
} {
if fn(source) {
return compression
}
}
return Uncompressed
}
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
buf := newBufferedReader(archive)
bs, err := buf.Peek(10)
if err != nil && err != io.EOF {
// Note: we'll ignore any io.EOF error because there are some odd
// cases where the layer.tar file will be empty (zero bytes) and
// that results in an io.EOF from the Peek() call. So, in those
// cases we'll just treat it as a non-compressed stream and
// that means just create an empty layer.
// See Issue docker/docker#18170
return nil, err
}
switch compression := DetectCompression(bs); compression {
case Uncompressed:
return &readCloserWrapper{
Reader: buf,
compression: compression,
}, nil
case Gzip:
ctx, cancel := context.WithCancel(context.Background())
gzReader, err := gzipDecompress(ctx, buf)
if err != nil {
cancel()
return nil, err
}
return &readCloserWrapper{
Reader: gzReader,
compression: compression,
closer: func() error {
cancel()
return gzReader.Close()
},
}, nil
case Zstd:
zstdReader, err := zstd.NewReader(buf)
if err != nil {
return nil, err
}
return &readCloserWrapper{
Reader: zstdReader,
compression: compression,
closer: func() error {
zstdReader.Close()
return nil
},
}, nil
default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
}
}
// CompressStream compresses the dest with specified compression algorithm.
func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
switch compression {
case Uncompressed:
return &writeCloserWrapper{dest, nil}, nil
case Gzip:
return gzip.NewWriter(dest), nil
case Zstd:
return zstd.NewWriter(dest)
default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
}
}
// Extension returns the extension of a file that uses the specified compression algorithm.
func (compression *Compression) Extension() string {
switch *compression {
case Gzip:
return "gz"
case Zstd:
return "zst"
}
return ""
}
func gzipDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
initGzip.Do(func() {
if gzipPath = detectCommand("igzip", disableIgzipEnv); gzipPath != "" {
log.L.Debug("using igzip for decompression")
return
}
if gzipPath = detectCommand("unpigz", disablePigzEnv); gzipPath != "" {
log.L.Debug("using unpigz for decompression")
}
})
if gzipPath == "" {
return gzip.NewReader(buf)
}
return cmdStream(exec.CommandContext(ctx, gzipPath, "-d", "-c"), buf)
}
func cmdStream(cmd *exec.Cmd, in io.Reader) (io.ReadCloser, error) {
reader, writer := io.Pipe()
cmd.Stdin = in
cmd.Stdout = writer
var errBuf bytes.Buffer
cmd.Stderr = &errBuf
if err := cmd.Start(); err != nil {
return nil, err
}
go func() {
if err := cmd.Wait(); err != nil {
writer.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
} else {
writer.Close()
}
}()
return reader, nil
}
func detectCommand(path, disableEnvName string) string {
// Check if this command is disabled via the env variable
value := os.Getenv(disableEnvName)
if value != "" {
disable, err := strconv.ParseBool(value)
if err != nil {
log.L.WithError(err).Warnf("could not parse %s: %s", disableEnvName, value)
}
if disable {
return ""
}
}
path, err := exec.LookPath(path)
if err != nil {
log.L.WithError(err).Debugf("%s not found", path)
return ""
}
return path
}

View File

@@ -0,0 +1,28 @@
//go:build gofuzz
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"bytes"
)
func FuzzDecompressStream(data []byte) int {
_, _ = DecompressStream(bytes.NewReader(data))
return 1
}

View File

@@ -0,0 +1,228 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"bytes"
"compress/gzip"
"context"
"crypto/rand"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"testing"
)
func TestMain(m *testing.M) {
// Force initPigz to be called, so tests start with the same initial state
gzipDecompress(context.Background(), strings.NewReader(""))
os.Exit(m.Run())
}
// generateData generates data that composed of 2 random parts
// and single zero-filled part within them.
// Typically, the compression ratio would be about 67%.
func generateData(t testing.TB, size int) []byte {
part0 := size / 3 // random
part2 := size / 3 // random
part1 := size - part0 - part2 // zero-filled
part0Data := make([]byte, part0)
if _, err := rand.Read(part0Data); err != nil {
t.Fatal(err)
}
part1Data := make([]byte, part1)
part2Data := make([]byte, part2)
if _, err := rand.Read(part2Data); err != nil {
t.Fatal(err)
}
return append(part0Data, append(part1Data, part2Data...)...)
}
func testCompress(t testing.TB, orig []byte, compression Compression) []byte {
size := len(orig)
var b bytes.Buffer
compressor, err := CompressStream(&b, compression)
if err != nil {
t.Fatal(err)
}
if n, err := compressor.Write(orig); err != nil || n != size {
t.Fatal(err)
}
compressor.Close()
return b.Bytes()
}
func testDecompress(t testing.TB, compressed []byte) ([]byte, DecompressReadCloser) {
decompressor, err := DecompressStream(bytes.NewReader(compressed))
if err != nil {
t.Fatal(err)
}
decompressed, err := io.ReadAll(decompressor)
if err != nil {
t.Fatal(err)
}
return decompressed, decompressor
}
func testCompressDecompress(t testing.TB, size int, compression Compression) DecompressReadCloser {
orig := generateData(t, size)
compressed := testCompress(t, orig, compression)
t.Logf("compressed %d bytes to %d bytes (%.2f%%)",
len(orig), len(compressed), 100.0*float32(len(compressed))/float32(len(orig)))
if compared := bytes.Compare(orig, compressed); (compression == Uncompressed && compared != 0) ||
(compression != Uncompressed && compared == 0) {
t.Fatal("strange compressed data")
}
decompressed, decompressor := testDecompress(t, compressed)
if !bytes.Equal(orig, decompressed) {
t.Fatal("strange decompressed data")
}
return decompressor
}
func TestCompressDecompressGzip(t *testing.T) {
oldUnpigzPath := gzipPath
gzipPath = ""
defer func() { gzipPath = oldUnpigzPath }()
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
wrapper := decompressor.(*readCloserWrapper)
_, ok := wrapper.Reader.(*gzip.Reader)
if !ok {
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
}
}
func TestCompressDecompressPigz(t *testing.T) {
if _, err := exec.LookPath("unpigz"); err != nil {
t.Skip("pigz not installed")
}
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
wrapper := decompressor.(*readCloserWrapper)
_, ok := wrapper.Reader.(*io.PipeReader)
if !ok {
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
}
}
func TestCompressDecompressUncompressed(t *testing.T) {
testCompressDecompress(t, 1024*1024, Uncompressed)
}
func TestDetectPigz(t *testing.T) {
// Create fake PATH with unpigz executable, make sure detectPigz can find it
tempPath := t.TempDir()
filename := "unpigz"
if runtime.GOOS == "windows" {
filename = "unpigz.exe"
}
fullPath := filepath.Join(tempPath, filename)
if err := os.WriteFile(fullPath, []byte(""), 0111); err != nil {
t.Fatal(err)
}
t.Setenv("PATH", tempPath)
if pigzPath := detectCommand("unpigz", disablePigzEnv); pigzPath == "" {
t.Fatal("failed to detect pigz path")
} else if pigzPath != fullPath {
t.Fatalf("wrong pigz found: %s != %s", pigzPath, fullPath)
}
t.Setenv(disablePigzEnv, "1")
if pigzPath := detectCommand("unpigz", disablePigzEnv); pigzPath != "" {
t.Fatalf("disable via %s doesn't work", disablePigzEnv)
}
}
func TestCmdStream(t *testing.T) {
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; exit 0"), nil)
if err != nil {
t.Fatal(err)
}
buf, err := io.ReadAll(out)
if err != nil {
t.Fatalf("failed to read from stdout: %s", err)
}
if string(buf) != "hello\n" {
t.Fatalf("unexpected command output ('%s' != '%s')", string(buf), "hello\n")
}
}
func TestCmdStreamBad(t *testing.T) {
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; echo >&2 bad result; exit 1"), nil)
if err != nil {
t.Fatalf("failed to start command: %v", err)
}
if buf, err := io.ReadAll(out); err == nil {
t.Fatal("command should have failed")
} else if err.Error() != "exit status 1: bad result\n" {
t.Fatalf("wrong error: %s", err.Error())
} else if string(buf) != "hello\n" {
t.Fatalf("wrong output: %s", string(buf))
}
}
func TestDetectCompressionZstd(t *testing.T) {
for _, tc := range []struct {
source []byte
expected Compression
}{
{
// test zstd compression without skippable frames.
source: []byte{
0x28, 0xb5, 0x2f, 0xfd, // magic number of Zstandard frame: 0xFD2FB528
0x04, 0x00, 0x31, 0x00, 0x00, // frame header
0x64, 0x6f, 0x63, 0x6b, 0x65, 0x72, // data block "docker"
0x16, 0x0e, 0x21, 0xc3, // content checksum
},
expected: Zstd,
},
{
// test zstd compression with skippable frames.
source: []byte{
0x50, 0x2a, 0x4d, 0x18, // magic number of skippable frame: 0x184D2A50 to 0x184D2A5F
0x04, 0x00, 0x00, 0x00, // frame size
0x5d, 0x00, 0x00, 0x00, // user data
0x28, 0xb5, 0x2f, 0xfd, // magic number of Zstandard frame: 0xFD2FB528
0x04, 0x00, 0x31, 0x00, 0x00, // frame header
0x64, 0x6f, 0x63, 0x6b, 0x65, 0x72, // data block "docker"
0x16, 0x0e, 0x21, 0xc3, // content checksum
},
expected: Zstd,
},
} {
compression := DetectCompression(tc.source)
if compression != tc.expected {
t.Fatalf("Unexpected compression %v, expected %v", compression, tc.expected)
}
}
}