Use checksums instead of fsyncs to manage discovery cache corruption

Part of the API discovery cache uses an HTTP RoundTripper that
transparently caches responses to disk. The upstream implementation of
the disk cache is hard coded to call Sync() on every file it writes.
This has noticeably poor performance on modern Macs, which ask their disk
controllers to flush all the way to persistent storage because Go uses
the `F_FULLFSYNC` fcntl. Apple recommends minimizing this behaviour in
order to avoid degrading performance and increasing disk wear.

The content of the discovery cache is not critical; it is indeed just a
cache and can be recreated by hitting the API servers' discovery
endpoints. This commit replaces upstream httpcache's diskcache
implementation with a similar implementation that can use CRC-32
checksums to detect corrupted cache entries at read-time. When such an
entry is detected (e.g. because it was only partially flushed to
permanent storage before the host lost power) the cache will report a
miss. This causes httpcache to fall back to its underlying HTTP
transport (i.e. the real API server) and re-cache the resulting value.

Apart from adding CRC-32 checksums and avoiding calling fsync this
implementation differs from upstream httpcache's diskcache package in
that it uses FNV-32a hashes rather than MD5 hashes of cache keys in
order to generate filenames.

Signed-off-by: Nic Cope <nicc@rk0n.org>
This commit is contained in:
Nic Cope 2022-06-28 19:15:49 -07:00
parent eace469065
commit 7a2c6a432f
4 changed files with 203 additions and 69 deletions

View File

@ -17,12 +17,15 @@ limitations under the License.
package disk
import (
"encoding/binary"
"fmt"
"hash/crc32"
"hash/fnv"
"net/http"
"os"
"path/filepath"
"github.com/gregjones/httpcache"
"github.com/gregjones/httpcache/diskcache"
"github.com/peterbourgon/diskv"
"k8s.io/klog/v2"
)
@ -41,7 +44,7 @@ func newCacheRoundTripper(cacheDir string, rt http.RoundTripper) http.RoundTripp
BasePath: cacheDir,
TempDir: filepath.Join(cacheDir, ".diskv-temp"),
})
t := httpcache.NewTransport(diskcache.NewWithDiskv(d))
t := httpcache.NewTransport(&crcDiskCache{disk: d})
t.Transport = rt
return &cacheRoundTripper{rt: t}
@ -63,3 +66,54 @@ func (rt *cacheRoundTripper) CancelRequest(req *http.Request) {
}
// WrappedRoundTripper returns the cached transport's underlying
// http.RoundTripper (the transport set in newCacheRoundTripper).
func (rt *cacheRoundTripper) WrappedRoundTripper() http.RoundTripper { return rt.rt.Transport }
// A crcDiskCache is a cache backend for github.com/gregjones/httpcache. It is
// similar to httpcache's diskcache package, but uses checksums to ensure cache
// integrity rather than fsyncing each cache entry in order to avoid performance
// degradation on macOS.
//
// See https://github.com/kubernetes/kubernetes/issues/110753 for more.
type crcDiskCache struct {
	// disk is the diskv store that persists each cache entry as a file.
	disk *diskv.Diskv
}
// Get reads the entry for key from disk and verifies its integrity. The stored
// layout is a varint-encoded CRC-32 checksum padded to binary.MaxVarintLen32
// bytes, followed by the response bytes. Any read error, short file, or
// checksum mismatch is reported as a cache miss, which causes httpcache to
// fall back to its underlying transport and re-cache the result.
func (c *crcDiskCache) Get(key string) ([]byte, bool) {
	raw, err := c.disk.Read(sanitize(key))
	if err != nil {
		return []byte{}, false
	}
	if len(raw) < binary.MaxVarintLen32 {
		// Too short to even contain a checksum header; treat as corrupt.
		return []byte{}, false
	}
	want, _ := binary.Uvarint(raw[:binary.MaxVarintLen32])
	payload := raw[binary.MaxVarintLen32:]
	if uint32(want) != crc32.ChecksumIEEE(payload) {
		return []byte{}, false
	}
	return payload, true
}
// Set writes the response to a file on disk. The filename will be the FNV-32a
// hash of the key. The file will contain the varint-encoded CRC-32 checksum of
// the response bytes (padded to binary.MaxVarintLen32 bytes), followed by said
// response bytes.
func (c *crcDiskCache) Set(key string, response []byte) {
	header := make([]byte, binary.MaxVarintLen32, binary.MaxVarintLen32+len(response))
	binary.PutUvarint(header, uint64(crc32.ChecksumIEEE(response)))
	// A failed write is not actionable here; the worst case is a future miss.
	_ = c.disk.Write(sanitize(key), append(header, response...))
}
// Delete removes the cache entry for key from disk. Any error is ignored;
// there is nothing a caller could do with it.
func (c *crcDiskCache) Delete(key string) {
	_ = c.disk.Erase(sanitize(key))
}
// Sanitize an httpcache key such that it can be used as a diskv key, which must
// be a valid filename. The httpcache key will either be the requested URL (if
// the request method was GET) or "<method> <url>" for other methods, per the
// httpcache.cacheKey function.
func sanitize(key string) string {
h := fnv.New32a()
_, _ = h.Write([]byte(key)) // Writing to a hash never returns an error.
return fmt.Sprintf("%X", h.Sum32())
}

