Cleanup shim manager
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
@@ -63,7 +63,7 @@ func loadAddress(path string) (string, error) {
|
||||
return string(data), nil
|
||||
}
|
||||
|
||||
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) {
|
||||
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, err error) {
|
||||
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -117,24 +117,21 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask,
|
||||
client.Close()
|
||||
}
|
||||
}()
|
||||
s := &shimTask{
|
||||
shim: &shim{
|
||||
bundle: bundle,
|
||||
client: client,
|
||||
},
|
||||
task: task.NewTaskClient(client),
|
||||
shim := &shim{
|
||||
bundle: bundle,
|
||||
client: client,
|
||||
}
|
||||
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Check connectivity
|
||||
// Check connectivity, TaskService is the only required service, so create a temp one to check connection.
|
||||
s := newShimTask(shim)
|
||||
if _, err := s.PID(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
return shim, nil
|
||||
}
|
||||
|
||||
func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[runtime.Task], events *exchange.Exchange, binaryCall *binary) {
|
||||
func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) {
|
||||
ctx = namespaces.WithNamespace(ctx, ns)
|
||||
ctx, cancel := timeout.WithContext(ctx, cleanupTimeout)
|
||||
defer cancel()
|
||||
@@ -186,10 +183,8 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[
|
||||
})
|
||||
}
|
||||
|
||||
// ShimProcess represents a shim instance managed by the shim service.
|
||||
type ShimProcess interface {
|
||||
runtime.Process
|
||||
|
||||
// ShimInstance represents running shim process managed by ShimManager.
|
||||
type ShimInstance interface {
|
||||
// ID of the shim.
|
||||
ID() string
|
||||
// Namespace of this shim.
|
||||
@@ -198,6 +193,8 @@ type ShimProcess interface {
|
||||
Bundle() string
|
||||
// Client returns the underlying TTRPC client for this shim.
|
||||
Client() *ttrpc.Client
|
||||
// Delete will close the client and remove bundle from disk.
|
||||
Delete(ctx context.Context) error
|
||||
}
|
||||
|
||||
type shim struct {
|
||||
@@ -205,6 +202,8 @@ type shim struct {
|
||||
client *ttrpc.Client
|
||||
}
|
||||
|
||||
var _ ShimInstance = (*shim)(nil)
|
||||
|
||||
// ID of the shim/task
|
||||
func (s *shim) ID() string {
|
||||
return s.bundle.ID
|
||||
@@ -218,16 +217,16 @@ func (s *shim) Bundle() string {
|
||||
return s.bundle.Path
|
||||
}
|
||||
|
||||
func (s *shim) Close() error {
|
||||
return s.client.Close()
|
||||
func (s *shim) Client() *ttrpc.Client {
|
||||
return s.client
|
||||
}
|
||||
|
||||
func (s *shim) delete(ctx context.Context) error {
|
||||
func (s *shim) Delete(ctx context.Context) error {
|
||||
var (
|
||||
result *multierror.Error
|
||||
)
|
||||
|
||||
if err := s.Close(); err != nil {
|
||||
if err := s.client.Close(); err != nil {
|
||||
result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err))
|
||||
}
|
||||
|
||||
@@ -247,12 +246,15 @@ var _ runtime.Task = &shimTask{}
|
||||
|
||||
// shimTask wraps shim process and adds task service client for compatibility with existing shim manager.
|
||||
type shimTask struct {
|
||||
*shim
|
||||
ShimInstance
|
||||
task task.TaskService
|
||||
}
|
||||
|
||||
func (s *shimTask) Client() *ttrpc.Client {
|
||||
return s.client
|
||||
func newShimTask(shim ShimInstance) *shimTask {
|
||||
return &shimTask{
|
||||
ShimInstance: shim,
|
||||
task: task.NewTaskClient(shim.Client()),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *shimTask) Shutdown(ctx context.Context) error {
|
||||
@@ -319,7 +321,7 @@ func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(c
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.shim.delete(ctx); err != nil {
|
||||
if err := s.ShimInstance.Delete(ctx); err != nil {
|
||||
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete shim")
|
||||
}
|
||||
|
||||
@@ -345,7 +347,7 @@ func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime
|
||||
}
|
||||
request := &task.CreateTaskRequest{
|
||||
ID: s.ID(),
|
||||
Bundle: s.bundle.Path,
|
||||
Bundle: s.Bundle(),
|
||||
Stdin: opts.IO.Stdin,
|
||||
Stdout: opts.IO.Stdout,
|
||||
Stderr: opts.IO.Stderr,
|
||||
|
||||
Reference in New Issue
Block a user