Update containerd to 15f19d7a67.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2018-10-08 14:32:00 -07:00
parent e4f33828c3
commit 2f42771f77
24 changed files with 821 additions and 362 deletions

View File

@ -3,8 +3,8 @@ github.com/blang/semver v3.1.0
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/containerd/cgroups 5e610833b72089b37d0e615de9a92dfc043757c2 github.com/containerd/cgroups 5e610833b72089b37d0e615de9a92dfc043757c2
github.com/containerd/console c12b1e7919c14469339a5d38f2f8ed9b64a9de23 github.com/containerd/console c12b1e7919c14469339a5d38f2f8ed9b64a9de23
github.com/containerd/containerd f88d3e5d6dfe9b7d7941ac5241649ad8240b9282 github.com/containerd/containerd 15f19d7a67fa322e6de0ef4c6a1bf9da0f056554
github.com/containerd/continuity 7f53d412b9eb1cbf744c2063185d703a0ee34700 github.com/containerd/continuity bd77b46c8352f74eb12c85bdc01f4b90f69d66b4
github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c
github.com/containerd/go-cni 6d7b509a054a3cb1c35ed1865d4fde2f0cb547cd github.com/containerd/go-cni 6d7b509a054a3cb1c35ed1865d4fde2f0cb547cd
github.com/containerd/go-runc 5a6d9f37cfa36b15efba46dc7ea349fa9b7143c3 github.com/containerd/go-runc 5a6d9f37cfa36b15efba46dc7ea349fa9b7143c3
@ -34,7 +34,7 @@ github.com/hashicorp/go-multierror ed905158d87462226a13fe39ddf685ea65f1c11f
github.com/json-iterator/go 1.1.5 github.com/json-iterator/go 1.1.5
github.com/matttproud/golang_protobuf_extensions v1.0.0 github.com/matttproud/golang_protobuf_extensions v1.0.0
github.com/Microsoft/go-winio v0.4.10 github.com/Microsoft/go-winio v0.4.10
github.com/Microsoft/hcsshim v0.7.4 github.com/Microsoft/hcsshim v0.7.6
github.com/modern-go/concurrent 1.0.3 github.com/modern-go/concurrent 1.0.3
github.com/modern-go/reflect2 1.0.1 github.com/modern-go/reflect2 1.0.1
github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7 github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7

View File

@ -6,6 +6,8 @@ import (
// HNSEndpoint represents a network endpoint in HNS // HNSEndpoint represents a network endpoint in HNS
type HNSEndpoint = hns.HNSEndpoint type HNSEndpoint = hns.HNSEndpoint
// Namespace represents a Compartment.
type Namespace = hns.Namespace
//SystemType represents the type of the system on which actions are done //SystemType represents the type of the system on which actions are done
type SystemType string type SystemType string

View File

@ -92,6 +92,36 @@ func (w *writeCloserWrapper) Close() error {
return nil return nil
} }
type bufferedReader struct {
buf *bufio.Reader
}
func newBufferedReader(r io.Reader) *bufferedReader {
buf := bufioReader32KPool.Get().(*bufio.Reader)
buf.Reset(r)
return &bufferedReader{buf}
}
func (r *bufferedReader) Read(p []byte) (n int, err error) {
if r.buf == nil {
return 0, io.EOF
}
n, err = r.buf.Read(p)
if err == io.EOF {
r.buf.Reset(nil)
bufioReader32KPool.Put(r.buf)
r.buf = nil
}
return
}
func (r *bufferedReader) Peek(n int) ([]byte, error) {
if r.buf == nil {
return nil, io.EOF
}
return r.buf.Peek(n)
}
// DetectCompression detects the compression algorithm of the source. // DetectCompression detects the compression algorithm of the source.
func DetectCompression(source []byte) Compression { func DetectCompression(source []byte) Compression {
for compression, m := range map[Compression][]byte{ for compression, m := range map[Compression][]byte{
@ -110,8 +140,7 @@ func DetectCompression(source []byte) Compression {
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive. // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
func DecompressStream(archive io.Reader) (DecompressReadCloser, error) { func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
buf := bufioReader32KPool.Get().(*bufio.Reader) buf := newBufferedReader(archive)
buf.Reset(archive)
bs, err := buf.Peek(10) bs, err := buf.Peek(10)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
// Note: we'll ignore any io.EOF error because there are some odd // Note: we'll ignore any io.EOF error because there are some odd
@ -123,15 +152,12 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
return nil, err return nil, err
} }
closer := func() error {
buf.Reset(nil)
bufioReader32KPool.Put(buf)
return nil
}
switch compression := DetectCompression(bs); compression { switch compression := DetectCompression(bs); compression {
case Uncompressed: case Uncompressed:
readBufWrapper := &readCloserWrapper{buf, compression, closer} return &readCloserWrapper{
return readBufWrapper, nil Reader: buf,
compression: compression,
}, nil
case Gzip: case Gzip:
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
gzReader, err := gzipDecompress(ctx, buf) gzReader, err := gzipDecompress(ctx, buf)
@ -140,12 +166,15 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
return nil, err return nil, err
} }
readBufWrapper := &readCloserWrapper{gzReader, compression, func() error { return &readCloserWrapper{
Reader: gzReader,
compression: compression,
closer: func() error {
cancel() cancel()
return closer() return gzReader.Close()
}} },
}, nil
return readBufWrapper, nil
default: default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension()) return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
} }

View File

@ -79,11 +79,6 @@ func GetResolver(ctx gocontext.Context, clicontext *cli.Context) (remotes.Resolv
secret = rt secret = rt
} }
options.Credentials = func(host string) (string, string, error) {
// Only one host
return username, secret, nil
}
tr := &http.Transport{ tr := &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{ DialContext: (&net.Dialer{
@ -104,5 +99,11 @@ func GetResolver(ctx gocontext.Context, clicontext *cli.Context) (remotes.Resolv
Transport: tr, Transport: tr,
} }
credentials := func(host string) (string, string, error) {
// Only one host
return username, secret, nil
}
options.Authorizer = docker.NewAuthorizer(options.Client, credentials)
return docker.NewResolver(options), nil return docker.NewResolver(options), nil
} }

View File

@ -27,7 +27,7 @@ import (
"github.com/containerd/containerd/cmd/ctr/commands" "github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/contrib/nvidia" "github.com/containerd/containerd/contrib/nvidia"
"github.com/containerd/containerd/oci" "github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
@ -58,6 +58,7 @@ func NewContainer(ctx gocontext.Context, client *containerd.Client, context *cli
spec containerd.NewContainerOpts spec containerd.NewContainerOpts
) )
cOpts = append(cOpts, containerd.WithContainerLabels(commands.LabelArgs(context.StringSlice("label"))))
if config { if config {
opts = append(opts, oci.WithSpecFromFile(context.String("config"))) opts = append(opts, oci.WithSpecFromFile(context.String("config")))
} else { } else {
@ -98,7 +99,8 @@ func NewContainer(ctx gocontext.Context, client *containerd.Client, context *cli
// Even when "readonly" is set, we don't use KindView snapshot here. (#1495) // Even when "readonly" is set, we don't use KindView snapshot here. (#1495)
// We pass writable snapshot to the OCI runtime, and the runtime remounts it as read-only, // We pass writable snapshot to the OCI runtime, and the runtime remounts it as read-only,
// after creating some mount points on demand. // after creating some mount points on demand.
containerd.WithNewSnapshot(id, image)) containerd.WithNewSnapshot(id, image),
containerd.WithImageStopSignal(image, "SIGTERM"))
} }
if context.Bool("readonly") { if context.Bool("readonly") {
opts = append(opts, oci.WithRootFSReadonly()) opts = append(opts, oci.WithRootFSReadonly())
@ -141,7 +143,6 @@ func NewContainer(ctx gocontext.Context, client *containerd.Client, context *cli
} }
} }
cOpts = append(cOpts, containerd.WithContainerLabels(commands.LabelArgs(context.StringSlice("label"))))
cOpts = append(cOpts, containerd.WithRuntime(context.String("runtime"), nil)) cOpts = append(cOpts, containerd.WithRuntime(context.String("runtime"), nil))
var s specs.Spec var s specs.Spec

