Add containers streaming API
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
@@ -18,6 +18,7 @@ package containers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
@@ -31,6 +32,7 @@ import (
|
||||
ptypes "github.com/gogo/protobuf/types"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
grpcm "google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
@@ -78,18 +80,30 @@ func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc
|
||||
|
||||
func (l *local) List(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (*api.ListContainersResponse, error) {
|
||||
var resp api.ListContainersResponse
|
||||
|
||||
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
|
||||
containers, err := store.List(ctx, req.Filters...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp.Containers = containersToProto(containers)
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func (l *local) ListStream(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (api.Containers_ListStreamClient, error) {
|
||||
stream := &localStream{
|
||||
ctx: ctx,
|
||||
}
|
||||
return stream, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
|
||||
containers, err := store.List(ctx, req.Filters...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stream.containers = containersToProto(containers)
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
|
||||
var resp api.CreateContainerResponse
|
||||
|
||||
@@ -188,3 +202,44 @@ func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context,
|
||||
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
|
||||
return l.db.Update(l.withStore(ctx, fn))
|
||||
}
|
||||
|
||||
type localStream struct {
|
||||
ctx context.Context
|
||||
containers []api.Container
|
||||
i int
|
||||
}
|
||||
|
||||
func (s *localStream) Recv() (*api.ListContainerMessage, error) {
|
||||
if s.i >= len(s.containers) {
|
||||
return nil, io.EOF
|
||||
}
|
||||
c := s.containers[s.i]
|
||||
s.i++
|
||||
return &api.ListContainerMessage{
|
||||
Container: &c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *localStream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
func (s *localStream) CloseSend() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *localStream) Header() (grpcm.MD, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *localStream) Trailer() grpcm.MD {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *localStream) SendMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *localStream) RecvMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user