View File

@ -18,6 +18,8 @@ package disk
import (
"bytes"
"encoding/binary"
"hash/crc32"
"io/ioutil"
"net/http"
"net/url"
@ -25,7 +27,6 @@ import (
"path/filepath"
"testing"
"github.com/gregjones/httpcache/diskcache"
"github.com/peterbourgon/diskv"
"github.com/stretchr/testify/assert"
)
@ -42,9 +43,6 @@ func (rt *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error)
return rt.Response, rt.Err
}
// NOTE(negz): We're adding a benchmark for an external dependency in order to
// prove that one that will be added in a subsequent commit improves write
// performance.
func BenchmarkDiskCache(b *testing.B) {
cacheDir, err := ioutil.TempDir("", "cache-rt")
if err != nil {
@ -65,7 +63,7 @@ func BenchmarkDiskCache(b *testing.B) {
b.Fatal(err)
}
c := diskcache.NewWithDiskv(d)
c := crcDiskCache{disk: d}
for n := 0; n < b.N; n++ {
c.Set(k, v)
@ -179,3 +177,147 @@ func TestCacheRoundTripperPathPerm(t *testing.T) {
})
assert.NoError(err)
}
// TestCRCDiskCache exercises the CRC-32 checked disk cache: misses for
// missing, empty, and corrupt entries, plus overwrite and delete semantics.
func TestCRCDiskCache(t *testing.T) {
	assert := assert.New(t)

	// newTestCache returns a crcDiskCache backed by a fresh temporary
	// directory, the directory's path, and a function that removes it. This
	// replaces five copies of identical setup code in the subtests below.
	newTestCache := func(t *testing.T) (*crcDiskCache, string, func()) {
		cacheDir, err := ioutil.TempDir("", "cache-crc")
		if err != nil {
			t.Fatal(err)
		}
		d := diskv.New(diskv.Options{BasePath: cacheDir, TempDir: filepath.Join(cacheDir, ".diskv-temp")})
		return &crcDiskCache{disk: d}, cacheDir, func() { os.RemoveAll(cacheDir) }
	}

	// Ensure that we'll return a cache miss if the backing file doesn't exist.
	t.Run("NoSuchKey", func(t *testing.T) {
		crc, _, cleanup := newTestCache(t)
		defer cleanup()

		got, ok := crc.Get("testing")
		assert.False(ok)
		assert.Equal([]byte{}, got)
	})

	// Ensure that we'll return a cache miss if the backing file is empty.
	t.Run("EmptyFile", func(t *testing.T) {
		crc, cacheDir, cleanup := newTestCache(t)
		defer cleanup()

		key := "testing"
		f, err := os.Create(filepath.Join(cacheDir, sanitize(key)))
		if err != nil {
			t.Fatal(err)
		}
		f.Close()

		got, ok := crc.Get(key)
		assert.False(ok)
		assert.Equal([]byte{}, got)
	})

	// Ensure that we'll return a cache miss if the backing file has an
	// invalid checksum.
	t.Run("InvalidChecksum", func(t *testing.T) {
		crc, cacheDir, cleanup := newTestCache(t)
		defer cleanup()

		key := "testing"
		value := []byte("testing")
		mismatchedValue := []byte("testink")
		sum := make([]byte, binary.MaxVarintLen32)
		binary.PutUvarint(sum, uint64(crc32.ChecksumIEEE(value)))

		// Create a file with the checksum of 'value' followed by the bytes of
		// 'mismatchedValue'.
		f, err := os.Create(filepath.Join(cacheDir, sanitize(key)))
		if err != nil {
			t.Fatal(err)
		}
		f.Write(sum)
		f.Write(mismatchedValue)
		f.Close()

		// The mismatched checksum should result in a cache miss.
		got, ok := crc.Get(key)
		assert.False(ok)
		assert.Equal([]byte{}, got)
	})

	// Ensure that our disk cache will happily cache over the top of an existing
	// value. We depend on this behaviour to recover from corrupted cache
	// entries. When Get detects a bad checksum it will return a cache miss.
	// This should cause httpcache to fall back to its underlying transport and
	// to subsequently cache the new value, overwriting the corrupt one.
	t.Run("OverwriteExistingKey", func(t *testing.T) {
		crc, _, cleanup := newTestCache(t)
		defer cleanup()

		key := "testing"
		value := []byte("cool value!")

		// Write a value.
		crc.Set(key, value)
		got, ok := crc.Get(key)

		// Ensure we can read back what we wrote.
		assert.True(ok)
		assert.Equal(value, got)

		differentValue := []byte("I'm different!")

		// Write a different value.
		crc.Set(key, differentValue)
		got, ok = crc.Get(key)

		// Ensure we can read back the different value.
		assert.True(ok)
		assert.Equal(differentValue, got)
	})

	// Ensure that deleting a key does in fact delete it.
	t.Run("DeleteKey", func(t *testing.T) {
		crc, _, cleanup := newTestCache(t)
		defer cleanup()

		key := "testing"
		value := []byte("coolValue")

		crc.Set(key, value)

		// Ensure we successfully set the value.
		got, ok := crc.Get(key)
		assert.True(ok)
		assert.Equal(value, got)

		crc.Delete(key)

		// Ensure the value is gone.
		got, ok = crc.Get(key)
		assert.False(ok)
		assert.Equal([]byte{}, got)

		// Ensure that deleting a non-existent value is a no-op.
		crc.Delete(key)
	})
}

View File

@ -1,61 +0,0 @@
// Package diskcache provides an implementation of httpcache.Cache that uses the diskv package
// to supplement an in-memory map with persistent storage
//
package diskcache
import (
"bytes"
"crypto/md5"
"encoding/hex"
"github.com/peterbourgon/diskv"
"io"
)
// Cache is an implementation of httpcache.Cache that supplements the in-memory map with persistent storage
type Cache struct {
	// d is the diskv store used as the persistent backing for cache entries.
	d *diskv.Diskv
}
// Get returns the response corresponding to key if present. Any read error
// (including a missing file) is reported as a cache miss.
func (c *Cache) Get(key string) (resp []byte, ok bool) {
	data, err := c.d.Read(keyToFilename(key))
	if err != nil {
		return []byte{}, false
	}
	return data, true
}
// Set saves a response to the cache as key. The write is delegated to diskv;
// the final argument asks it to sync the file to disk.
func (c *Cache) Set(key string, resp []byte) {
	filename := keyToFilename(key)
	c.d.WriteStream(filename, bytes.NewReader(resp), true)
}
// Delete removes the response with key from the cache.
func (c *Cache) Delete(key string) {
	c.d.Erase(keyToFilename(key))
}
// keyToFilename hashes key with MD5 and returns the lowercase hex digest,
// which is safe to use as a filename.
func keyToFilename(key string) string {
	digest := md5.New()
	_, _ = io.WriteString(digest, key) // writing to a hash never fails
	sum := digest.Sum(nil)
	return hex.EncodeToString(sum)
}
// New returns a new Cache that will store files in basePath.
func New(basePath string) *Cache {
	opts := diskv.Options{
		BasePath:     basePath,
		CacheSizeMax: 100 * 1024 * 1024, // 100MB
	}
	return &Cache{d: diskv.New(opts)}
}
// NewWithDiskv returns a new Cache using the provided Diskv as underlying
// storage.
func NewWithDiskv(d *diskv.Diskv) *Cache {
	return &Cache{d: d}
}

1
vendor/modules.txt vendored
View File

@ -507,7 +507,6 @@ github.com/gorilla/websocket
# github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 => github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
## explicit
github.com/gregjones/httpcache
github.com/gregjones/httpcache/diskcache
# github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 => github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
## explicit; go 1.14
github.com/grpc-ecosystem/go-grpc-middleware