StorageOS update api library

This commit is contained in:
Sunny
2018-09-14 19:36:57 +05:30
parent 8b98e802ed
commit a4cc754fb4
33 changed files with 795 additions and 1695 deletions

View File

@@ -3,27 +3,32 @@ package storageos
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"github.com/storageos/go-api/netutil"
"github.com/storageos/go-api/serror"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/storageos/go-api/netutil"
"github.com/storageos/go-api/serror"
)
const (
userAgent = "go-storageosclient"
// DefaultUserAgent is the default User-Agent header to include in HTTP requests.
DefaultUserAgent = "go-storageosclient"
// DefaultVersionStr is the string value of the default API version.
DefaultVersionStr = "1"
DefaultVersion = 1
// DefaultVersion is the default API version.
DefaultVersion = 1
)
var (
@@ -36,14 +41,17 @@ var (
// ErrInvalidVersion is returned when a versioned client was requested but no version specified.
ErrInvalidVersion = errors.New("invalid version")
// DefaultPort is the default API port
// ErrProxyNotSupported is returned when a client is unable to set a proxy for http requests.
ErrProxyNotSupported = errors.New("client does not support http proxy")
// DefaultPort is the default API port.
DefaultPort = "5705"
// DataplaneHealthPort is the the port used by the dataplane health-check service
// DataplaneHealthPort is the the port used by the dataplane health-check service.
DataplaneHealthPort = "5704"
// DefaultHost is the default API host
DefaultHost = "tcp://localhost:" + DefaultPort
// DefaultHost is the default API host.
DefaultHost = "http://localhost:" + DefaultPort
)
// APIVersion is an internal representation of a version of the Remote API.
@@ -70,16 +78,21 @@ func (version APIVersion) String() string {
// Client is the basic type of this package. It provides methods for
// interaction with the API.
type Client struct {
httpClient *http.Client
addresses []string
username string
secret string
userAgent string
configLock *sync.RWMutex // Lock for config changes
addressLock *sync.Mutex // Lock used to copy/update the address slice
requestedAPIVersion APIVersion
serverAPIVersion APIVersion
expectedAPIVersion APIVersion
SkipServerVersionCheck bool
HTTPClient *http.Client
TLSConfig *tls.Config
username string
secret string
requestedAPIVersion APIVersion
serverAPIVersion APIVersion
expectedAPIVersion APIVersion
nativeHTTPClient *http.Client
useTLS bool
}
// ClientVersion returns the API version of the client
@@ -103,6 +116,7 @@ func NewClient(nodes string) (*Client, error) {
return nil, err
}
client.SkipServerVersionCheck = true
client.userAgent = DefaultUserAgent
return client, nil
}
@@ -110,22 +124,24 @@ func NewClient(nodes string) (*Client, error) {
// the given server endpoint, using a specific remote API version.
func NewVersionedClient(nodestring string, apiVersionString string) (*Client, error) {
nodes := strings.Split(nodestring, ",")
d, err := netutil.NewMultiDialer(nodes, nil)
addresses, err := netutil.AddressesFromNodes(nodes)
if err != nil {
return nil, err
}
var useTLS bool
if len(nodes) > 0 {
if u, err := url.Parse(nodes[0]); err != nil && u.Scheme == "https" {
useTLS = true
}
if len(addresses) > 1 {
// Shuffle returned addresses in attempt to spread the load
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
rnd.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
}
c := &Client{
HTTPClient: defaultClient(d),
useTLS: useTLS,
client := &Client{
httpClient: defaultClient(),
addresses: addresses,
configLock: &sync.RWMutex{},
addressLock: &sync.Mutex{},
}
if apiVersionString != "" {
@@ -133,15 +149,24 @@ func NewVersionedClient(nodestring string, apiVersionString string) (*Client, er
if err != nil {
return nil, err
}
c.requestedAPIVersion = APIVersion(version)
client.requestedAPIVersion = APIVersion(version)
}
return c, nil
return client, nil
}
// SetUserAgent sets the client useragent.
func (c *Client) SetUserAgent(useragent string) {
c.configLock.Lock()
defer c.configLock.Unlock()
c.userAgent = useragent
}
// SetAuth sets the API username and secret to be used for all API requests.
// It should not be called concurrently with any other Client methods.
func (c *Client) SetAuth(username string, secret string) {
c.configLock.Lock()
defer c.configLock.Unlock()
if username != "" {
c.username = username
}
@@ -150,15 +175,31 @@ func (c *Client) SetAuth(username string, secret string) {
}
}
// SetProxy will set the proxy URL for both the HTTPClient.
// If the transport method does not support usage
// of proxies, an error will be returned.
func (c *Client) SetProxy(proxy *url.URL) error {
c.configLock.Lock()
defer c.configLock.Unlock()
if client := c.httpClient; client != nil {
transport, supported := client.Transport.(*http.Transport)
if !supported {
return ErrProxyNotSupported
}
transport.Proxy = http.ProxyURL(proxy)
}
return nil
}
// SetTimeout takes a timeout and applies it to both the HTTPClient and
// nativeHTTPClient. It should not be called concurrently with any other Client
// methods.
func (c *Client) SetTimeout(t time.Duration) {
if c.HTTPClient != nil {
c.HTTPClient.Timeout = t
}
if c.nativeHTTPClient != nil {
c.nativeHTTPClient.Timeout = t
c.configLock.Lock()
defer c.configLock.Unlock()
if c.httpClient != nil {
c.httpClient.Timeout = t
}
}
@@ -171,6 +212,8 @@ func (c *Client) checkAPIVersion() error {
if err != nil {
return err
}
c.configLock.Lock()
defer c.configLock.Unlock()
if c.requestedAPIVersion == 0 {
c.expectedAPIVersion = c.serverAPIVersion
} else {
@@ -191,8 +234,7 @@ func (c *Client) Ping() error {
if resp.StatusCode != http.StatusOK {
return newError(resp)
}
resp.Body.Close()
return nil
return resp.Body.Close()
}
func (c *Client) getServerAPIVersionString() (version string, err error) {
@@ -204,16 +246,19 @@ func (c *Client) getServerAPIVersionString() (version string, err error) {
}
type doOptions struct {
data interface{}
context context.Context
data interface{}
values url.Values
headers map[string]string
fieldSelector string
labelSelector string
namespace string
forceJSON bool
force bool
values url.Values
headers map[string]string
unversioned bool
context context.Context
forceJSON bool
force bool
unversioned bool
}
func (c *Client) do(method, urlpath string, doOptions doOptions) (*http.Response, error) {
@@ -247,76 +292,118 @@ func (c *Client) do(method, urlpath string, doOptions doOptions) (*http.Response
query.Add("force", "1")
}
httpClient := c.HTTPClient
u := c.getAPIPath(urlpath, query, doOptions.unversioned)
// Obtain a reader lock to prevent the http client from being
// modified underneath us during a do().
c.configLock.RLock()
defer c.configLock.RUnlock() // This defer matches both the initial and the above lock
req, err := http.NewRequest(method, u, params)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", userAgent)
if doOptions.data != nil {
req.Header.Set("Content-Type", "application/json")
} else if method == "POST" {
req.Header.Set("Content-Type", "plain/text")
}
if c.username != "" && c.secret != "" {
req.SetBasicAuth(c.username, c.secret)
}
for k, v := range doOptions.headers {
req.Header.Set(k, v)
}
httpClient := c.httpClient
endpoint := c.getAPIPath(urlpath, query, doOptions.unversioned)
// The doOptions Context is shared for every attempted request in the do.
ctx := doOptions.context
if ctx == nil {
ctx = context.Background()
}
resp, err := httpClient.Do(req.WithContext(ctx))
if err != nil {
// If it is a custom error, return it. It probably knows more than us
if serror.IsStorageOSError(err) {
var failedAddresses = map[string]struct{}{}
c.addressLock.Lock()
var addresses = make([]string, len(c.addresses))
copy(addresses, c.addresses)
c.addressLock.Unlock()
for _, address := range addresses {
target := address + endpoint
req, err := http.NewRequest(method, target, params)
if err != nil {
// Probably should not try and continue if we're unable
// to create the request.
return nil, err
}
if strings.Contains(err.Error(), "connection refused") {
return nil, ErrConnectionRefused
req.Header.Set("User-Agent", c.userAgent)
if doOptions.data != nil {
req.Header.Set("Content-Type", "application/json")
} else if method == "POST" {
req.Header.Set("Content-Type", "plain/text")
}
if c.username != "" && c.secret != "" {
req.SetBasicAuth(c.username, c.secret)
}
return nil, chooseError(ctx, err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, newError(resp)
}
return resp, nil
}
// if error in context, return that instead of generic http error
func chooseError(ctx context.Context, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return err
for k, v := range doOptions.headers {
req.Header.Set(k, v)
}
resp, err := httpClient.Do(req.WithContext(ctx))
if err != nil {
// If it is a custom error, return it. It probably knows more than us
if serror.IsStorageOSError(err) {
switch serror.ErrorKind(err) {
case serror.APIUncontactable:
// If API isn't contactable we should try the next address
failedAddresses[address] = struct{}{}
continue
case serror.InvalidHostConfig:
// If invalid host or unknown error, we should report back
fallthrough
case serror.UnknownError:
return nil, err
}
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
if _, ok := err.(net.Error); ok {
// Be optimistic and try the next endpoint
failedAddresses[address] = struct{}{}
continue
}
return nil, err
}
}
// If we get to the point of response, we should move any failed
// addresses to the back.
failed := len(failedAddresses)
if failed > 0 {
// Copy addresses we think are okay into the head of the list
newOrder := make([]string, 0, len(addresses)-failed)
for _, addr := range addresses {
if _, exists := failedAddresses[addr]; !exists {
newOrder = append(newOrder, addr)
}
}
for addr := range failedAddresses {
newOrder = append(newOrder, addr)
}
c.addressLock.Lock()
// Bring in the new order
c.addresses = newOrder
c.addressLock.Unlock()
}
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, newError(resp) // These status codes are likely to be fatal
}
return resp, nil
}
return nil, netutil.ErrAllFailed(addresses)
}
func (c *Client) getAPIPath(path string, query url.Values, unversioned bool) string {
// The custom dialer contacts the hosts for us, making this hosname irrelevant
var urlStr string
if c.useTLS {
urlStr = "https://storageos-cluster"
} else {
urlStr = "http://storageos-cluster"
}
var apiPath = strings.TrimLeft(path, "/")
var apiPath string
path = strings.TrimLeft(path, "/")
if unversioned {
apiPath = fmt.Sprintf("%s/%s", urlStr, path)
if !unversioned {
apiPath = fmt.Sprintf("/%s/%s", c.requestedAPIVersion, apiPath)
} else {
apiPath = fmt.Sprintf("%s/%s/%s", urlStr, c.requestedAPIVersion, path)
apiPath = fmt.Sprintf("/%s", apiPath)
}
if len(query) > 0 {
@@ -441,23 +528,14 @@ func (e *Error) Error() string {
return fmt.Sprintf("API error (%s): %s", http.StatusText(e.Status), e.Message)
}
// defaultTransport returns a new http.Transport with the same default values
// as http.DefaultTransport, but with idle connections and keepalives disabled.
func defaultTransport(d Dialer) *http.Transport {
transport := defaultPooledTransport(d)
transport.DisableKeepAlives = true
transport.MaxIdleConnsPerHost = -1
return transport
}
// defaultPooledTransport returns a new http.Transport with similar default
// values to http.DefaultTransport. Do not use this for transient transports as
// it can leak file descriptors over time. Only use this for transports that
// will be re-used for the same host(s).
func defaultPooledTransport(d Dialer) *http.Transport {
func defaultPooledTransport(dialer Dialer) *http.Transport {
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: d.Dial,
Dial: dialer.Dial,
TLSHandshakeTimeout: 5 * time.Second,
DisableKeepAlives: false,
MaxIdleConnsPerHost: 1,
@@ -469,15 +547,13 @@ func defaultPooledTransport(d Dialer) *http.Transport {
// http.Client, but with a non-shared Transport, idle connections disabled, and
// keepalives disabled.
// If a custom dialer is not provided, one with sane defaults will be created.
func defaultClient(d Dialer) *http.Client {
if d == nil {
d = &net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 5 * time.Second,
}
func defaultClient() *http.Client {
dialer := &net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 5 * time.Second,
}
return &http.Client{
Transport: defaultTransport(d),
Transport: defaultPooledTransport(dialer),
}
}