@@ -32,6 +32,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
||||
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||
|
||||
"github.com/kubernetes-incubator/cri-containerd/cmd/cri-containerd/options"
|
||||
osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
|
||||
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
|
||||
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
|
||||
@@ -56,18 +57,12 @@ type CRIContainerdService interface {
|
||||
|
||||
// criContainerdService implements CRIContainerdService.
|
||||
type criContainerdService struct {
|
||||
// serverAddress is the grpc server unix path.
|
||||
serverAddress string
|
||||
// config contains all configurations.
|
||||
config options.Config
|
||||
// server is the grpc server.
|
||||
server *grpc.Server
|
||||
// os is an interface for all required os operations.
|
||||
os osinterface.OS
|
||||
// rootDir is the directory for managing cri-containerd files.
|
||||
rootDir string
|
||||
// sandboxImage is the image to use for sandbox container.
|
||||
sandboxImage string
|
||||
// snapshotter is the snapshotter to use in containerd.
|
||||
snapshotter string
|
||||
// sandboxStore stores all resources associated with sandboxes.
|
||||
sandboxStore *sandboxstore.Store
|
||||
// sandboxNameIndex stores all sandbox names and make sure each name
|
||||
@@ -93,44 +88,29 @@ type criContainerdService struct {
|
||||
client *containerd.Client
|
||||
// streamServer is the streaming server serves container streaming request.
|
||||
streamServer streaming.Server
|
||||
// cgroupPath in which the cri-containerd is placed in
|
||||
cgroupPath string
|
||||
// eventMonitor is the monitor monitors containerd events.
|
||||
eventMonitor *eventMonitor
|
||||
}
|
||||
|
||||
// NewCRIContainerdService returns a new instance of CRIContainerdService
|
||||
// TODO(random-liu): Add cri-containerd server config to get rid of the long arg list.
|
||||
func NewCRIContainerdService(
|
||||
serverAddress,
|
||||
containerdEndpoint,
|
||||
containerdSnapshotter,
|
||||
rootDir,
|
||||
networkPluginBinDir,
|
||||
networkPluginConfDir,
|
||||
streamAddress,
|
||||
streamPort string,
|
||||
cgroupPath string,
|
||||
sandboxImage string) (CRIContainerdService, error) {
|
||||
func NewCRIContainerdService(config options.Config) (CRIContainerdService, error) {
|
||||
// TODO(random-liu): [P2] Recover from runtime state and checkpoint.
|
||||
|
||||
client, err := containerd.New(containerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
|
||||
client, err := containerd.New(config.ContainerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v", containerdEndpoint, err)
|
||||
return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
|
||||
config.ContainerdEndpoint, err)
|
||||
}
|
||||
if cgroupPath != "" {
|
||||
_, err := loadCgroup(cgroupPath)
|
||||
if config.CgroupPath != "" {
|
||||
_, err := loadCgroup(config.CgroupPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load cgroup for cgroup path %v: %v", cgroupPath, err)
|
||||
return nil, fmt.Errorf("failed to load cgroup for cgroup path %v: %v", config.CgroupPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
c := &criContainerdService{
|
||||
serverAddress: serverAddress,
|
||||
config: config,
|
||||
os: osinterface.RealOS{},
|
||||
rootDir: rootDir,
|
||||
sandboxImage: sandboxImage,
|
||||
snapshotter: containerdSnapshotter,
|
||||
sandboxStore: sandboxstore.NewStore(),
|
||||
containerStore: containerstore.NewStore(),
|
||||
imageStore: imagestore.NewStore(),
|
||||
@@ -140,17 +120,16 @@ func NewCRIContainerdService(
|
||||
imageStoreService: client.ImageService(),
|
||||
contentStoreService: client.ContentStore(),
|
||||
client: client,
|
||||
cgroupPath: cgroupPath,
|
||||
}
|
||||
|
||||
netPlugin, err := ocicni.InitCNI(networkPluginConfDir, networkPluginBinDir)
|
||||
netPlugin, err := ocicni.InitCNI(config.NetworkPluginConfDir, config.NetworkPluginBinDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize cni plugin: %v", err)
|
||||
}
|
||||
c.netPlugin = netPlugin
|
||||
|
||||
// prepare streaming server
|
||||
c.streamServer, err = newStreamServer(c, streamAddress, streamPort)
|
||||
c.streamServer, err = newStreamServer(c, config.StreamServerAddress, config.StreamServerPort)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create stream server: %v", err)
|
||||
}
|
||||
@@ -187,13 +166,13 @@ func (c *criContainerdService) Run() error {
|
||||
// Start grpc server.
|
||||
// Unlink to cleanup the previous socket file.
|
||||
glog.V(2).Info("Start grpc server")
|
||||
err := syscall.Unlink(c.serverAddress)
|
||||
err := syscall.Unlink(c.config.SocketPath)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to unlink socket file %q: %v", c.serverAddress, err)
|
||||
return fmt.Errorf("failed to unlink socket file %q: %v", c.config.SocketPath, err)
|
||||
}
|
||||
l, err := net.Listen(unixProtocol, c.serverAddress)
|
||||
l, err := net.Listen(unixProtocol, c.config.SocketPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to listen on %q: %v", c.serverAddress, err)
|
||||
return fmt.Errorf("failed to listen on %q: %v", c.config.SocketPath, err)
|
||||
}
|
||||
grpcServerCloseCh := make(chan struct{})
|
||||
go func() {
|
||||
|
||||
Reference in New Issue
Block a user