@@ -24,18 +24,21 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
"github.com/containerd/containerd/api/types"
|
||||
tasktypes "github.com/containerd/containerd/api/types/task"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/events/exchange"
|
||||
"github.com/containerd/containerd/identifiers"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/runtime"
|
||||
client "github.com/containerd/containerd/runtime/v2/shim"
|
||||
"github.com/containerd/containerd/runtime/v2/task"
|
||||
"github.com/containerd/ttrpc"
|
||||
ptypes "github.com/gogo/protobuf/types"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func loadAddress(path string) (string, error) {
|
||||
@@ -46,7 +49,7 @@ func loadAddress(path string) (string, error) {
|
||||
return string(data), nil
|
||||
}
|
||||
|
||||
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList) (_ *shim, err error) {
|
||||
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) {
|
||||
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -55,6 +58,11 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
f, err := openShimLog(ctx, bundle)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "open shim log pipe")
|
||||
@@ -74,7 +82,12 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
|
||||
}
|
||||
}()
|
||||
|
||||
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() }))
|
||||
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
|
||||
defer func() {
|
||||
if err != nil {
|
||||
client.Close()
|
||||
}
|
||||
}()
|
||||
s := &shim{
|
||||
client: client,
|
||||
task: task.NewTaskClient(client),
|
||||
@@ -88,6 +101,52 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
|
||||
ctx = namespaces.WithNamespace(ctx, ns)
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"id": id,
|
||||
"namespace": ns,
|
||||
}).Warn("cleaning up after shim disconnected")
|
||||
response, err := binaryCall.Delete(ctx)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).WithFields(logrus.Fields{
|
||||
"id": id,
|
||||
"namespace": ns,
|
||||
}).Warn("failed to clean up after shim disconnected")
|
||||
}
|
||||
|
||||
var (
|
||||
pid uint32
|
||||
exitStatus uint32
|
||||
exitedAt time.Time
|
||||
)
|
||||
if response != nil {
|
||||
pid = response.Pid
|
||||
exitStatus = response.Status
|
||||
exitedAt = response.Timestamp
|
||||
} else {
|
||||
exitStatus = 255
|
||||
exitedAt = time.Now()
|
||||
}
|
||||
events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
|
||||
ContainerID: id,
|
||||
ID: id,
|
||||
Pid: pid,
|
||||
ExitStatus: exitStatus,
|
||||
ExitedAt: exitedAt,
|
||||
})
|
||||
|
||||
events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
|
||||
ContainerID: id,
|
||||
Pid: pid,
|
||||
ExitStatus: exitStatus,
|
||||
ExitedAt: exitedAt,
|
||||
})
|
||||
}
|
||||
|
||||
type shim struct {
|
||||
bundle *Bundle
|
||||
client *ttrpc.Client
|
||||
@@ -119,19 +178,9 @@ func (s *shim) Shutdown(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (s *shim) waitShutdown(ctx context.Context) error {
|
||||
dead := make(chan struct{})
|
||||
go func() {
|
||||
if err := s.Shutdown(ctx); err != nil {
|
||||
log.G(ctx).WithError(err).Error("shim shutdown error")
|
||||
}
|
||||
close(dead)
|
||||
}()
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
return errors.New("failed to shutdown shim in time")
|
||||
case <-dead:
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
return s.Shutdown(ctx)
|
||||
}
|
||||
|
||||
// ID of the shim/task
|
||||
@@ -154,15 +203,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
if err != nil {
|
||||
return nil, errdefs.FromGRPC(err)
|
||||
}
|
||||
if err := s.waitShutdown(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.bundle.Delete(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// remove self from the runtime task list
|
||||
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
|
||||
s.rtTasks.Delete(ctx, s.ID())
|
||||
if err := s.waitShutdown(ctx); err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to shutdown shim")
|
||||
}
|
||||
if err := s.bundle.Delete(); err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to delete bundle")
|
||||
}
|
||||
return &runtime.Exit{
|
||||
Status: response.ExitStatus,
|
||||
Timestamp: response.ExitedAt,
|
||||
|
||||
Reference in New Issue
Block a user