Cleanup local transfer interface
Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
parent
a91b05d99c
commit
fe01cad201
@ -56,7 +56,7 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Verify image before pulling.
|
// Verify image before pulling.
|
||||||
for vfName, vf := range ts.verifiers {
|
for vfName, vf := range ts.config.Verifiers {
|
||||||
log := log.G(ctx).WithFields(logrus.Fields{
|
log := log.G(ctx).WithFields(logrus.Fields{
|
||||||
"name": name,
|
"name": name,
|
||||||
"digest": desc.Digest.String(),
|
"digest": desc.Digest.String(),
|
||||||
|
@ -36,10 +36,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type localTransferService struct {
|
type localTransferService struct {
|
||||||
leases leases.Manager
|
|
||||||
content content.Store
|
content content.Store
|
||||||
images images.Store
|
images images.Store
|
||||||
verifiers map[string]imageverifier.ImageVerifier
|
|
||||||
// limiter for upload
|
// limiter for upload
|
||||||
limiterU *semaphore.Weighted
|
limiterU *semaphore.Weighted
|
||||||
// limiter for download operation
|
// limiter for download operation
|
||||||
@ -47,13 +45,11 @@ type localTransferService struct {
|
|||||||
config TransferConfig
|
config TransferConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransferService(lm leases.Manager, cs content.Store, is images.Store, vfs map[string]imageverifier.ImageVerifier, tc *TransferConfig) transfer.Transferrer {
|
func NewTransferService(cs content.Store, is images.Store, tc TransferConfig) transfer.Transferrer {
|
||||||
ts := &localTransferService{
|
ts := &localTransferService{
|
||||||
leases: lm,
|
|
||||||
content: cs,
|
content: cs,
|
||||||
images: is,
|
images: is,
|
||||||
verifiers: vfs,
|
config: tc,
|
||||||
config: *tc,
|
|
||||||
}
|
}
|
||||||
if tc.MaxConcurrentUploadedLayers > 0 {
|
if tc.MaxConcurrentUploadedLayers > 0 {
|
||||||
ts.limiterU = semaphore.NewWeighted(int64(tc.MaxConcurrentUploadedLayers))
|
ts.limiterU = semaphore.NewWeighted(int64(tc.MaxConcurrentUploadedLayers))
|
||||||
@ -142,7 +138,10 @@ func (ts *localTransferService) withLease(ctx context.Context, opts ...leases.Op
|
|||||||
return ctx, nop, nil
|
return ctx, nop, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ls := ts.leases
|
ls := ts.config.Leases
|
||||||
|
if ls == nil {
|
||||||
|
return ctx, nop, nil
|
||||||
|
}
|
||||||
|
|
||||||
if len(opts) == 0 {
|
if len(opts) == 0 {
|
||||||
// Use default lease configuration if no options provided
|
// Use default lease configuration if no options provided
|
||||||
@ -164,6 +163,9 @@ func (ts *localTransferService) withLease(ctx context.Context, opts ...leases.Op
|
|||||||
}
|
}
|
||||||
|
|
||||||
type TransferConfig struct {
|
type TransferConfig struct {
|
||||||
|
// Leases manager is used to create leases during operations if none, exists
|
||||||
|
Leases leases.Manager
|
||||||
|
|
||||||
// MaxConcurrentDownloads is the max concurrent content downloads for pull.
|
// MaxConcurrentDownloads is the max concurrent content downloads for pull.
|
||||||
MaxConcurrentDownloads int
|
MaxConcurrentDownloads int
|
||||||
// MaxConcurrentUploadedLayers is the max concurrent uploads for push
|
// MaxConcurrentUploadedLayers is the max concurrent uploads for push
|
||||||
@ -182,6 +184,9 @@ type TransferConfig struct {
|
|||||||
// UnpackPlatforms are used to specify supported combination of platforms and snapshotters
|
// UnpackPlatforms are used to specify supported combination of platforms and snapshotters
|
||||||
UnpackPlatforms []unpack.Platform
|
UnpackPlatforms []unpack.Platform
|
||||||
|
|
||||||
|
// ImageVerifiers verify the image before saving into the image store.
|
||||||
|
Verifiers map[string]imageverifier.ImageVerifier
|
||||||
|
|
||||||
// RegistryConfigPath is a path to the root directory containing registry-specific configurations
|
// RegistryConfigPath is a path to the root directory containing registry-specific configurations
|
||||||
RegistryConfigPath string
|
RegistryConfigPath string
|
||||||
}
|
}
|
||||||
|
@ -57,23 +57,27 @@ func init() {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ms := m.(*metadata.DB)
|
ms := m.(*metadata.DB)
|
||||||
|
|
||||||
|
var lc local.TransferConfig
|
||||||
|
|
||||||
l, err := ic.GetSingle(plugins.LeasePlugin)
|
l, err := ic.GetSingle(plugins.LeasePlugin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
lc.Leases = l.(leases.Manager)
|
||||||
|
|
||||||
vfs := make(map[string]imageverifier.ImageVerifier)
|
|
||||||
vps, err := ic.GetByType(plugins.ImageVerifierPlugin)
|
vps, err := ic.GetByType(plugins.ImageVerifierPlugin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if len(vps) > 0 {
|
||||||
|
lc.Verifiers = make(map[string]imageverifier.ImageVerifier)
|
||||||
for name, vp := range vps {
|
for name, vp := range vps {
|
||||||
vfs[name] = vp.(imageverifier.ImageVerifier)
|
lc.Verifiers[name] = vp.(imageverifier.ImageVerifier)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set configuration based on default or user input
|
// Set configuration based on default or user input
|
||||||
var lc local.TransferConfig
|
|
||||||
lc.MaxConcurrentDownloads = config.MaxConcurrentDownloads
|
lc.MaxConcurrentDownloads = config.MaxConcurrentDownloads
|
||||||
lc.MaxConcurrentUploadedLayers = config.MaxConcurrentUploadedLayers
|
lc.MaxConcurrentUploadedLayers = config.MaxConcurrentUploadedLayers
|
||||||
|
|
||||||
@ -140,7 +144,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
lc.RegistryConfigPath = config.RegistryConfigPath
|
lc.RegistryConfigPath = config.RegistryConfigPath
|
||||||
|
|
||||||
return local.NewTransferService(l.(leases.Manager), ms.ContentStore(), metadata.NewImageStore(ms), vfs, &lc), nil
|
return local.NewTransferService(ms.ContentStore(), metadata.NewImageStore(ms), lc), nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user