Move metadata plugin registration to seperate package
Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
		| @@ -22,6 +22,7 @@ import ( | |||||||
| 	_ "github.com/containerd/containerd/events/plugin" | 	_ "github.com/containerd/containerd/events/plugin" | ||||||
| 	_ "github.com/containerd/containerd/gc/scheduler" | 	_ "github.com/containerd/containerd/gc/scheduler" | ||||||
| 	_ "github.com/containerd/containerd/leases/plugin" | 	_ "github.com/containerd/containerd/leases/plugin" | ||||||
|  | 	_ "github.com/containerd/containerd/metadata/plugin" | ||||||
| 	_ "github.com/containerd/containerd/runtime/restart/monitor" | 	_ "github.com/containerd/containerd/runtime/restart/monitor" | ||||||
| 	_ "github.com/containerd/containerd/runtime/v2" | 	_ "github.com/containerd/containerd/runtime/v2" | ||||||
| 	_ "github.com/containerd/containerd/services/containers" | 	_ "github.com/containerd/containerd/services/containers" | ||||||
|   | |||||||
| @@ -48,6 +48,7 @@ import ( | |||||||
| 	_ "github.com/containerd/containerd/events/plugin" | 	_ "github.com/containerd/containerd/events/plugin" | ||||||
| 	_ "github.com/containerd/containerd/gc/scheduler" | 	_ "github.com/containerd/containerd/gc/scheduler" | ||||||
| 	_ "github.com/containerd/containerd/leases/plugin" | 	_ "github.com/containerd/containerd/leases/plugin" | ||||||
|  | 	_ "github.com/containerd/containerd/metadata/plugin" | ||||||
| 	_ "github.com/containerd/containerd/runtime/v2" | 	_ "github.com/containerd/containerd/runtime/v2" | ||||||
| 	_ "github.com/containerd/containerd/runtime/v2/runc/options" | 	_ "github.com/containerd/containerd/runtime/v2/runc/options" | ||||||
| 	_ "github.com/containerd/containerd/services/containers" | 	_ "github.com/containerd/containerd/services/containers" | ||||||
|   | |||||||
							
								
								
									
										179
									
								
								metadata/plugin/plugin.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										179
									
								
								metadata/plugin/plugin.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,179 @@ | |||||||
|  | /* | ||||||
|  |    Copyright The containerd Authors. | ||||||
|  |  | ||||||
|  |    Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  |    you may not use this file except in compliance with the License. | ||||||
|  |    You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |        http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  |    Unless required by applicable law or agreed to in writing, software | ||||||
|  |    distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |    See the License for the specific language governing permissions and | ||||||
|  |    limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package plugin | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"os" | ||||||
|  | 	"path/filepath" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/containerd/content" | ||||||
|  | 	"github.com/containerd/containerd/errdefs" | ||||||
|  | 	"github.com/containerd/containerd/log" | ||||||
|  | 	"github.com/containerd/containerd/metadata" | ||||||
|  | 	"github.com/containerd/containerd/pkg/timeout" | ||||||
|  | 	"github.com/containerd/containerd/plugin" | ||||||
|  | 	"github.com/containerd/containerd/snapshots" | ||||||
|  | 	bolt "go.etcd.io/bbolt" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	boltOpenTimeout = "io.containerd.timeout.bolt.open" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	timeout.Set(boltOpenTimeout, 0) // set to 0 means to wait indefinitely for bolt.Open | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // BoltConfig defines the configuration values for the bolt plugin, which is | ||||||
|  | // loaded here, rather than back registered in the metadata package. | ||||||
|  | type BoltConfig struct { | ||||||
|  | 	// ContentSharingPolicy sets the sharing policy for content between | ||||||
|  | 	// namespaces. | ||||||
|  | 	// | ||||||
|  | 	// The default mode "shared" will make blobs available in all | ||||||
|  | 	// namespaces once it is pulled into any namespace. The blob will be pulled | ||||||
|  | 	// into the namespace if a writer is opened with the "Expected" digest that | ||||||
|  | 	// is already present in the backend. | ||||||
|  | 	// | ||||||
|  | 	// The alternative mode, "isolated" requires that clients prove they have | ||||||
|  | 	// access to the content by providing all of the content to the ingest | ||||||
|  | 	// before the blob is added to the namespace. | ||||||
|  | 	// | ||||||
|  | 	// Both modes share backing data, while "shared" will reduce total | ||||||
|  | 	// bandwidth across namespaces, at the cost of allowing access to any blob | ||||||
|  | 	// just by knowing its digest. | ||||||
|  | 	ContentSharingPolicy string `toml:"content_sharing_policy"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	// SharingPolicyShared represents the "shared" sharing policy | ||||||
|  | 	SharingPolicyShared = "shared" | ||||||
|  | 	// SharingPolicyIsolated represents the "isolated" sharing policy | ||||||
|  | 	SharingPolicyIsolated = "isolated" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Validate validates if BoltConfig is valid | ||||||
|  | func (bc *BoltConfig) Validate() error { | ||||||
|  | 	switch bc.ContentSharingPolicy { | ||||||
|  | 	case SharingPolicyShared, SharingPolicyIsolated: | ||||||
|  | 		return nil | ||||||
|  | 	default: | ||||||
|  | 		return fmt.Errorf("unknown policy: %s: %w", bc.ContentSharingPolicy, errdefs.ErrInvalidArgument) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	plugin.Register(&plugin.Registration{ | ||||||
|  | 		Type: plugin.MetadataPlugin, | ||||||
|  | 		ID:   "bolt", | ||||||
|  | 		Requires: []plugin.Type{ | ||||||
|  | 			plugin.ContentPlugin, | ||||||
|  | 			plugin.SnapshotPlugin, | ||||||
|  | 		}, | ||||||
|  | 		Config: &BoltConfig{ | ||||||
|  | 			ContentSharingPolicy: SharingPolicyShared, | ||||||
|  | 		}, | ||||||
|  | 		InitFn: func(ic *plugin.InitContext) (interface{}, error) { | ||||||
|  | 			if err := os.MkdirAll(ic.Root, 0711); err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  | 			cs, err := ic.Get(plugin.ContentPlugin) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			snapshottersRaw, err := ic.GetByType(plugin.SnapshotPlugin) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			snapshotters := make(map[string]snapshots.Snapshotter) | ||||||
|  | 			for name, sn := range snapshottersRaw { | ||||||
|  | 				sn, err := sn.Instance() | ||||||
|  | 				if err != nil { | ||||||
|  | 					if !plugin.IsSkipPlugin(err) { | ||||||
|  | 						log.G(ic.Context).WithError(err). | ||||||
|  | 							Warnf("could not use snapshotter %v in metadata plugin", name) | ||||||
|  | 					} | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				snapshotters[name] = sn.(snapshots.Snapshotter) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			shared := true | ||||||
|  | 			ic.Meta.Exports["policy"] = SharingPolicyShared | ||||||
|  | 			if cfg, ok := ic.Config.(*BoltConfig); ok { | ||||||
|  | 				if cfg.ContentSharingPolicy != "" { | ||||||
|  | 					if err := cfg.Validate(); err != nil { | ||||||
|  | 						return nil, err | ||||||
|  | 					} | ||||||
|  | 					if cfg.ContentSharingPolicy == SharingPolicyIsolated { | ||||||
|  | 						ic.Meta.Exports["policy"] = SharingPolicyIsolated | ||||||
|  | 						shared = false | ||||||
|  | 					} | ||||||
|  |  | ||||||
|  | 					log.G(ic.Context).WithField("policy", cfg.ContentSharingPolicy).Info("metadata content store policy set") | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			path := filepath.Join(ic.Root, "meta.db") | ||||||
|  | 			ic.Meta.Exports["path"] = path | ||||||
|  |  | ||||||
|  | 			options := *bolt.DefaultOptions | ||||||
|  | 			// Reading bbolt's freelist sometimes fails when the file has a data corruption. | ||||||
|  | 			// Disabling freelist sync reduces the chance of the breakage. | ||||||
|  | 			// https://github.com/etcd-io/bbolt/pull/1 | ||||||
|  | 			// https://github.com/etcd-io/bbolt/pull/6 | ||||||
|  | 			options.NoFreelistSync = true | ||||||
|  | 			// Without the timeout, bbolt.Open would block indefinitely due to flock(2). | ||||||
|  | 			options.Timeout = timeout.Get(boltOpenTimeout) | ||||||
|  |  | ||||||
|  | 			doneCh := make(chan struct{}) | ||||||
|  | 			go func() { | ||||||
|  | 				t := time.NewTimer(10 * time.Second) | ||||||
|  | 				defer t.Stop() | ||||||
|  | 				select { | ||||||
|  | 				case <-t.C: | ||||||
|  | 					log.G(ic.Context).WithField("plugin", "bolt").Warn("waiting for response from boltdb open") | ||||||
|  | 				case <-doneCh: | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 			}() | ||||||
|  | 			db, err := bolt.Open(path, 0644, &options) | ||||||
|  | 			close(doneCh) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			dbopts := []metadata.DBOpt{ | ||||||
|  | 				//	metadata.WithEventsPublisher(ic.Events), | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if !shared { | ||||||
|  | 				dbopts = append(dbopts, metadata.WithPolicyIsolated) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			mdb := metadata.NewDB(db, cs.(content.Store), snapshotters, dbopts...) | ||||||
|  | 			if err := mdb.Init(ic.Context); err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  | 			return mdb, nil | ||||||
|  | 		}, | ||||||
|  | 	}) | ||||||
|  | } | ||||||
| @@ -168,44 +168,6 @@ type ProxyPlugin struct { | |||||||
| 	Address string `toml:"address"` | 	Address string `toml:"address"` | ||||||
| } | } | ||||||
|  |  | ||||||
| // BoltConfig defines the configuration values for the bolt plugin, which is |  | ||||||
| // loaded here, rather than back registered in the metadata package. |  | ||||||
| type BoltConfig struct { |  | ||||||
| 	// ContentSharingPolicy sets the sharing policy for content between |  | ||||||
| 	// namespaces. |  | ||||||
| 	// |  | ||||||
| 	// The default mode "shared" will make blobs available in all |  | ||||||
| 	// namespaces once it is pulled into any namespace. The blob will be pulled |  | ||||||
| 	// into the namespace if a writer is opened with the "Expected" digest that |  | ||||||
| 	// is already present in the backend. |  | ||||||
| 	// |  | ||||||
| 	// The alternative mode, "isolated" requires that clients prove they have |  | ||||||
| 	// access to the content by providing all of the content to the ingest |  | ||||||
| 	// before the blob is added to the namespace. |  | ||||||
| 	// |  | ||||||
| 	// Both modes share backing data, while "shared" will reduce total |  | ||||||
| 	// bandwidth across namespaces, at the cost of allowing access to any blob |  | ||||||
| 	// just by knowing its digest. |  | ||||||
| 	ContentSharingPolicy string `toml:"content_sharing_policy"` |  | ||||||
| } |  | ||||||
|  |  | ||||||
| const ( |  | ||||||
| 	// SharingPolicyShared represents the "shared" sharing policy |  | ||||||
| 	SharingPolicyShared = "shared" |  | ||||||
| 	// SharingPolicyIsolated represents the "isolated" sharing policy |  | ||||||
| 	SharingPolicyIsolated = "isolated" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| // Validate validates if BoltConfig is valid |  | ||||||
| func (bc *BoltConfig) Validate() error { |  | ||||||
| 	switch bc.ContentSharingPolicy { |  | ||||||
| 	case SharingPolicyShared, SharingPolicyIsolated: |  | ||||||
| 		return nil |  | ||||||
| 	default: |  | ||||||
| 		return fmt.Errorf("unknown policy: %s: %w", bc.ContentSharingPolicy, errdefs.ErrInvalidArgument) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Decode unmarshals a plugin specific configuration by plugin id | // Decode unmarshals a plugin specific configuration by plugin id | ||||||
| func (c *Config) Decode(p *plugin.Registration) (interface{}, error) { | func (c *Config) Decode(p *plugin.Registration) (interface{}, error) { | ||||||
| 	id := p.URI() | 	id := p.URI() | ||||||
|   | |||||||
| @@ -36,26 +36,22 @@ import ( | |||||||
|  |  | ||||||
| 	csapi "github.com/containerd/containerd/api/services/content/v1" | 	csapi "github.com/containerd/containerd/api/services/content/v1" | ||||||
| 	ssapi "github.com/containerd/containerd/api/services/snapshots/v1" | 	ssapi "github.com/containerd/containerd/api/services/snapshots/v1" | ||||||
| 	"github.com/containerd/containerd/content" |  | ||||||
| 	"github.com/containerd/containerd/content/local" | 	"github.com/containerd/containerd/content/local" | ||||||
| 	csproxy "github.com/containerd/containerd/content/proxy" | 	csproxy "github.com/containerd/containerd/content/proxy" | ||||||
| 	"github.com/containerd/containerd/defaults" | 	"github.com/containerd/containerd/defaults" | ||||||
| 	"github.com/containerd/containerd/diff" | 	"github.com/containerd/containerd/diff" | ||||||
| 	"github.com/containerd/containerd/events/exchange" | 	"github.com/containerd/containerd/events/exchange" | ||||||
| 	"github.com/containerd/containerd/log" | 	"github.com/containerd/containerd/log" | ||||||
| 	"github.com/containerd/containerd/metadata" |  | ||||||
| 	"github.com/containerd/containerd/pkg/dialer" | 	"github.com/containerd/containerd/pkg/dialer" | ||||||
| 	"github.com/containerd/containerd/pkg/timeout" | 	"github.com/containerd/containerd/pkg/timeout" | ||||||
| 	"github.com/containerd/containerd/plugin" | 	"github.com/containerd/containerd/plugin" | ||||||
| 	srvconfig "github.com/containerd/containerd/services/server/config" | 	srvconfig "github.com/containerd/containerd/services/server/config" | ||||||
| 	"github.com/containerd/containerd/snapshots" |  | ||||||
| 	ssproxy "github.com/containerd/containerd/snapshots/proxy" | 	ssproxy "github.com/containerd/containerd/snapshots/proxy" | ||||||
| 	"github.com/containerd/containerd/sys" | 	"github.com/containerd/containerd/sys" | ||||||
| 	"github.com/containerd/ttrpc" | 	"github.com/containerd/ttrpc" | ||||||
| 	metrics "github.com/docker/go-metrics" | 	metrics "github.com/docker/go-metrics" | ||||||
| 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||||||
| 	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" | 	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" | ||||||
| 	bolt "go.etcd.io/bbolt" |  | ||||||
| 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" | 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" | ||||||
| 	"google.golang.org/grpc" | 	"google.golang.org/grpc" | ||||||
| 	"google.golang.org/grpc/backoff" | 	"google.golang.org/grpc/backoff" | ||||||
| @@ -63,14 +59,6 @@ import ( | |||||||
| 	"google.golang.org/grpc/credentials/insecure" | 	"google.golang.org/grpc/credentials/insecure" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( |  | ||||||
| 	boltOpenTimeout = "io.containerd.timeout.bolt.open" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| func init() { |  | ||||||
| 	timeout.Set(boltOpenTimeout, 0) // set to 0 means to wait indefinitely for bolt.Open |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // CreateTopLevelDirectories creates the top-level root and state directories. | // CreateTopLevelDirectories creates the top-level root and state directories. | ||||||
| func CreateTopLevelDirectories(config *srvconfig.Config) error { | func CreateTopLevelDirectories(config *srvconfig.Config) error { | ||||||
| 	switch { | 	switch { | ||||||
| @@ -395,99 +383,6 @@ func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Regis | |||||||
| 			return local.NewStore(ic.Root) | 			return local.NewStore(ic.Root) | ||||||
| 		}, | 		}, | ||||||
| 	}) | 	}) | ||||||
| 	plugin.Register(&plugin.Registration{ |  | ||||||
| 		Type: plugin.MetadataPlugin, |  | ||||||
| 		ID:   "bolt", |  | ||||||
| 		Requires: []plugin.Type{ |  | ||||||
| 			plugin.ContentPlugin, |  | ||||||
| 			plugin.SnapshotPlugin, |  | ||||||
| 		}, |  | ||||||
| 		Config: &srvconfig.BoltConfig{ |  | ||||||
| 			ContentSharingPolicy: srvconfig.SharingPolicyShared, |  | ||||||
| 		}, |  | ||||||
| 		InitFn: func(ic *plugin.InitContext) (interface{}, error) { |  | ||||||
| 			if err := os.MkdirAll(ic.Root, 0711); err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
| 			cs, err := ic.Get(plugin.ContentPlugin) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			snapshottersRaw, err := ic.GetByType(plugin.SnapshotPlugin) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			snapshotters := make(map[string]snapshots.Snapshotter) |  | ||||||
| 			for name, sn := range snapshottersRaw { |  | ||||||
| 				sn, err := sn.Instance() |  | ||||||
| 				if err != nil { |  | ||||||
| 					if !plugin.IsSkipPlugin(err) { |  | ||||||
| 						log.G(ic.Context).WithError(err). |  | ||||||
| 							Warnf("could not use snapshotter %v in metadata plugin", name) |  | ||||||
| 					} |  | ||||||
| 					continue |  | ||||||
| 				} |  | ||||||
| 				snapshotters[name] = sn.(snapshots.Snapshotter) |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			shared := true |  | ||||||
| 			ic.Meta.Exports["policy"] = srvconfig.SharingPolicyShared |  | ||||||
| 			if cfg, ok := ic.Config.(*srvconfig.BoltConfig); ok { |  | ||||||
| 				if cfg.ContentSharingPolicy != "" { |  | ||||||
| 					if err := cfg.Validate(); err != nil { |  | ||||||
| 						return nil, err |  | ||||||
| 					} |  | ||||||
| 					if cfg.ContentSharingPolicy == srvconfig.SharingPolicyIsolated { |  | ||||||
| 						ic.Meta.Exports["policy"] = srvconfig.SharingPolicyIsolated |  | ||||||
| 						shared = false |  | ||||||
| 					} |  | ||||||
|  |  | ||||||
| 					log.L.WithField("policy", cfg.ContentSharingPolicy).Info("metadata content store policy set") |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			path := filepath.Join(ic.Root, "meta.db") |  | ||||||
| 			ic.Meta.Exports["path"] = path |  | ||||||
|  |  | ||||||
| 			options := *bolt.DefaultOptions |  | ||||||
| 			// Reading bbolt's freelist sometimes fails when the file has a data corruption. |  | ||||||
| 			// Disabling freelist sync reduces the chance of the breakage. |  | ||||||
| 			// https://github.com/etcd-io/bbolt/pull/1 |  | ||||||
| 			// https://github.com/etcd-io/bbolt/pull/6 |  | ||||||
| 			options.NoFreelistSync = true |  | ||||||
| 			// Without the timeout, bbolt.Open would block indefinitely due to flock(2). |  | ||||||
| 			options.Timeout = timeout.Get(boltOpenTimeout) |  | ||||||
|  |  | ||||||
| 			doneCh := make(chan struct{}) |  | ||||||
| 			go func() { |  | ||||||
| 				t := time.NewTimer(10 * time.Second) |  | ||||||
| 				defer t.Stop() |  | ||||||
| 				select { |  | ||||||
| 				case <-t.C: |  | ||||||
| 					log.G(ctx).WithField("plugin", "bolt").Warn("waiting for response from boltdb open") |  | ||||||
| 				case <-doneCh: |  | ||||||
| 					return |  | ||||||
| 				} |  | ||||||
| 			}() |  | ||||||
| 			db, err := bolt.Open(path, 0644, &options) |  | ||||||
| 			close(doneCh) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			var dbopts []metadata.DBOpt |  | ||||||
| 			if !shared { |  | ||||||
| 				dbopts = append(dbopts, metadata.WithPolicyIsolated) |  | ||||||
| 			} |  | ||||||
| 			mdb := metadata.NewDB(db, cs.(content.Store), snapshotters, dbopts...) |  | ||||||
| 			if err := mdb.Init(ic.Context); err != nil { |  | ||||||
| 				return nil, err |  | ||||||
| 			} |  | ||||||
| 			return mdb, nil |  | ||||||
| 		}, |  | ||||||
| 	}) |  | ||||||
|  |  | ||||||
| 	clients := &proxyClients{} | 	clients := &proxyClients{} | ||||||
| 	for name, pp := range config.ProxyPlugins { | 	for name, pp := range config.ProxyPlugins { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Derek McGowan
					Derek McGowan