View File

@ -18,11 +18,8 @@ package commands
import ( import (
gocontext "context" gocontext "context"
"fmt"
"os" "os"
"os/signal" "os/signal"
"strconv"
"strings"
"syscall" "syscall"
"github.com/containerd/containerd" "github.com/containerd/containerd"
@ -53,23 +50,3 @@ func StopCatch(sigc chan os.Signal) {
signal.Stop(sigc) signal.Stop(sigc)
close(sigc) close(sigc)
} }
// ParseSignal parses a given string into a syscall.Signal
// it checks that the signal exists in the platform-appropriate signalMap
func ParseSignal(rawSignal string) (syscall.Signal, error) {
s, err := strconv.Atoi(rawSignal)
if err == nil {
sig := syscall.Signal(s)
for _, msig := range signalMap {
if sig == msig {
return sig, nil
}
}
return -1, fmt.Errorf("unknown signal %q", rawSignal)
}
signal, ok := signalMap[strings.TrimPrefix(strings.ToUpper(rawSignal), "SIG")]
if !ok {
return -1, fmt.Errorf("unknown signal %q", rawSignal)
}
return signal, nil
}

View File

@ -23,6 +23,8 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
) )
const defaultSignal = "SIGTERM"
var killCommand = cli.Command{ var killCommand = cli.Command{
Name: "kill", Name: "kill",
Usage: "signal a container (default: SIGTERM)", Usage: "signal a container (default: SIGTERM)",
@ -30,7 +32,7 @@ var killCommand = cli.Command{
Flags: []cli.Flag{ Flags: []cli.Flag{
cli.StringFlag{ cli.StringFlag{
Name: "signal, s", Name: "signal, s",
Value: "SIGTERM", Value: "",
Usage: "signal to send to the container", Usage: "signal to send to the container",
}, },
cli.StringFlag{ cli.StringFlag{
@ -47,7 +49,7 @@ var killCommand = cli.Command{
if id == "" { if id == "" {
return errors.New("container id must be provided") return errors.New("container id must be provided")
} }
signal, err := commands.ParseSignal(context.String("signal")) signal, err := containerd.ParseSignal(defaultSignal)
if err != nil { if err != nil {
return err return err
} }
@ -74,6 +76,17 @@ var killCommand = cli.Command{
if err != nil { if err != nil {
return err return err
} }
if context.String("signal") != "" {
signal, err = containerd.ParseSignal(context.String("signal"))
if err != nil {
return err
}
} else {
signal, err = containerd.GetStopSignal(ctx, container, signal)
if err != nil {
return err
}
}
task, err := container.Task(ctx, nil) task, err := container.Task(ctx, nil)
if err != nil { if err != nil {
return err return err

View File

@ -76,6 +76,23 @@ func WithContainerLabels(labels map[string]string) NewContainerOpts {
} }
} }
// WithImageStopSignal sets a well-known containerd label (StopSignalLabel)
// on the container for storing the stop signal specified in the OCI image
// config
func WithImageStopSignal(image Image, defaultSignal string) NewContainerOpts {
return func(ctx context.Context, _ *Client, c *containers.Container) error {
if c.Labels == nil {
c.Labels = make(map[string]string)
}
stopSignal, err := GetOCIStopSignal(ctx, image, defaultSignal)
if err != nil {
return err
}
c.Labels[StopSignalLabel] = stopSignal
return nil
}
}
// WithSnapshotter sets the provided snapshotter for use by the container // WithSnapshotter sets the provided snapshotter for use by the container
// //
// This option must appear before other snapshotter options to have an effect. // This option must appear before other snapshotter options to have an effect.

View File

@ -115,6 +115,10 @@ func ImportIndex(ctx context.Context, store content.Store, reader io.Reader) (oc
return idx, nil return idx, nil
} }
if mfsts == nil {
return ocispec.Descriptor{}, errors.Errorf("unrecognized image format")
}
for name, linkname := range symlinks { for name, linkname := range symlinks {
desc, ok := blobs[linkname] desc, ok := blobs[linkname]
if !ok { if !ok {
@ -123,7 +127,11 @@ func ImportIndex(ctx context.Context, store content.Store, reader io.Reader) (oc
blobs[name] = desc blobs[name] = desc
} }
var idx ocispec.Index idx := ocispec.Index{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
}
for _, mfst := range mfsts { for _, mfst := range mfsts {
config, ok := blobs[mfst.Config] config, ok := blobs[mfst.Config]
if !ok { if !ok {

View File

@ -129,6 +129,13 @@ type platformManifest struct {
// Manifest resolves a manifest from the image for the given platform. // Manifest resolves a manifest from the image for the given platform.
// //
// When a manifest descriptor inside of a manifest index does not have
// a platform defined, the platform from the image config is considered.
//
// If the descriptor points to a non-index manifest, then the manifest is
// unmarshalled and returned without considering the platform inside of the
// config.
//
// TODO(stevvooe): This violates the current platform agnostic approach to this // TODO(stevvooe): This violates the current platform agnostic approach to this
// package by returning a specific manifest type. We'll need to refactor this // package by returning a specific manifest type. We'll need to refactor this
// to return a manifest descriptor or decide that we want to bring the API in // to return a manifest descriptor or decide that we want to bring the API in
@ -152,7 +159,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
return nil, err return nil, err
} }
if platform != nil { if desc.Digest != image.Digest && platform != nil {
if desc.Platform != nil && !platform.Match(*desc.Platform) { if desc.Platform != nil && !platform.Match(*desc.Platform) {
return nil, nil return nil, nil
} }

View File

@ -553,7 +553,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
nw.l.RLock() nw.l.RLock()
defer nw.l.RUnlock() defer nw.l.RUnlock()
return update(ctx, nw.db, func(tx *bolt.Tx) error { var innerErr error
if err := update(ctx, nw.db, func(tx *bolt.Tx) error {
bkt := getIngestsBucket(tx, nw.namespace) bkt := getIngestsBucket(tx, nw.namespace)
if bkt != nil { if bkt != nil {
if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound { if err := bkt.DeleteBucket([]byte(nw.ref)); err != nil && err != bolt.ErrBucketNotFound {
@ -562,13 +564,20 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
} }
dgst, err := nw.commit(ctx, tx, size, expected, opts...) dgst, err := nw.commit(ctx, tx, size, expected, opts...)
if err != nil { if err != nil {
if !errdefs.IsAlreadyExists(err) {
return err return err
} }
innerErr = err
}
if err := removeIngestLease(ctx, tx, nw.ref); err != nil { if err := removeIngestLease(ctx, tx, nw.ref); err != nil {
return err return err
} }
return addContentLease(ctx, tx, dgst) return addContentLease(ctx, tx, dgst)
}) }); err != nil {
return err
}
return innerErr
} }
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) { func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) {
@ -611,7 +620,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
bkt, err := createBlobBucket(tx, nw.namespace, actual) bkt, err := createBlobBucket(tx, nw.namespace, actual)
if err != nil { if err != nil {
if err == bolt.ErrBucketExists { if err == bolt.ErrBucketExists {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) return actual, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
} }
return "", err return "", err
} }

View File

@ -654,6 +654,10 @@ func WithUsername(username string) SpecOpts {
// The passed in user can be either a uid or a username. // The passed in user can be either a uid or a username.
func WithAdditionalGIDs(userstr string) SpecOpts { func WithAdditionalGIDs(userstr string) SpecOpts {
return func(ctx context.Context, client Client, c *containers.Container, s *Spec) (err error) { return func(ctx context.Context, client Client, c *containers.Container, s *Spec) (err error) {
// For LCOW additional GID's not supported
if s.Windows != nil {
return nil
}
setProcess(s) setProcess(s)
setAdditionalGids := func(root string) error { setAdditionalGids := func(root string) error {
var username string var username string

View File

@ -0,0 +1,317 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/context/ctxhttp"
)
type dockerAuthorizer struct {
credentials func(string) (string, string, error)
client *http.Client
mu sync.Mutex
auth map[string]string
}
// NewAuthorizer creates a Docker authorizer using the provided function to
// get credentials for the token server or basic auth.
func NewAuthorizer(client *http.Client, f func(string) (string, string, error)) Authorizer {
if client == nil {
client = http.DefaultClient
}
return &dockerAuthorizer{
credentials: f,
client: client,
auth: map[string]string{},
}
}
func (a *dockerAuthorizer) Authorize(ctx context.Context, req *http.Request) error {
// TODO: Lookup matching challenge and scope rather than just host
if auth := a.getAuth(req.URL.Host); auth != "" {
req.Header.Set("Authorization", auth)
}
return nil
}
func (a *dockerAuthorizer) AddResponses(ctx context.Context, responses []*http.Response) error {
last := responses[len(responses)-1]
host := last.Request.URL.Host
for _, c := range parseAuthHeader(last.Header) {
if c.scheme == bearerAuth {
if err := invalidAuthorization(c, responses); err != nil {
// TODO: Clear token
a.setAuth(host, "")
return err
}
// TODO(dmcg): Store challenge, not token
// Move token fetching to authorize
if err := a.setTokenAuth(ctx, host, c.parameters); err != nil {
return err
}
return nil
} else if c.scheme == basicAuth {
// TODO: Resolve credentials on authorize
username, secret, err := a.credentials(host)
if err != nil {
return err
}
if username != "" && secret != "" {
auth := username + ":" + secret
a.setAuth(host, fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(auth))))
return nil
}
}
}
return errors.Wrap(errdefs.ErrNotImplemented, "failed to find supported auth scheme")
}
func (a *dockerAuthorizer) getAuth(host string) string {
a.mu.Lock()
defer a.mu.Unlock()
return a.auth[host]
}
func (a *dockerAuthorizer) setAuth(host string, auth string) bool {
a.mu.Lock()
defer a.mu.Unlock()
changed := a.auth[host] != auth
a.auth[host] = auth
return changed
}
func (a *dockerAuthorizer) setTokenAuth(ctx context.Context, host string, params map[string]string) error {
realm, ok := params["realm"]
if !ok {
return errors.New("no realm specified for token auth challenge")
}
realmURL, err := url.Parse(realm)
if err != nil {
return errors.Wrap(err, "invalid token auth challenge realm")
}
to := tokenOptions{
realm: realmURL.String(),
service: params["service"],
}
to.scopes = getTokenScopes(ctx, params)
if len(to.scopes) == 0 {
return errors.Errorf("no scope specified for token auth challenge")
}
if a.credentials != nil {
to.username, to.secret, err = a.credentials(host)
if err != nil {
return err
}
}
var token string
if to.secret != "" {
// Credential information is provided, use oauth POST endpoint
token, err = a.fetchTokenWithOAuth(ctx, to)
if err != nil {
return errors.Wrap(err, "failed to fetch oauth token")
}
} else {
// Do request anonymously
token, err = a.fetchToken(ctx, to)
if err != nil {
return errors.Wrap(err, "failed to fetch anonymous token")
}
}
a.setAuth(host, fmt.Sprintf("Bearer %s", token))
return nil
}
type tokenOptions struct {
realm string
service string
scopes []string
username string
secret string
}
type postTokenResponse struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
Scope string `json:"scope"`
}
func (a *dockerAuthorizer) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) {
form := url.Values{}
form.Set("scope", strings.Join(to.scopes, " "))
form.Set("service", to.service)
// TODO: Allow setting client_id
form.Set("client_id", "containerd-client")
if to.username == "" {
form.Set("grant_type", "refresh_token")
form.Set("refresh_token", to.secret)
} else {
form.Set("grant_type", "password")
form.Set("username", to.username)
form.Set("password", to.secret)
}
resp, err := ctxhttp.PostForm(ctx, a.client, to.realm, form)
if err != nil {
return "", err
}
defer resp.Body.Close()
// Registries without support for POST may return 404 for POST /v2/token.
// As of September 2017, GCR is known to return 404.
// As of February 2018, JFrog Artifactory is known to return 401.
if (resp.StatusCode == 405 && to.username != "") || resp.StatusCode == 404 || resp.StatusCode == 401 {
return a.fetchToken(ctx, to)
} else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB
log.G(ctx).WithFields(logrus.Fields{
"status": resp.Status,
"body": string(b),
}).Debugf("token request failed")
// TODO: handle error body and write debug output
return "", errors.Errorf("unexpected status: %s", resp.Status)
}
decoder := json.NewDecoder(resp.Body)
var tr postTokenResponse
if err = decoder.Decode(&tr); err != nil {
return "", fmt.Errorf("unable to decode token response: %s", err)
}
return tr.AccessToken, nil
}
type getTokenResponse struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
RefreshToken string `json:"refresh_token"`
}
// getToken fetches a token using a GET request
func (a *dockerAuthorizer) fetchToken(ctx context.Context, to tokenOptions) (string, error) {
req, err := http.NewRequest("GET", to.realm, nil)
if err != nil {
return "", err
}
reqParams := req.URL.Query()
if to.service != "" {
reqParams.Add("service", to.service)
}
for _, scope := range to.scopes {
reqParams.Add("scope", scope)
}
if to.secret != "" {
req.SetBasicAuth(to.username, to.secret)
}
req.URL.RawQuery = reqParams.Encode()
resp, err := ctxhttp.Do(ctx, a.client, req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
// TODO: handle error body and write debug output
return "", errors.Errorf("unexpected status: %s", resp.Status)
}
decoder := json.NewDecoder(resp.Body)
var tr getTokenResponse
if err = decoder.Decode(&tr); err != nil {
return "", fmt.Errorf("unable to decode token response: %s", err)
}
// `access_token` is equivalent to `token` and if both are specified
// the choice is undefined. Canonicalize `access_token` by sticking
// things in `token`.
if tr.AccessToken != "" {
tr.Token = tr.AccessToken
}
if tr.Token == "" {
return "", ErrNoToken
}
return tr.Token, nil
}
func invalidAuthorization(c challenge, responses []*http.Response) error {
errStr := c.parameters["error"]
if errStr == "" {
return nil
}
n := len(responses)
if n == 1 || (n > 1 && !sameRequest(responses[n-2].Request, responses[n-1].Request)) {
return nil
}
return errors.Wrapf(ErrInvalidAuthorization, "server message: %s", errStr)
}
func sameRequest(r1, r2 *http.Request) bool {
if r1.Method != r2.Method {
return false
}
if *r1.URL != *r2.URL {
return false
}
return true
}

View File

@ -18,18 +18,13 @@ package docker
import ( import (
"context" "context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
"strconv" "strconv"
"strings" "strings"
"sync"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/reference" "github.com/containerd/containerd/reference"
@ -51,19 +46,37 @@ var (
ErrInvalidAuthorization = errors.New("authorization failed") ErrInvalidAuthorization = errors.New("authorization failed")
) )
type dockerResolver struct { // Authorizer is used to authorize HTTP requests based on 401 HTTP responses.
credentials func(string) (string, string, error) // An Authorizer is responsible for caching tokens or credentials used by
host func(string) (string, error) // requests.
plainHTTP bool type Authorizer interface {
client *http.Client // Authorize sets the appropriate `Authorization` header on the given
tracker StatusTracker // request.
//
// If no authorization is found for the request, the request remains
// unmodified. It may also add an `Authorization` header as
// "bearer <some bearer token>"
// "basic <base64 encoded credentials>"
Authorize(context.Context, *http.Request) error
// AddResponses adds a 401 response for the authorizer to consider when
// authorizing requests. The last response should be unauthorized and
// the previous requests are used to consider redirects and retries
// that may have led to the 401.
//
// If response is not handled, returns `ErrNotImplemented`
AddResponses(context.Context, []*http.Response) error
} }
// ResolverOptions are used to configured a new Docker register resolver // ResolverOptions are used to configured a new Docker register resolver
type ResolverOptions struct { type ResolverOptions struct {
// Authorizer is used to authorize registry requests
Authorizer Authorizer
// Credentials provides username and secret given a host. // Credentials provides username and secret given a host.
// If username is empty but a secret is given, that secret // If username is empty but a secret is given, that secret
// is interpretted as a long lived token. // is interpretted as a long lived token.
// Deprecated: use Authorizer
Credentials func(string) (string, string, error) Credentials func(string) (string, string, error)
// Host provides the hostname given a namespace. // Host provides the hostname given a namespace.
@ -89,22 +102,31 @@ func DefaultHost(ns string) (string, error) {
return ns, nil return ns, nil
} }
type dockerResolver struct {
auth Authorizer
host func(string) (string, error)
plainHTTP bool
client *http.Client
tracker StatusTracker
}
// NewResolver returns a new resolver to a Docker registry // NewResolver returns a new resolver to a Docker registry
func NewResolver(options ResolverOptions) remotes.Resolver { func NewResolver(options ResolverOptions) remotes.Resolver {
tracker := options.Tracker if options.Tracker == nil {
if tracker == nil { options.Tracker = NewInMemoryTracker()
tracker = NewInMemoryTracker()
} }
host := options.Host if options.Host == nil {
if host == nil { options.Host = DefaultHost
host = DefaultHost }
if options.Authorizer == nil {
options.Authorizer = NewAuthorizer(options.Client, options.Credentials)
} }
return &dockerResolver{ return &dockerResolver{
credentials: options.Credentials, auth: options.Authorizer,
host: host, host: options.Host,
plainHTTP: options.PlainHTTP, plainHTTP: options.PlainHTTP,
client: options.Client, client: options.Client,
tracker: tracker, tracker: options.Tracker,
} }
} }
@ -273,17 +295,13 @@ type dockerBase struct {
base url.URL base url.URL
client *http.Client client *http.Client
useBasic bool auth Authorizer
username, secret string
token string
mu sync.Mutex
} }
func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) {
var ( var (
err error err error
base url.URL base url.URL
username, secret string
) )
host := refspec.Hostname() host := refspec.Hostname()
@ -300,13 +318,6 @@ func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) {
base.Scheme = "http" base.Scheme = "http"
} }
if r.credentials != nil {
username, secret, err = r.credentials(base.Host)
if err != nil {
return nil, err
}
}
prefix := strings.TrimPrefix(refspec.Locator, host+"/") prefix := strings.TrimPrefix(refspec.Locator, host+"/")
base.Path = path.Join("/v2", prefix) base.Path = path.Join("/v2", prefix)
@ -314,47 +325,33 @@ func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) {
refspec: refspec, refspec: refspec,
base: base, base: base,
client: r.client, client: r.client,
username: username, auth: r.auth,
secret: secret,
}, nil }, nil
} }
func (r *dockerBase) getToken() string {
r.mu.Lock()
defer r.mu.Unlock()
return r.token
}
func (r *dockerBase) setToken(token string) bool {
r.mu.Lock()
defer r.mu.Unlock()
changed := r.token != token
r.token = token
return changed
}
func (r *dockerBase) url(ps ...string) string { func (r *dockerBase) url(ps ...string) string {
url := r.base url := r.base
url.Path = path.Join(url.Path, path.Join(ps...)) url.Path = path.Join(url.Path, path.Join(ps...))
return url.String() return url.String()
} }
func (r *dockerBase) authorize(req *http.Request) { func (r *dockerBase) authorize(ctx context.Context, req *http.Request) error {
token := r.getToken() // Check if has header for host
if r.useBasic { if r.auth != nil {
req.SetBasicAuth(r.username, r.secret) if err := r.auth.Authorize(ctx, req); err != nil {
} else if token != "" { return err
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
} }
} }
return nil
}
func (r *dockerBase) doRequest(ctx context.Context, req *http.Request) (*http.Response, error) { func (r *dockerBase) doRequest(ctx context.Context, req *http.Request) (*http.Response, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", req.URL.String())) ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", req.URL.String()))
log.G(ctx).WithField("request.headers", req.Header).WithField("request.method", req.Method).Debug("do request") log.G(ctx).WithField("request.headers", req.Header).WithField("request.method", req.Method).Debug("do request")
r.authorize(req) if err := r.authorize(ctx, req); err != nil {
return nil, errors.Wrap(err, "failed to authorize")
}
resp, err := ctxhttp.Do(ctx, r.client, req) resp, err := ctxhttp.Do(ctx, r.client, req)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to do request") return nil, errors.Wrap(err, "failed to do request")
@ -392,23 +389,14 @@ func (r *dockerBase) retryRequest(ctx context.Context, req *http.Request, respon
last := responses[len(responses)-1] last := responses[len(responses)-1]
if last.StatusCode == http.StatusUnauthorized { if last.StatusCode == http.StatusUnauthorized {
log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized")
for _, c := range parseAuthHeader(last.Header) { if r.auth != nil {
if c.scheme == bearerAuth { if err := r.auth.AddResponses(ctx, responses); err == nil {
if err := invalidAuthorization(c, responses); err != nil { return copyRequest(req)
r.setToken("") } else if !errdefs.IsNotImplemented(err) {
return nil, err return nil, err
} }
if err := r.setTokenAuth(ctx, c.parameters); err != nil {
return nil, err
}
return copyRequest(req)
} else if c.scheme == basicAuth {
if r.username != "" && r.secret != "" {
r.useBasic = true
}
return copyRequest(req)
}
} }
return nil, nil return nil, nil
} else if last.StatusCode == http.StatusMethodNotAllowed && req.Method == http.MethodHead { } else if last.StatusCode == http.StatusMethodNotAllowed && req.Method == http.MethodHead {
// Support registries which have not properly implemented the HEAD method for // Support registries which have not properly implemented the HEAD method for
@ -424,30 +412,6 @@ func (r *dockerBase) retryRequest(ctx context.Context, req *http.Request, respon
return nil, nil return nil, nil
} }
func invalidAuthorization(c challenge, responses []*http.Response) error {
errStr := c.parameters["error"]
if errStr == "" {
return nil
}
n := len(responses)
if n == 1 || (n > 1 && !sameRequest(responses[n-2].Request, responses[n-1].Request)) {
return nil
}
return errors.Wrapf(ErrInvalidAuthorization, "server message: %s", errStr)
}
func sameRequest(r1, r2 *http.Request) bool {
if r1.Method != r2.Method {
return false
}
if *r1.URL != *r2.URL {
return false
}
return true
}
func copyRequest(req *http.Request) (*http.Request, error) { func copyRequest(req *http.Request) (*http.Request, error) {
ireq := *req ireq := *req
if ireq.GetBody != nil { if ireq.GetBody != nil {
@ -459,167 +423,3 @@ func copyRequest(req *http.Request) (*http.Request, error) {
} }
return &ireq, nil return &ireq, nil
} }
func (r *dockerBase) setTokenAuth(ctx context.Context, params map[string]string) error {
realm, ok := params["realm"]
if !ok {
return errors.New("no realm specified for token auth challenge")
}
realmURL, err := url.Parse(realm)
if err != nil {
return fmt.Errorf("invalid token auth challenge realm: %s", err)
}
to := tokenOptions{
realm: realmURL.String(),
service: params["service"],
}
to.scopes = getTokenScopes(ctx, params)
if len(to.scopes) == 0 {
return errors.Errorf("no scope specified for token auth challenge")
}
var token string
if r.secret != "" {
// Credential information is provided, use oauth POST endpoint
token, err = r.fetchTokenWithOAuth(ctx, to)
if err != nil {
return errors.Wrap(err, "failed to fetch oauth token")
}
} else {
// Do request anonymously
token, err = r.fetchToken(ctx, to)
if err != nil {
return errors.Wrap(err, "failed to fetch anonymous token")
}
}
r.setToken(token)
return nil
}
type tokenOptions struct {
realm string
service string
scopes []string
}
type postTokenResponse struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
Scope string `json:"scope"`
}
func (r *dockerBase) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) {
form := url.Values{}
form.Set("scope", strings.Join(to.scopes, " "))
form.Set("service", to.service)
// TODO: Allow setting client_id
form.Set("client_id", "containerd-dist-tool")
if r.username == "" {
form.Set("grant_type", "refresh_token")
form.Set("refresh_token", r.secret)
} else {
form.Set("grant_type", "password")
form.Set("username", r.username)
form.Set("password", r.secret)
}
resp, err := ctxhttp.PostForm(ctx, r.client, to.realm, form)
if err != nil {
return "", err
}
defer resp.Body.Close()
// Registries without support for POST may return 404 for POST /v2/token.
// As of September 2017, GCR is known to return 404.
// As of February 2018, JFrog Artifactory is known to return 401.
if (resp.StatusCode == 405 && r.username != "") || resp.StatusCode == 404 || resp.StatusCode == 401 {
return r.fetchToken(ctx, to)
} else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB
log.G(ctx).WithFields(logrus.Fields{
"status": resp.Status,
"body": string(b),
}).Debugf("token request failed")
// TODO: handle error body and write debug output
return "", errors.Errorf("unexpected status: %s", resp.Status)
}
decoder := json.NewDecoder(resp.Body)
var tr postTokenResponse
if err = decoder.Decode(&tr); err != nil {
return "", fmt.Errorf("unable to decode token response: %s", err)
}
return tr.AccessToken, nil
}
type getTokenResponse struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
RefreshToken string `json:"refresh_token"`
}
// getToken fetches a token using a GET request
func (r *dockerBase) fetchToken(ctx context.Context, to tokenOptions) (string, error) {
req, err := http.NewRequest("GET", to.realm, nil)
if err != nil {
return "", err
}
reqParams := req.URL.Query()
if to.service != "" {
reqParams.Add("service", to.service)
}
for _, scope := range to.scopes {
reqParams.Add("scope", scope)
}
if r.secret != "" {
req.SetBasicAuth(r.username, r.secret)
}
req.URL.RawQuery = reqParams.Encode()
resp, err := ctxhttp.Do(ctx, r.client, req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
// TODO: handle error body and write debug output
return "", errors.Errorf("unexpected status: %s", resp.Status)
}
decoder := json.NewDecoder(resp.Body)
var tr getTokenResponse
if err = decoder.Decode(&tr); err != nil {
return "", fmt.Errorf("unable to decode token response: %s", err)
}
// `access_token` is equivalent to `token` and if both are specified
// the choice is undefined. Canonicalize `access_token` by sticking
// things in `token`.
if tr.AccessToken != "" {
tr.Token = tr.AccessToken
}
if tr.Token == "" {
return "", ErrNoToken
}
return tr.Token, nil
}

View File

@ -21,6 +21,7 @@ import (
"context" "context"
"io" "io"
"os" "os"
gruntime "runtime"
"strings" "strings"
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
@ -31,6 +32,7 @@ import (
"github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
) )
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary { func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
@ -52,13 +54,18 @@ type binary struct {
} }
func (b *binary) Start(ctx context.Context) (_ *shim, err error) { func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
args := []string{"-id", b.bundle.ID}
if logrus.GetLevel() == logrus.DebugLevel {
args = append(args, "-debug")
}
args = append(args, "start")
cmd, err := client.Command( cmd, err := client.Command(
ctx, ctx,
b.runtime, b.runtime,
b.containerdAddress, b.containerdAddress,
b.bundle.Path, b.bundle.Path,
"-id", b.bundle.ID, args...,
"start",
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -103,7 +110,22 @@ func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) { func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
log.G(ctx).Info("cleaning up dead shim") log.G(ctx).Info("cleaning up dead shim")
cmd, err := client.Command(ctx, b.runtime, b.containerdAddress, b.bundle.Path, "-id", b.bundle.ID, "delete")
// Windows cannot delete the current working directory while an
// executable is in use with it. For the cleanup case we invoke with the
// default work dir and forward the bundle path on the cmdline.
var bundlePath string
if gruntime.GOOS != "windows" {
bundlePath = b.bundle.Path
}
cmd, err := client.Command(ctx,
b.runtime,
b.containerdAddress,
bundlePath,
"-id", b.bundle.ID,
"-bundle", b.bundle.Path,
"delete")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -53,11 +53,21 @@ type Shim interface {
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
} }
// OptsKey is the context key for the Opts value.
type OptsKey struct{}
// Opts are context options associated with the shim invocation.
type Opts struct {
BundlePath string
Debug bool
}
var ( var (
debugFlag bool debugFlag bool
idFlag string idFlag string
namespaceFlag string namespaceFlag string
socketFlag string socketFlag string
bundlePath string
addressFlag string addressFlag string
containerdBinaryFlag string containerdBinaryFlag string
action string action string
@ -68,6 +78,7 @@ func parseFlags() {
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim") flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&idFlag, "id", "", "id of the task") flag.StringVar(&idFlag, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve") flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)") flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
@ -133,6 +144,7 @@ func run(id string, initFunc Init) error {
return fmt.Errorf("shim namespace cannot be empty") return fmt.Errorf("shim namespace cannot be empty")
} }
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
service, err := initFunc(ctx, idFlag, publisher) service, err := initFunc(ctx, idFlag, publisher)

View File

@ -27,6 +27,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"sync" "sync"
"syscall"
"unsafe" "unsafe"
winio "github.com/Microsoft/go-winio" winio "github.com/Microsoft/go-winio"
@ -39,6 +40,10 @@ import (
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
) )
const (
errorConnectionAborted syscall.Errno = 1236
)
// setupSignals creates a new signal handler for all signals // setupSignals creates a new signal handler for all signals
func setupSignals() (chan os.Signal, error) { func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32) signals := make(chan os.Signal, 32)
@ -119,21 +124,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
} }
} }
var _ = (io.WriterTo)(&blockingBuffer{})
var _ = (io.Writer)(&blockingBuffer{})
// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
// `capacity` is reached the calls to `Write` will block until a successful call
// to `WriterTo` frees up the buffer space.
//
// Note: This has the same threadding semantics as bytes.Buffer with no
// additional locking so multithreading is not supported.
type blockingBuffer struct {
c *sync.Cond
capacity int
buffer bytes.Buffer
}
func newBlockingBuffer(capacity int) *blockingBuffer {
return &blockingBuffer{
c: sync.NewCond(&sync.Mutex{}),
capacity: capacity,
}
}
func (bb *blockingBuffer) Len() int {
bb.c.L.Lock()
defer bb.c.L.Unlock()
return bb.buffer.Len()
}
func (bb *blockingBuffer) Write(p []byte) (int, error) {
if len(p) > bb.capacity {
return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
}
bb.c.L.Lock()
for bb.buffer.Len()+len(p) > bb.capacity {
bb.c.Wait()
}
defer bb.c.L.Unlock()
return bb.buffer.Write(p)
}
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
bb.c.L.Lock()
defer bb.c.L.Unlock()
defer bb.c.Signal()
return bb.buffer.WriteTo(w)
}
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
// by using Windows Named Pipes for logging. When containerd restarts it tries
// to reconnect to any shims. This means that the connection to the logger will
// be severed but when containerd starts up it should reconnect and start
// logging again. We abstract all of this logic behind what looks like a simple
// `io.Writer` that can reconnect in the lifetime and buffers logs while
// disconnected.
type deferredShimWriteLogger struct { type deferredShimWriteLogger struct {
mu sync.Mutex
ctx context.Context ctx context.Context
wg sync.WaitGroup connected bool
aborted bool
buffer *blockingBuffer
l net.Listener
c net.Conn c net.Conn
conerr error conerr error
} }
// beginAccept issues an accept to wait for a connection. Once a conneciton
// occurs drains any outstanding buffer. While draining the buffer any writes
// are blocked. If the buffer fails to fully drain due to a connection drop a
// call to `beginAccept` is re-issued waiting for another connection from
// containerd.
func (dswl *deferredShimWriteLogger) beginAccept() {
dswl.mu.Lock()
if dswl.connected {
return
}
dswl.mu.Unlock()
c, err := dswl.l.Accept()
if err == errorConnectionAborted {
dswl.mu.Lock()
dswl.aborted = true
dswl.l.Close()
dswl.conerr = errors.New("connection closed")
dswl.mu.Unlock()
return
}
dswl.mu.Lock()
dswl.connected = true
dswl.c = c
// Drain the buffer
if dswl.buffer.Len() > 0 {
_, err := dswl.buffer.WriteTo(dswl.c)
if err != nil {
// We lost our connection draining the buffer.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
}
}
dswl.mu.Unlock()
}
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) { func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
dswl.wg.Wait() dswl.mu.Lock()
if dswl.c == nil { defer dswl.mu.Unlock()
if dswl.aborted {
return 0, dswl.conerr return 0, dswl.conerr
} }
return dswl.c.Write(p)
if dswl.connected {
// We have a connection. beginAccept would have drained the buffer so we just write our data to
// the connection directly.
written, err := dswl.c.Write(p)
if err != nil {
// We lost the connection.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
// We weren't able to write the full `p` bytes. Buffer the rest
if written != len(p) {
w, err := dswl.buffer.Write(p[written:])
if err != nil {
// We failed to buffer. Return this error
return written + w, err
}
written += w
}
}
return written, nil
}
// We are disconnected. Buffer the contents.
return dswl.buffer.Write(p)
} }
// openLog on Windows acts as the server of the log pipe. This allows the // openLog on Windows acts as the server of the log pipe. This allows the
@ -143,26 +277,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
dswl := &deferredShimWriteLogger{
ctx: ctx,
buffer: newBlockingBuffer(64 * 1024), // 64KB,
}
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
dswl := &deferredShimWriteLogger{ dswl.l = l
ctx: ctx, go dswl.beginAccept()
}
// TODO: JTERRY75 - this will not work with restarts. Only the first
// connection will work and all +1 connections will return 'use of closed
// network connection'. Make this reconnect aware.
dswl.wg.Add(1)
go func() {
c, conerr := l.Accept()
if conerr != nil {
l.Close()
dswl.conerr = conerr
}
dswl.c = c
dswl.wg.Done()
}()
return dswl, nil return dswl, nil
} }

