310 lines
7.3 KiB
Go
310 lines
7.3 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package metadata
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/boltdb/bolt"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/filters"
|
|
"github.com/containerd/containerd/leases"
|
|
"github.com/containerd/containerd/metadata/boltutil"
|
|
"github.com/containerd/containerd/namespaces"
|
|
digest "github.com/opencontainers/go-digest"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// LeaseManager manages the create/delete lifecyle of leases
|
|
// and also returns existing leases
|
|
type LeaseManager struct {
|
|
tx *bolt.Tx
|
|
}
|
|
|
|
// NewLeaseManager creates a new lease manager for managing leases using
|
|
// the provided database transaction.
|
|
func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
|
|
return &LeaseManager{
|
|
tx: tx,
|
|
}
|
|
}
|
|
|
|
// Create creates a new lease using the provided lease
|
|
func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
|
|
var l leases.Lease
|
|
for _, opt := range opts {
|
|
if err := opt(&l); err != nil {
|
|
return leases.Lease{}, err
|
|
}
|
|
}
|
|
if l.ID == "" {
|
|
return leases.Lease{}, errors.New("lease id must be provided")
|
|
}
|
|
|
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return leases.Lease{}, err
|
|
}
|
|
|
|
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
|
|
if err != nil {
|
|
return leases.Lease{}, err
|
|
}
|
|
|
|
txbkt, err := topbkt.CreateBucket([]byte(l.ID))
|
|
if err != nil {
|
|
if err == bolt.ErrBucketExists {
|
|
err = errdefs.ErrAlreadyExists
|
|
}
|
|
return leases.Lease{}, errors.Wrapf(err, "lease %q", l.ID)
|
|
}
|
|
|
|
t := time.Now().UTC()
|
|
createdAt, err := t.MarshalBinary()
|
|
if err != nil {
|
|
return leases.Lease{}, err
|
|
}
|
|
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
|
|
return leases.Lease{}, err
|
|
}
|
|
|
|
if l.Labels != nil {
|
|
if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil {
|
|
return leases.Lease{}, err
|
|
}
|
|
}
|
|
l.CreatedAt = t
|
|
|
|
return l, nil
|
|
}
|
|
|
|
// Delete delets the lease with the provided lease ID
|
|
func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease, _ ...leases.DeleteOpt) error {
|
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
|
|
if topbkt == nil {
|
|
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
|
|
}
|
|
if err := topbkt.DeleteBucket([]byte(lease.ID)); err != nil {
|
|
if err == bolt.ErrBucketNotFound {
|
|
err = errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// List lists all active leases
|
|
func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease, error) {
|
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filter, err := filters.ParseAll(fs...)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, err.Error())
|
|
}
|
|
|
|
var ll []leases.Lease
|
|
|
|
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
|
|
if topbkt == nil {
|
|
return ll, nil
|
|
}
|
|
|
|
if err := topbkt.ForEach(func(k, v []byte) error {
|
|
if v != nil {
|
|
return nil
|
|
}
|
|
txbkt := topbkt.Bucket(k)
|
|
|
|
l := leases.Lease{
|
|
ID: string(k),
|
|
}
|
|
|
|
if v := txbkt.Get(bucketKeyCreatedAt); v != nil {
|
|
t := &l.CreatedAt
|
|
if err := t.UnmarshalBinary(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
labels, err := boltutil.ReadLabels(txbkt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
l.Labels = labels
|
|
|
|
if filter.Match(adaptLease(l)) {
|
|
ll = append(ll, l)
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ll, nil
|
|
}
|
|
|
|
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
|
|
lid, ok := leases.FromContext(ctx)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
namespace, ok := namespaces.Namespace(ctx)
|
|
if !ok {
|
|
panic("namespace must already be checked")
|
|
}
|
|
|
|
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
|
|
if bkt == nil {
|
|
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
|
|
}
|
|
|
|
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectSnapshots)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bkt, err = bkt.CreateBucketIfNotExists([]byte(snapshotter))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bkt.Put([]byte(key), nil)
|
|
}
|
|
|
|
func removeSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
|
|
lid, ok := leases.FromContext(ctx)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
namespace, ok := namespaces.Namespace(ctx)
|
|
if !ok {
|
|
panic("namespace must already be checked")
|
|
}
|
|
|
|
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid), bucketKeyObjectSnapshots, []byte(snapshotter))
|
|
if bkt == nil {
|
|
// Key does not exist so we return nil
|
|
return nil
|
|
}
|
|
|
|
return bkt.Delete([]byte(key))
|
|
}
|
|
|
|
func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
|
|
lid, ok := leases.FromContext(ctx)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
namespace, ok := namespaces.Namespace(ctx)
|
|
if !ok {
|
|
panic("namespace must already be required")
|
|
}
|
|
|
|
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
|
|
if bkt == nil {
|
|
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
|
|
}
|
|
|
|
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectContent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bkt.Put([]byte(dgst.String()), nil)
|
|
}
|
|
|
|
func removeContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
|
|
lid, ok := leases.FromContext(ctx)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
namespace, ok := namespaces.Namespace(ctx)
|
|
if !ok {
|
|
panic("namespace must already be checked")
|
|
}
|
|
|
|
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid), bucketKeyObjectContent)
|
|
if bkt == nil {
|
|
// Key does not exist so we return nil
|
|
return nil
|
|
}
|
|
|
|
return bkt.Delete([]byte(dgst.String()))
|
|
}
|
|
|
|
func addIngestLease(ctx context.Context, tx *bolt.Tx, ref string) (bool, error) {
|
|
lid, ok := leases.FromContext(ctx)
|
|
if !ok {
|
|
return false, nil
|
|
}
|
|
|
|
namespace, ok := namespaces.Namespace(ctx)
|
|
if !ok {
|
|
panic("namespace must already be required")
|
|
}
|
|
|
|
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
|
|
if bkt == nil {
|
|
return false, errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
|
|
}
|
|
|
|
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if err := bkt.Put([]byte(ref), nil); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func removeIngestLease(ctx context.Context, tx *bolt.Tx, ref string) error {
|
|
lid, ok := leases.FromContext(ctx)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
namespace, ok := namespaces.Namespace(ctx)
|
|
if !ok {
|
|
panic("namespace must already be checked")
|
|
}
|
|
|
|
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid), bucketKeyObjectIngests)
|
|
if bkt == nil {
|
|
// Key does not exist so we return nil
|
|
return nil
|
|
}
|
|
|
|
return bkt.Delete([]byte(ref))
|
|
}
|