View File

@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) {
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn var c net.Conn
var lastError error var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now() start := time.Now()
for { for {
remaining := timeout - time.Now().Sub(start) remaining := timeout - time.Now().Sub(start)
if remaining <= 0 { if remaining <= 0 {
lastError = errors.Errorf("timed out waiting for npipe %s", address) lastError = timedOutError
break break
} }
c, lastError = winio.DialPipe(address, &remaining) c, lastError = winio.DialPipe(address, &remaining)
@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
if !os.IsNotExist(lastError) { if !os.IsNotExist(lastError) {
break break
} }
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. We use the passed in timeout for the
// `DialPipe` timeout if the pipe exists however to give the pipe time
// to `Accept` the connection.
if time.Now().Sub(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
} }
return c, lastError return c, lastError

View File

@ -14,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package commands package containerd
import ( import (
"syscall" "syscall"

View File

@ -16,7 +16,7 @@
limitations under the License. limitations under the License.
*/ */
package commands package containerd
import ( import (
"syscall" "syscall"

View File

@ -14,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package commands package containerd
import ( import (
"syscall" "syscall"

105
vendor/github.com/containerd/containerd/signals.go generated vendored Normal file
View File

@ -0,0 +1,105 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"syscall"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/opencontainers/image-spec/specs-go/v1"
)
// StopSignalLabel is a well-known containerd label for storing the stop
// signal specified in the OCI image config
const StopSignalLabel = "io.containerd.image.config.stop-signal"
// GetStopSignal retrieves the container stop signal, specified by the
// well-known containerd label (StopSignalLabel)
func GetStopSignal(ctx context.Context, container Container, defaultSignal syscall.Signal) (syscall.Signal, error) {
labels, err := container.Labels(ctx)
if err != nil {
return -1, err
}
if stopSignal, ok := labels[StopSignalLabel]; ok {
return ParseSignal(stopSignal)
}
return defaultSignal, nil
}
// GetOCIStopSignal retrieves the stop signal specified in the OCI image config
func GetOCIStopSignal(ctx context.Context, image Image, defaultSignal string) (string, error) {
_, err := ParseSignal(defaultSignal)
if err != nil {
return "", err
}
ic, err := image.Config(ctx)
if err != nil {
return "", err
}
var (
ociimage v1.Image
config v1.ImageConfig
)
switch ic.MediaType {
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
p, err := content.ReadBlob(ctx, image.ContentStore(), ic)
if err != nil {
return "", err
}
if err := json.Unmarshal(p, &ociimage); err != nil {
return "", err
}
config = ociimage.Config
default:
return "", fmt.Errorf("unknown image config media type %s", ic.MediaType)
}
if config.StopSignal == "" {
return defaultSignal, nil
}
return config.StopSignal, nil
}
// ParseSignal parses a given string into a syscall.Signal
// it checks that the signal exists in the platform-appropriate signalMap
func ParseSignal(rawSignal string) (syscall.Signal, error) {
s, err := strconv.Atoi(rawSignal)
if err == nil {
sig := syscall.Signal(s)
for _, msig := range signalMap {
if sig == msig {
return sig, nil
}
}
return -1, fmt.Errorf("unknown signal %q", rawSignal)
}
signal, ok := signalMap[strings.TrimPrefix(strings.ToUpper(rawSignal), "SIG")]
if !ok {
return -1, fmt.Errorf("unknown signal %q", rawSignal)
}
return signal, nil
}

View File

@ -4,7 +4,7 @@ github.com/containerd/cgroups 5e610833b72089b37d0e615de9a92dfc043757c2
github.com/containerd/typeurl a93fcdb778cd272c6e9b3028b2f42d813e785d40 github.com/containerd/typeurl a93fcdb778cd272c6e9b3028b2f42d813e785d40
github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c
github.com/containerd/btrfs 2e1aa0ddf94f91fa282b6ed87c23bf0d64911244 github.com/containerd/btrfs 2e1aa0ddf94f91fa282b6ed87c23bf0d64911244
github.com/containerd/continuity 7f53d412b9eb1cbf744c2063185d703a0ee34700 github.com/containerd/continuity bd77b46c8352f74eb12c85bdc01f4b90f69d66b4
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6 github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/docker/go-metrics 4ea375f7759c82740c893fc030bc37088d2ec098 github.com/docker/go-metrics 4ea375f7759c82740c893fc030bc37088d2ec098
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9 github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
@ -33,7 +33,7 @@ golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0 github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0
github.com/Microsoft/go-winio v0.4.10 github.com/Microsoft/go-winio v0.4.10
github.com/Microsoft/hcsshim v0.7.4 github.com/Microsoft/hcsshim v0.7.6
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944 google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
github.com/containerd/ttrpc 2a805f71863501300ae1976d29f0454ae003e85a github.com/containerd/ttrpc 2a805f71863501300ae1976d29f0454ae003e85a
@ -43,7 +43,7 @@ github.com/google/go-cmp v0.1.0
go.etcd.io/bbolt v1.3.1-etcd.8 go.etcd.io/bbolt v1.3.1-etcd.8
# cri dependencies # cri dependencies
github.com/containerd/cri 9f39e3289533fc228c5e5fcac0a6dbdd60c6047b # release/1.2 branch github.com/containerd/cri 8506fe836677cc3bb23a16b68145128243d843b5 # release/1.2 branch
github.com/containerd/go-cni 6d7b509a054a3cb1c35ed1865d4fde2f0cb547cd github.com/containerd/go-cni 6d7b509a054a3cb1c35ed1865d4fde2f0cb547cd
github.com/blang/semver v3.1.0 github.com/blang/semver v3.1.0
github.com/containernetworking/cni v0.6.0 github.com/containernetworking/cni v0.6.0
@ -73,12 +73,12 @@ golang.org/x/oauth2 a6bd8cefa1811bd24b86f8902872e4e8225f74c4
golang.org/x/time f51c12702a4d776e4c1fa9b0fabab841babae631 golang.org/x/time f51c12702a4d776e4c1fa9b0fabab841babae631
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
gopkg.in/yaml.v2 v2.2.1 gopkg.in/yaml.v2 v2.2.1
k8s.io/api 012f271b5d41baad56190c5f1ae19bff16df0fd8 k8s.io/api kubernetes-1.12.0
k8s.io/apimachinery 6429050ef506887d121f3e7306e894f8900d8a63 k8s.io/apimachinery kubernetes-1.12.0
k8s.io/apiserver e9312c15296b6c2c923ebd5031ff5d1d5fd022d7 k8s.io/apiserver kubernetes-1.12.0
k8s.io/client-go 37c3c02ec96533daec0dbda1f39a6b1d68505c79 k8s.io/client-go kubernetes-1.12.0
k8s.io/kubernetes v1.12.0-beta.1 k8s.io/kubernetes v1.12.0
k8s.io/utils 982821ea41da7e7c15f3d3738921eb2e7e241ccd k8s.io/utils cd34563cd63c2bd7c6fe88a73c4dcf34ed8a67cb
# zfs dependencies # zfs dependencies
github.com/containerd/zfs 9a0b8b8b5982014b729cd34eb7cd7a11062aa6ec github.com/containerd/zfs 9a0b8b8b5982014b729cd34eb7cd7a11062aa6ec

View File

@ -21,7 +21,7 @@ var (
Package = "github.com/containerd/containerd" Package = "github.com/containerd/containerd"
// Version holds the complete version number. Filled in at linking time. // Version holds the complete version number. Filled in at linking time.
Version = "1.2.0-rc.0+unknown" Version = "1.2.0-rc.1+unknown"
// Revision is filled with the VCS (e.g. git) revision being used to build // Revision is filled with the VCS (e.g. git) revision being used to build
// the program at linking time. // the program at linking time.